/
logs_store.go
154 lines (143 loc) · 4.41 KB
/
logs_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package kubernetes
import (
"bufio"
"context"
"io"
"net/http"
"strings"
"time"
"github.com/brigadecore/brigade-foundations/retries"
"github.com/brigadecore/brigade/v2/apiserver/internal/api"
"github.com/brigadecore/brigade/v2/apiserver/internal/meta"
myk8s "github.com/brigadecore/brigade/v2/internal/kubernetes"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/kubernetes"
)
// logsStore is a Kubernetes-based implementation of the api.LogsStore
// interface.
type logsStore struct {
// kubeClient is the Kubernetes client used to request log streams from pods.
kubeClient kubernetes.Interface
}
// NewLogsStore builds an api.LogsStore backed directly by Kubernetes, using the
// supplied client to stream logs straight from the pod underlying a Worker or
// Job. Because it does not wait on a log aggregator to forward and store
// entries, it suits very near-term log retrieval. It errors once the relevant
// pod has been deleted, so callers should be ready to fall back on a different
// api.LogsStore implementation, on the assumption that all of a pod's logs have
// been aggregated and stored by the time the pod is gone.
func NewLogsStore(kubeClient kubernetes.Interface) api.LogsStore {
	store := new(logsStore)
	store.kubeClient = kubeClient
	return store
}
// StreamLogs opens a log stream on the pod selected by event.ID and selector
// (see podNameFromSelector) in the project's Kubernetes namespace and returns a
// channel of parsed log entries.
//
// Logs are requested with timestamps for the container named by
// selector.Container; opts.Follow controls whether the stream follows new
// output. Each line is split at its first space: if the leading token parses
// as an RFC 3339 timestamp it becomes the entry's Time, and the remainder
// becomes the Message.
//
// If the pod does not exist, a *meta.ErrNotFound is returned. If the target
// container is still initializing, opening the stream is retried. Any other
// failure to open the stream is returned wrapped with the pod name and
// namespace.
//
// The returned channel is closed, and the underlying stream released, when the
// stream ends, when reading from it fails, or when ctx is done. Read failures
// cannot be reported through the channel, so they simply end the stream.
func (l *logsStore) StreamLogs(
	ctx context.Context,
	project api.Project,
	event api.Event,
	selector api.LogsSelector,
	opts api.LogStreamOptions,
) (<-chan api.LogEntry, error) {
	podName := podNameFromSelector(event.ID, selector)

	req := l.kubeClient.CoreV1().Pods(project.Kubernetes.Namespace).GetLogs(
		podName,
		&v1.PodLogOptions{
			Container:  selector.Container,
			Timestamps: true,
			Follow:     opts.Follow,
		},
	)

	// The LogsService only would have called us for a Worker or Job that has
	// already moved past the PENDING and STARTING phases. So at this point, the
	// only two possibilities are that the Pod exists OR that it DID exist and has
	// already blinked out of existence after completion. If it's gone, we just
	// return a *meta.ErrNotFound and the LogsService will fall back to the cool
	// logs. If it exists, but the target container is still initializing, we
	// retry.
	var podLogs io.ReadCloser
	if err := retries.ManageRetries(
		ctx,
		"waiting for container to be initialized",
		50, // A generous number of retries. Let the client hang up if they want.
		10*time.Second,
		func() (bool, error) {
			// Keep the attempt's error local to the closure; only podLogs needs
			// to escape to the enclosing function.
			var streamErr error
			podLogs, streamErr = req.Stream(ctx)
			if streamErr == nil {
				return false, nil // We got what we wanted
			}
			if statusErr, ok := streamErr.(*k8sErrors.StatusError); ok {
				if statusErr.Status().Code == http.StatusNotFound {
					return false, &meta.ErrNotFound{ // Don't retry
						Type: "Pod",
						ID:   podName,
					}
				}
				if strings.Contains(
					statusErr.Error(),
					"is waiting to start: PodInitializing",
				) || strings.Contains(
					statusErr.Error(),
					"is waiting to start: ContainerCreating",
				) {
					return true, nil // Retry
				}
			}
			// Something else is wrong
			return false, errors.Wrapf( // Don't retry
				streamErr,
				"error opening log stream for pod %q in namespace %q",
				podName,
				project.Kubernetes.Namespace,
			)
		},
	); err != nil {
		return nil, err
	}

	logEntryCh := make(chan api.LogEntry)
	go func() {
		// Deferred calls run in reverse order: the channel is closed first and
		// then the pod log stream is released, on every exit path.
		defer podLogs.Close()
		defer close(logEntryCh)

		reader := bufio.NewReader(podLogs)
		for {
			// ReadString returns whatever it read before an error alongside that
			// error, so handle the data first and the error afterwards. This way
			// a final line lacking a trailing newline is still delivered.
			logLine, readErr := reader.ReadString('\n')
			if len(logLine) > 0 {
				// Drop the trailing newline only if it is actually there; a
				// partial final line has none.
				logLine = strings.TrimSuffix(logLine, "\n")
				logEntry := api.LogEntry{}
				logLineParts := strings.SplitN(logLine, " ", 2)
				if len(logLineParts) == 2 {
					t, parseErr := time.Parse(time.RFC3339, logLineParts[0])
					if parseErr == nil {
						logEntry.Time = &t
					}
					logEntry.Message = logLineParts[1]
				} else {
					logEntry.Message = logLine
				}
				select {
				case logEntryCh <- logEntry:
				case <-ctx.Done():
					return
				}
			}
			if readErr != nil {
				// io.EOF is the normal end of the stream. Any other read error
				// must also end the loop; otherwise we would spin forever on a
				// broken stream and never close the channel.
				return
			}
		}
	}()

	return logEntryCh, nil
}
// podNameFromSelector resolves the name of the pod whose logs are being
// requested for the given event. A selector that names a Job yields that Job's
// pod name; a selector with an empty Job yields the Worker's pod name.
func podNameFromSelector(eventID string, selector api.LogsSelector) string {
	if jobName := selector.Job; jobName != "" {
		// A specific Job was requested
		return myk8s.JobPodName(eventID, jobName)
	}
	// No Job named, so the Worker's logs are wanted
	return myk8s.WorkerPodName(eventID)
}