-
Notifications
You must be signed in to change notification settings - Fork 199
/
logs.go
196 lines (177 loc) · 5.31 KB
/
logs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubernetes
import (
"bufio"
"context"
"errors"
"fmt"
"io"
"os"
"strings"
"time"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"github.com/dapr/cli/pkg/print"
)
const (
daprdContainerName = "daprd"
appIDContainerArgName = "--app-id"
// number of retries when trying to list pods for getting logs.
maxListingRetry = 10
// delay between retries of pod listing.
listingDelay = 200 * time.Microsecond
// delay before retrying for getting logs.
streamingDelay = 100 * time.Millisecond
)
// Logs fetches Dapr sidecar logs from Kubernetes.
func Logs(appID, podName, namespace string) error {
client, err := Client()
if err != nil {
return err
}
if namespace == "" {
namespace = corev1.NamespaceDefault
}
pods, err := ListPods(client, namespace, nil)
if err != nil {
return fmt.Errorf("could not get logs %w", err)
}
if podName == "" {
// no pod name specified. in case of multiple pods, the first one will be selected.
var foundDaprPod bool
for _, pod := range pods.Items {
if foundDaprPod {
break
}
for _, container := range pod.Spec.Containers {
if container.Name == daprdContainerName {
// find app ID.
for i, arg := range container.Args {
if arg == appIDContainerArgName {
id := container.Args[i+1]
if id == appID {
podName = pod.Name
foundDaprPod = true
break
}
}
}
}
}
}
if !foundDaprPod {
return fmt.Errorf("could not get logs. Please check app-id (%s) and namespace (%s)", appID, namespace)
}
}
getLogsRequest := client.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{Container: daprdContainerName, Follow: false})
logStream, err := getLogsRequest.Stream(context.TODO())
if err != nil {
return fmt.Errorf("could not get logs. Please check pod-name (%s). Error - %w", podName, err)
}
defer logStream.Close()
_, err = io.Copy(os.Stdout, logStream)
if err != nil {
return fmt.Errorf("could not get logs %w", err)
}
return nil
}
// streamContainerLogsToDisk streams all containers logs for the given selector to a given disk directory.
func streamContainerLogsToDisk(ctx context.Context, appID string, appLogWriter, daprdLogWriter io.Writer, podClient v1.PodInterface) error {
var err error
var podList *corev1.PodList
counter := 0
for {
podList, err = getPods(ctx, appID, podClient)
if err != nil {
return fmt.Errorf("error listing the pod with label %s=%s: %w", daprAppIDKey, appID, err)
}
if len(podList.Items) != 0 {
break
}
counter++
if counter == maxListingRetry {
return fmt.Errorf("error getting logs: error listing the pod with label %s=%s after %d retires", daprAppIDKey, appID, maxListingRetry)
}
// Retry after a delay.
time.Sleep(listingDelay)
}
for _, pod := range podList.Items {
print.InfoStatusEvent(os.Stdout, "Streaming logs for containers in pod %q", pod.GetName())
for _, container := range pod.Spec.Containers {
fileWriter := daprdLogWriter
if container.Name != daprdContainerName {
fileWriter = appLogWriter
}
// create a go routine for each container to stream logs into file/console.
go func(pod, containerName, appID string, fileWriter io.Writer) {
loop:
for {
req := podClient.GetLogs(pod, &corev1.PodLogOptions{
Container: containerName,
Follow: true,
})
stream, err := req.Stream(ctx)
if err != nil {
switch {
case strings.Contains(err.Error(), "Pending"):
// Retry after a delay.
time.Sleep(streamingDelay)
continue loop
case strings.Contains(err.Error(), "ContainerCreating"):
// Retry after a delay.
time.Sleep(streamingDelay)
continue loop
case errors.Is(err, context.Canceled):
return
default:
return
}
}
defer stream.Close()
if containerName != daprdContainerName {
streamScanner := bufio.NewScanner(stream)
for streamScanner.Scan() {
fmt.Fprintln(fileWriter, print.Blue(fmt.Sprintf("== APP - %s == %s", appID, streamScanner.Text())))
}
} else {
_, err = io.Copy(fileWriter, stream)
if err != nil {
switch {
case errors.Is(err, context.Canceled):
return
default:
return
}
}
}
return
}
}(pod.GetName(), container.Name, appID, fileWriter)
}
}
return nil
}
func getPods(ctx context.Context, appID string, podClient v1.PodInterface) (*corev1.PodList, error) {
listCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
labelSelector := fmt.Sprintf("%s=%s", daprAppIDKey, appID)
podList, err := podClient.List(listCtx, metav1.ListOptions{
LabelSelector: labelSelector,
})
cancel()
if err != nil {
return nil, err
}
return podList, nil
}