/
buffered_logger.go
373 lines (326 loc) · 11.5 KB
/
buffered_logger.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package logger
import (
"context"
"fmt"
"io"
"sync"
"sync/atomic"
"time"
"github.com/aws/shim-loggers-for-containerd/debug"
types "github.com/docker/docker/api/types/backend"
dockerlogger "github.com/docker/docker/daemon/logger"
"golang.org/x/sync/errgroup"
)
const (
// expectedNumOfPipes is the number of closed container pipes at which the
// ring buffer is marked as closed.
expectedNumOfPipes = 2
// ringCap is the initial capacity of the ring buffer's message queue.
// This value is adopted from Docker:
// https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L140
ringCap = 1000
)
// bufferedLogger is a wrapper of underlying log driver and an intermediate ring
// buffer between container pipes and underlying log driver.
type bufferedLogger struct {
// l is the wrapped log driver; Read, Log and GetPipes are delegated to it.
l LogDriver
// buffer holds the messages read from the pipes until they are sent to the
// destination by l.
buffer *ringBuffer
// bufReadSizeInBytes determines how many bytes to read at a time from the source input when
// sending data to the ringBuffer.
bufReadSizeInBytes int
// containerID is passed to the log-routing tracer started in Start and is used
// as the event source of the verbose per-message debug output.
containerID string
}
// ringBuffer is the intermediate, size-bounded message queue that sits between
// the container pipes and the underlying log driver.
//
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L128
// as this struct is not exported.
type ringBuffer struct {
// lock guards the queue and its byte counter. Enqueue, Dequeue and Flush all
// take it, because the pipe-reading goroutines and the consuming goroutine
// share the buffer.
lock sync.Mutex
// wait is the condition variable bound to lock. Dequeue blocks on it while the
// queue is empty and the buffer is still open; Enqueue signals it, and closing
// the buffer broadcasts on it.
wait *sync.Cond
// curSizeInBytes is the total number of message bytes currently stored in the queue.
curSizeInBytes int
// maxSizeInBytes is the byte capacity of the buffer. Enqueue drops a message
// that would push curSizeInBytes past this limit unless the queue is empty.
maxSizeInBytes int
// queue saves all the log messages read from pipes exposed by containerd, and
// is consumed by underlying log driver.
queue []*dockerlogger.Message
// closedPipesCount is the number of closed container pipes for a single container.
closedPipesCount int
// isClosed indicates if ring buffer is closed.
isClosed bool
}
// NewBufferedLogger wraps the given log driver in a bufferedLogger.
//
// l is the underlying driver that ultimately ships the logs, bufferReadSize is
// the number of bytes read from a pipe at a time, maxBufferSize is the byte
// capacity of the intermediate ring buffer, and containerID identifies the
// container whose logs are being routed.
func NewBufferedLogger(l LogDriver, bufferReadSize int, maxBufferSize int, containerID string) LogDriver {
	bl := new(bufferedLogger)
	bl.l = l
	bl.buffer = newLoggerBuffer(maxBufferSize)
	bl.bufReadSizeInBytes = bufferReadSize
	bl.containerID = containerID
	return bl
}
// newLoggerBuffer builds an empty, open ring buffer limited to maxBufferSize
// bytes. The queue is pre-allocated with ringCap slots and the condition
// variable is bound to the buffer's own mutex.
func newLoggerBuffer(maxBufferSize int) *ringBuffer {
	// closedPipesCount and isClosed start at their zero values (0 and false).
	rb := new(ringBuffer)
	rb.maxSizeInBytes = maxBufferSize
	rb.queue = make([]*dockerlogger.Message, 0, ringCap)
	rb.wait = sync.NewCond(&rb.lock)
	return rb
}
// Start starts the non-blocking mode logger.
//
// It fetches the container pipes from the underlying log driver, starts one
// goroutine that drains the ring buffer to the destination and one goroutine
// per pipe that fills the ring buffer, invokes ready, and then blocks until all
// of those goroutines have returned.
//
// ctx is wrapped by errgroup.WithContext and the derived context is handed to
// the pipe readers. cleanupTime is forwarded to sendLogMessagesToDestination.
// ready is called once the goroutines have been launched.
//
// It returns the GetPipes error, a wrapped ready error, or the first non-nil
// error returned by any of the goroutines.
func (bl *bufferedLogger) Start(
ctx context.Context,
cleanupTime *time.Duration,
ready func() error,
) error {
pipeNameToPipe, err := bl.l.GetPipes()
if err != nil {
return err
}
// logWG tracks the tracing goroutine so the deferred cleanup below can wait
// for it to finish.
var logWG sync.WaitGroup
logWG.Add(1)
// Buffered with capacity 1 so the deferred stop signal can be sent without a
// receiver being ready yet.
stopTracingLogRoutingChan := make(chan bool, 1)
// Reset the package-level routing counters before a new run.
atomic.StoreUint64(&bytesReadFromSrc, 0)
atomic.StoreUint64(&bytesSentToDst, 0)
atomic.StoreUint64(&numberOfNewLineChars, 0)
go func() {
startTracingLogRouting(bl.containerID, stopTracingLogRoutingChan)
logWG.Done()
}()
// On every return path: tell the tracing goroutine to stop and wait for it.
defer func() {
debug.SendEventsToLog(DaemonName, "Sending signal to stop the ticker.", debug.DEBUG, 0)
stopTracingLogRoutingChan <- true
logWG.Wait()
}()
errGroup, ctx := errgroup.WithContext(ctx)
// Start the goroutine of underlying log driver to consume logs from ring buffer and
// send logs to destination when there's any.
errGroup.Go(func() error {
debug.SendEventsToLog(DaemonName, "Starting consuming logs from ring buffer", debug.INFO, 0)
return bl.sendLogMessagesToDestination(cleanupTime)
})
// Start reading logs from container pipes.
for pn, p := range pipeNameToPipe {
// Copy pn and p to new variables source and pipe, accordingly, so that each
// closure below captures its own pair rather than the shared loop variables.
source := pn
pipe := p
errGroup.Go(func() error {
debug.SendEventsToLog(DaemonName, fmt.Sprintf("Reading logs from pipe %s", source), debug.DEBUG, 0)
logErr := bl.saveLogMessagesToRingBuffer(ctx, pipe, source)
if logErr != nil {
err := fmt.Errorf("failed to send logs from pipe %s: %w", source, logErr)
debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
return err
}
return nil
})
}
// Signal that the container is ready to be started
// NOTE(review): on failure this returns without calling errGroup.Wait(), so the
// goroutines started above are not waited for — confirm this is intended.
if err := ready(); err != nil {
return fmt.Errorf("failed to check container ready status: %w", err)
}
// Wait() will return the first error it receives.
return errGroup.Wait()
}
// saveLogMessagesToRingBuffer reads the given container pipe until the
// underlying reader returns and stores every message in the ring buffer.
//
// ctx and f are forwarded to Read; source is the name of the pipe and is used
// in log and error messages.
//
// Once Read returns without error the pipe is counted as closed. When the
// number of closed pipes reaches expectedNumOfPipes the ring buffer is marked
// closed and every goroutine blocked in Dequeue is woken up.
//
// It returns a wrapped error if reading from the pipe fails.
func (bl *bufferedLogger) saveLogMessagesToRingBuffer(
	ctx context.Context,
	f io.Reader,
	source string,
) error {
	if err := bl.Read(ctx, f, source, bl.bufReadSizeInBytes, bl.saveSingleLogMessageToRingBuffer); err != nil {
		wrapped := fmt.Errorf("failed to read logs from %s pipe: %w", source, err)
		debug.SendEventsToLog(DaemonName, wrapped.Error(), debug.ERROR, 1)
		return wrapped
	}

	// No more messages in the pipe: record it as closed.
	debug.SendEventsToLog(DaemonName, fmt.Sprintf("Pipe %s is closed", source), debug.INFO, 1)

	// The counter and the closed flag are shared between the pipe goroutines and
	// the consumer, so they must only be touched while holding the buffer lock.
	// Changing isClosed under the same lock that Dequeue holds around its
	// condition check also guarantees the Broadcast cannot slip in between that
	// check and the call to Wait (which would leave the consumer asleep forever).
	bl.buffer.lock.Lock()
	defer bl.buffer.lock.Unlock()

	bl.buffer.closedPipesCount++
	// If both container pipes are closed, wake up the Dequeue goroutine which is waiting on wait.
	if bl.buffer.closedPipesCount == expectedNumOfPipes {
		bl.buffer.isClosed = true
		bl.buffer.wait.Broadcast()
	}
	return nil
}
// Read hands the pipe over to the underlying log driver's Read, which invokes
// sendLogMsgToDest for the log messages it reads from the pipe.
//
// All arguments are passed through unchanged and the driver's error, if any,
// is returned as is.
func (bl *bufferedLogger) Read(
	ctx context.Context,
	pipe io.Reader,
	source string,
	bufferSizeInBytes int,
	sendLogMsgToDest sendLogToDestFunc,
) error {
	driver := bl.l
	return driver.Read(ctx, pipe, source, bufferSizeInBytes, sendLogMsgToDest)
}
// saveSingleLogMessageToRingBuffer turns one scanned line into a log message
// and appends it to the ring buffer.
//
// line, source and msgTimestamp are used to build the message. When
// isPartialMsg is set, partialID, partialOrdinal and isLastPartial are attached
// to the message as partial-log metadata.
//
// It returns a wrapped error if the buffer rejects the message.
func (bl *bufferedLogger) saveSingleLogMessageToRingBuffer(
	line []byte,
	source string,
	isPartialMsg, isLastPartial bool,
	partialID string,
	partialOrdinal int,
	msgTimestamp time.Time,
) error {
	if debug.Verbose {
		scanned := fmt.Sprintf("[Pipe %s] Scanned message: %s", source, string(line))
		debug.SendEventsToLog(bl.containerID, scanned, debug.DEBUG, 0)
	}

	entry := newMessage(line, source, msgTimestamp)
	if isPartialMsg {
		entry.PLogMetaData = &types.PartialLogMetaData{
			ID:      partialID,
			Ordinal: partialOrdinal,
			Last:    isLastPartial,
		}
	}

	if enqueueErr := bl.buffer.Enqueue(entry); enqueueErr != nil {
		return fmt.Errorf("failed to save logs to buffer: %w", enqueueErr)
	}
	return nil
}
// sendLogMessagesToDestination consumes logs from ring buffer and uses the
// underlying log driver to send logs to destination.
//
// It keeps forwarding messages until the ring buffer is closed, then flushes
// whatever is left in the buffer and finally sleeps for *cleanupTime so the
// underlying driver has time to ship the last messages. A nil cleanupTime
// skips the sleep.
//
// It returns the first error hit while forwarding or flushing messages.
func (bl *bufferedLogger) sendLogMessagesToDestination(cleanupTime *time.Duration) error {
	// isClosed is written by the pipe goroutines, so read it under the buffer
	// lock rather than racing on the bare field.
	bufferClosed := func() bool {
		bl.buffer.lock.Lock()
		defer bl.buffer.lock.Unlock()
		return bl.buffer.isClosed
	}

	// Keep sending log message to destination defined by the underlying log driver until
	// the ring buffer is closed.
	for !bufferClosed() {
		if err := bl.sendLogMessageToDestination(); err != nil {
			debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
			return err
		}
	}

	// If both container pipes are closed, flush messages left in ring buffer.
	debug.SendEventsToLog(DaemonName, "All pipes are closed, flushing buffer.", debug.INFO, 0)
	if err := bl.flushMessages(); err != nil {
		debug.SendEventsToLog(DaemonName, err.Error(), debug.ERROR, 1)
		return err
	}

	// Without a cleanup duration there is nothing to wait for; dereferencing a
	// nil pointer here would panic.
	if cleanupTime == nil {
		return nil
	}

	// Sleep sometime to let shim logger clean up, for example, to allow enough time for the last
	// few log messages be flushed to destination like CloudWatch.
	debug.SendEventsToLog(DaemonName,
		fmt.Sprintf("Sleeping %s for cleanning up.", cleanupTime.String()),
		debug.INFO, 0)
	time.Sleep(*cleanupTime)
	return nil
}
// sendLogMessageToDestination dequeues a single log message from buffer and
// sends it to destination.
//
// It returns nil without sending anything when Dequeue reports that the ring
// buffer is closed (a nil message), and a wrapped error when dequeuing or
// sending fails.
func (bl *bufferedLogger) sendLogMessageToDestination() error {
	msg, err := bl.buffer.Dequeue()
	if err != nil {
		return fmt.Errorf("failed to read logs from buffer: %w", err)
	}
	// Dequeue hands back a nil message only when the ring buffer is closed.
	// Decide on the returned message instead of re-reading the closed flag: the
	// buffer may get closed right after a message was dequeued, and that message
	// has already left the queue, so skipping it here would lose it for good.
	if msg == nil {
		return nil
	}
	if err := bl.Log(msg); err != nil {
		return fmt.Errorf("failed to send logs to destination: %w", err)
	}
	return nil
}
// flushMessages drains the ring buffer and forwards every remaining message to
// the destination, in queue order. It stops at, and returns wrapped, the first
// error reported while sending.
func (bl *bufferedLogger) flushMessages() error {
	remaining := bl.buffer.Flush()
	for i := range remaining {
		if logErr := bl.Log(remaining[i]); logErr != nil {
			return fmt.Errorf("unable to flush the remaining messages to destination: %w", logErr)
		}
	}
	return nil
}
// Log forwards the message to the underlying log driver and returns its
// result. In verbose debug mode the message line is written to the debug log
// first.
func (bl *bufferedLogger) Log(message *dockerlogger.Message) error {
	if debug.Verbose {
		verboseText := fmt.Sprintf("[BUFFER] Sending message: %s", string(message.Line))
		debug.SendEventsToLog(DaemonName, verboseText, debug.DEBUG, 0)
	}
	driver := bl.l
	return driver.Log(message)
}
// GetPipes returns the container pipes, keyed by pipe name, as reported by the
// underlying log driver, together with that driver's error.
func (bl *bufferedLogger) GetPipes() (map[string]io.Reader, error) {
	pipes, err := bl.l.GetPipes()
	return pipes, err
}
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L155
// as messageRing struct is not exported.
// Enqueue appends a single log message to the tail of the intermediate buffer.
//
// A message is always accepted when the queue is empty. Otherwise it is
// accepted only if it still fits into the remaining byte capacity; a message
// that does not fit is dropped. In both cases one goroutine waiting on the
// condition variable is signalled and nil is returned.
func (b *ringBuffer) Enqueue(msg *dockerlogger.Message) error {
	b.lock.Lock()
	defer b.lock.Unlock()
	// Runs before the unlock above (deferred calls run last-in, first-out): wake
	// up one goroutine waiting on the condition variable, whether the message
	// was stored or dropped.
	defer b.wait.Signal()

	size := len(msg.Line)
	fits := len(b.queue) == 0 || b.curSizeInBytes+size <= b.maxSizeInBytes
	if fits {
		b.queue = append(b.queue, msg)
		b.curSizeInBytes += size
		return nil
	}

	// Not enough room left for this message: drop it.
	if debug.Verbose {
		debug.SendEventsToLog(DaemonName,
			"buffer is full/message is too long, waiting for available bytes",
			debug.DEBUG, 0)
		sizes := fmt.Sprintf("message size: %d, current buffer size: %d, max buffer size %d",
			size,
			b.curSizeInBytes,
			b.maxSizeInBytes)
		debug.SendEventsToLog(DaemonName, sizes, debug.DEBUG, 0)
	}
	return nil
}
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L179
// as messageRing struct is not exported.
// Dequeue removes and returns the oldest log message from the head of the
// intermediate buffer.
//
// While the buffer is open and empty the calling goroutine is suspended on the
// condition variable. Once the buffer is closed Dequeue returns (nil, nil),
// even if messages are still queued; the error result is always nil.
func (b *ringBuffer) Dequeue() (*dockerlogger.Message, error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	for !b.isClosed {
		if len(b.queue) > 0 {
			// Pop the head of the queue and give its bytes back to the buffer.
			head := b.queue[0]
			b.queue = b.queue[1:]
			b.curSizeInBytes -= len(head.Line)
			return head, nil
		}
		// Nothing to hand out yet: sleep until an Enqueue or a close wakes us up.
		if debug.Verbose {
			debug.SendEventsToLog(DaemonName,
				"No messages in queue, waiting...",
				debug.DEBUG, 0)
		}
		b.wait.Wait()
	}
	// Ring buffer is closed.
	return nil, nil //nolint: nilnil // swallow the error
}
// Adopted from https://github.com/moby/moby/blob/master/daemon/logger/ring.go#L215
// as messageRing struct is not exported.
// Flush returns all the messages left in the buffer, in queue order, and
// leaves the buffer empty. The result is never nil; it is an empty slice when
// there is nothing to flush.
func (b *ringBuffer) Flush() []*dockerlogger.Message {
	b.lock.Lock()
	defer b.lock.Unlock()

	if len(b.queue) == 0 {
		return make([]*dockerlogger.Message, 0)
	}
	messages := b.queue
	b.queue = make([]*dockerlogger.Message, 0)
	// The queue is empty again, so the byte counter must go back to zero as
	// well; otherwise the buffer would keep accounting for messages it no
	// longer holds.
	b.curSizeInBytes = 0
	return messages
}