This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Improvements to trace writer and payload queue behaviour. #396

Merged
merged 1 commit on Mar 12, 2018
12 changes: 7 additions & 5 deletions writer/payload.go
@@ -257,6 +257,9 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
s.queuing = true
}

// Start by discarding payloads that are too old, freeing up memory
s.discardOldPayloads()

for s.conf.MaxQueuedPayloads > 0 && s.queuedPayloads.Len() >= s.conf.MaxQueuedPayloads {
log.Debugf("Dropping existing payload because max queued payloads reached: %d", s.conf.MaxQueuedPayloads)
if _, err := s.dropOldestPayload("max queued payloads reached"); err != nil {
@@ -287,12 +290,12 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
}

func (s *QueuablePayloadSender) flushQueue() error {
if s.NumQueuedPayloads() == 0 {
return nil
}

log.Debugf("Attempting to flush queue with %d payloads", s.NumQueuedPayloads())

// Start by discarding payloads that are too old
s.discardOldPayloads()

// For the remaining ones, try to send them one by one
var next *list.Element
for e := s.queuedPayloads.Front(); e != nil; e = next {
payload := e.Value.(*Payload)
@@ -306,7 +309,6 @@ func (s *QueuablePayloadSender) flushQueue() error {
retryNum, delay := s.backoffTimer.ScheduleRetry(err)
log.Debugf("Got retriable error. Retrying flush later: retry=%d, delay=%s, err=%v",
retryNum, delay, err)
s.discardOldPayloads()
s.notifyRetry(payload, err, delay, retryNum)
// Don't try to send following. We'll flush all later.
return err
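The enqueue path above now calls discardOldPayloads before checking the queue-size limit, but that helper itself is not part of this diff. Below is a rough, self-contained sketch of what such a helper presumably does; the type and field names are stand-ins for illustration, not the trace-agent's actual definitions, and it assumes payloads carry their creation time and that a zero max age means "no limit".

package main

import (
	"container/list"
	"fmt"
	"time"
)

// Minimal stand-ins for the real types; names and fields are assumptions.
type payload struct {
	creationDate time.Time
}

type queuableSender struct {
	queuedPayloads *list.List
	maxAge         time.Duration
}

// discardOldPayloads walks the queue and drops every payload older than
// maxAge; a maxAge of zero is treated as "no age limit".
func (s *queuableSender) discardOldPayloads() {
	if s.maxAge <= 0 {
		return
	}
	var next *list.Element
	for e := s.queuedPayloads.Front(); e != nil; e = next {
		next = e.Next() // capture next before removing e
		if p := e.Value.(*payload); time.Since(p.creationDate) > s.maxAge {
			s.queuedPayloads.Remove(e)
			fmt.Println("dropped payload: older than max age")
		}
	}
}

func main() {
	s := &queuableSender{queuedPayloads: list.New(), maxAge: 200 * time.Millisecond}
	s.queuedPayloads.PushBack(&payload{creationDate: time.Now().Add(-time.Second)})
	s.queuedPayloads.PushBack(&payload{creationDate: time.Now()})
	s.discardOldPayloads()
	fmt.Println("payloads left in queue:", s.queuedPayloads.Len()) // 1
}

Discarding stale payloads at enqueue time, rather than only when a flush runs, frees memory as soon as a new payload arrives, even while the endpoint keeps failing.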
70 changes: 70 additions & 0 deletions writer/payload_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/fixtures"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
"github.com/stretchr/testify/assert"
@@ -468,3 +469,72 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
"Monitor failure event should mention correct reason for error")
}

func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
assert := assert.New(t)

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &testEndpoint{}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a backoff timer that triggers every 100ms
testBackoffTimer := backoff.NewCustomTimer(func(numRetries int, err error) time.Duration {
return 100 * time.Millisecond
})

// And a queuable sender using said endpoint and timer and with a meager max age of 200ms
conf := writerconfig.DefaultQueuablePayloadSenderConf()
conf.MaxAge = 200 * time.Millisecond
queuableSender := NewCustomQueuablePayloadSender(flakyEndpoint, conf)
queuableSender.backoffTimer = testBackoffTimer
syncBarrier := make(chan interface{})
queuableSender.syncBarrier = syncBarrier

// And a test monitor for that sender
monitor := newTestPayloadSenderMonitor(queuableSender)

monitor.Start()
queuableSender.Start()

// When sending two payloads one after the other
payload1 := RandomPayload()
queuableSender.Send(payload1)
payload2 := RandomPayload()
queuableSender.Send(payload2)

// And then sleeping for 600ms
time.Sleep(600 * time.Millisecond)

// Then, during one of those retries, both payloads should be discarded for exceeding max age, leaving
// the queue empty, and a subsequent flush call will be made against an empty queue

// Then send a third payload
payload3 := RandomPayload()
queuableSender.Send(payload3)

// Wait for payload to be queued
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.SetError(nil)

// Wait for a retry
time.Sleep(200 * time.Millisecond)

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// When we stop the sender
queuableSender.Stop()
monitor.Stop()

// Then the endpoint should have received only payload3, because payload1 and payload2 were already older
// than max age by the time of the first failed retry
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")

// And monitor should have received failed events for payload1 and payload2 with correct reason
assert.Equal([]Payload{*payload1, *payload2}, monitor.FailurePayloads(),
"Monitor should agree with endpoint on failed payloads")
assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
"Monitor failure event should mention correct reason for error")
}
1 change: 1 addition & 0 deletions writer/service_writer.go
@@ -134,6 +134,7 @@ func (w *ServiceWriter) flush() {
data, err := model.EncodeServicesPayload(w.serviceBuffer)
if err != nil {
log.Errorf("error while encoding service payload: %v", err)
w.serviceBuffer = make(model.ServicesMetadata)
return
}

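The single added line drops the buffered services when encoding fails, so a payload that can never be serialized does not linger across flushes. A minimal, stand-alone sketch of that pattern, using json.Marshal and hypothetical type names in place of the real model.ServicesMetadata encoding:

package main

import (
	"encoding/json"
	"fmt"
	"math"
)

// servicesMetadata stands in for model.ServicesMetadata; the encoding and
// names here are illustrative, not the trace-agent's actual API.
type servicesMetadata map[string]interface{}

type serviceWriter struct {
	serviceBuffer servicesMetadata
}

// flush mirrors the error handling added above: when the buffered metadata
// cannot be encoded, the buffer is replaced with a fresh one instead of being
// kept, since re-encoding the same data on the next flush would fail again.
func (w *serviceWriter) flush(send func([]byte)) {
	data, err := json.Marshal(w.serviceBuffer)
	if err != nil {
		fmt.Println("error while encoding service payload:", err)
		w.serviceBuffer = make(servicesMetadata) // drop the poisoned buffer
		return
	}
	send(data)
}

func main() {
	send := func(b []byte) { fmt.Println("sent", len(b), "bytes") }

	w := &serviceWriter{serviceBuffer: servicesMetadata{"web": "ok"}}
	w.flush(send) // encodes and sends fine

	w.serviceBuffer["bad"] = math.NaN() // json.Marshal rejects NaN
	w.flush(send)
	fmt.Println("buffer entries after failed flush:", len(w.serviceBuffer)) // 0
}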
14 changes: 9 additions & 5 deletions writer/service_writer_test.go
@@ -43,12 +43,16 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
// When sending it
serviceChannel <- metadata1

// And then immediately sending another set of service metadata
metadata2 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata2

// And then waiting for more than flush period
time.Sleep(2 * serviceWriter.conf.FlushPeriod)

// And then sending another set of service metadata
metadata2 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata2
// And then sending a third set of service metadata
metadata3 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata3

// And stopping service writer before flush ticker ticks (should still flush on exit though)
close(serviceChannel)
@@ -64,8 +68,8 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
successPayloads := testEndpoint.SuccessPayloads()

assert.Len(successPayloads, 2, "There should be 2 payloads")
assertMetadata(assert, expectedHeaders, metadata1, successPayloads[0])
assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[1])
assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[0])
assertMetadata(assert, expectedHeaders, metadata3, successPayloads[1])
}

func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
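The reordered assertions assume that metadata sets arriving within the same flush period are merged into one buffer, so the first flushed payload carries metadata1 and metadata2 combined (mergedMetadata). A small sketch of that merge semantics, under the assumption that later entries overwrite earlier ones per service name; mergeServices and the types are hypothetical, not the writer's real helpers:

package main

import "fmt"

// servicesMetadata is a stand-in for model.ServicesMetadata
// (service name -> attributes).
type servicesMetadata map[string]map[string]string

// mergeServices folds src into dst, with later updates winning per service.
func mergeServices(dst, src servicesMetadata) {
	for name, attrs := range src {
		dst[name] = attrs
	}
}

func main() {
	buffer := servicesMetadata{}
	metadata1 := servicesMetadata{"billing": {"app_type": "web"}}
	metadata2 := servicesMetadata{"search": {"app_type": "db"}}

	// Both sets arrive before the first flush, so the first payload
	// carries the merged result, as the updated assertions expect.
	mergeServices(buffer, metadata1)
	mergeServices(buffer, metadata2)
	fmt.Println(buffer) // map[billing:map[app_type:web] search:map[app_type:db]]
}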
4 changes: 4 additions & 0 deletions writer/trace_writer.go
@@ -210,6 +210,7 @@ func (w *TraceWriter) flush() {
serialized, err := proto.Marshal(&tracePayload)
if err != nil {
log.Errorf("failed to serialize trace payload, data got dropped, err: %s", err)
w.resetBuffer()
return
}

@@ -244,7 +245,10 @@ func (w *TraceWriter) flush() {

log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions))
w.payloadSender.Send(payload)
w.resetBuffer()
}

func (w *TraceWriter) resetBuffer() {
// Reset traces
w.traces = w.traces[:0]
w.transactions = w.transactions[:0]
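resetBuffer truncates the trace and transaction slices with the s = s[:0] idiom, which zeroes the length but keeps the backing array, so subsequent appends reuse the existing allocation instead of reallocating after every flush. A tiny runnable illustration of that behaviour:

package main

import "fmt"

func main() {
	traces := make([]int, 0, 4)
	traces = append(traces, 1, 2, 3)
	fmt.Println(len(traces), cap(traces)) // 3 4

	traces = traces[:0] // reset: length 0, same backing array
	fmt.Println(len(traces), cap(traces)) // 0 4

	traces = append(traces, 9) // reuses the existing allocation
	fmt.Println(traces)        // [9]
}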