Skip to content
This repository has been archived by the owner on Oct 13, 2023. It is now read-only.

Commit

Permalink
Merge pull request #151 from thaJeztah/backport-fix-awslogs
Browse files Browse the repository at this point in the history
[17.06.1] Fix awslogs driver repeating last event - #34292
  • Loading branch information
andrewhsu committed Aug 1, 2017
2 parents 96d8424 + ce7517d commit fd33a51
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 9 deletions.
15 changes: 7 additions & 8 deletions components/engine/daemon/logger/awslogs/cloudwatchlogs.go
Expand Up @@ -384,33 +384,32 @@ func (l *logStream) collectBatch() {
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
}
l.publishBatch(events)
events = events[:0]
case msg, more := <-l.messages:
if !more {
// Flush event buffer
// Flush event buffer and release resources
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
l.publishBatch(events)
events = events[:0]
return
}
if eventBufferTimestamp == 0 {
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
}
unprocessedLine := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(unprocessedLine) {
// This is a new log event so flush the current eventBuffer to events
if l.multilinePattern.Match(unprocessedLine) || len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
// This is a new log event or we will exceed max bytes per event
// so flush the current eventBuffer to events and reset timestamp
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// If we will exceed max bytes per event flush the current event buffer before appending
if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
// Append new line
processedLine := append(unprocessedLine, "\n"...)
eventBuffer = append(eventBuffer, processedLine...)
Expand Down
16 changes: 15 additions & 1 deletion components/engine/daemon/logger/awslogs/cloudwatchlogs_test.go
Expand Up @@ -641,14 +641,28 @@ func TestCollectBatchMultilinePatternMaxEventAge(t *testing.T) {
})

// Fire ticker batchPublishFrequency seconds later
ticks <- time.Now().Add(batchPublishFrequency * time.Second)
ticks <- time.Now().Add(batchPublishFrequency + time.Second)

// Verify single multiline event is flushed after maximum event buffer age (batchPublishFrequency)
argument := <-mockClient.putLogEventsArgument
assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n"+logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")

// Log an event 1 second later
stream.Log(&logger.Message{
Line: []byte(logline),
Timestamp: time.Now().Add(time.Second),
})

// Fire ticker another batchPublishFrequency seconds later
ticks <- time.Now().Add(2*batchPublishFrequency + time.Second)

// Verify the event buffer is truly flushed - we should only receive a single event
argument = <-mockClient.putLogEventsArgument
assert.NotNil(t, argument, "Expected non-nil PutLogEventsInput")
assert.Equal(t, 1, len(argument.LogEvents), "Expected single multiline event")
assert.Equal(t, logline+"\n", *argument.LogEvents[0].Message, "Received incorrect multiline message")
stream.Close()
}

Expand Down

0 comments on commit fd33a51

Please sign in to comment.