Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

[17.06.1] Fix awslogs driver repeating last event - #34292 #151

Merged
merged 1 commit into from Aug 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
15 changes: 7 additions & 8 deletions components/engine/daemon/logger/awslogs/cloudwatchlogs.go
Expand Up @@ -384,33 +384,32 @@ func (l *logStream) collectBatch() {
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
}
l.publishBatch(events)
events = events[:0]
case msg, more := <-l.messages:
if !more {
// Flush event buffer
// Flush event buffer and release resources
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
l.publishBatch(events)
events = events[:0]
return
}
if eventBufferTimestamp == 0 {
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
}
unprocessedLine := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(unprocessedLine) {
// This is a new log event so flush the current eventBuffer to events
if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// If we will exceed max bytes per event flush the current event buffer before appending
if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
// Append new line
processedLine := append(unprocessedLine, "\n"...)
eventBuffer = append(eventBuffer, processedLine...)
Expand Down
16 changes: 15 additions & 1 deletion components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
Expand Up @@ -641,14 +641,28 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
})

// Fire ticker batchPublishFrequency seconds later
ticks <- time.Now().Add(batchPublishFrequency * time.Second)
ticks <- time.Now().Add(batchPublishFrequency + time.Second)

// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
argument := <-mockClient.putLogEventsArgument
assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")

// Log an event 1 second later
stream.Log(&logger.Message{
Line: []byte(logline),
Timestamp: time.Now().Add(time.Second),
})

// Fire ticker another batchPublishFrequency seconds later
ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)

// Verify the event buffer is truly flushed - we should only receive a single event
argument = <-mockClient.putLogEventsArgument
assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
stream.Close()
}

Expand Down