This repository has been archived by the owner on Aug 30, 2019. It is now read-only.

Improvements to trace/service writer and payload queue behaviour.
* The payload queue could become stuck if all queued payloads expired due
  to max age before the next flush: the queue would drop to size 0, so we'd
  never leave the queuing state, but we'd also never schedule a new retry
  (see the sketch below the commit metadata).
* When we can't serialize a trace or service payload, we now reset the
  corresponding buffer; otherwise we'd likely end up in a serialization
  error loop and no progress would ever be made.
* Fixed a flaky service writer test after the logic changes to the
  buffering mechanism.
AlexJF committed Mar 9, 2018
1 parent 0638620 commit 7633fd1
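
The first bullet describes a wedged retry loop. As a rough illustration only (a toy model with made-up names, not code from this commit), the pre-fix behaviour could play out like this: once every queued payload expires, a flush that bails out on an empty queue never reschedules a retry, yet the sender never leaves its queuing state either.

// Toy model of the failure mode described in the first bullet; all names are illustrative.
package main

import "fmt"

type sender struct {
    queuing        bool // true while payloads are being queued for retry
    queued         int  // number of payloads currently in the queue
    retryScheduled bool // whether another flush attempt is pending
}

// flushQueue mimics the old behaviour: bail out as soon as the queue is empty.
func (s *sender) flushQueue() {
    s.retryScheduled = false // the retry that invoked us has now fired
    if s.queued == 0 {
        return // queuing stays true and no new retry is scheduled
    }
    // ...send payloads; on a retriable error, set s.retryScheduled = true...
}

func main() {
    s := &sender{queuing: true, queued: 2, retryScheduled: true}
    s.queued = 0   // both payloads exceed max age before the retry fires
    s.flushQueue() // the scheduled retry runs against an empty queue
    fmt.Printf("queuing=%v retryScheduled=%v\n", s.queuing, s.retryScheduled)
    // Prints queuing=true retryScheduled=false: stuck until something external intervenes.
}

The diff below addresses this by, among other things, discarding expired payloads on enqueue so they free memory as new payloads arrive.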
Showing 5 changed files with 91 additions and 10 deletions.
12 changes: 7 additions & 5 deletions writer/payload.go
@@ -257,6 +257,9 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
s.queuing = true
}

// Start by discarding payloads that are too old, freeing up memory
s.discardOldPayloads()

for s.conf.MaxQueuedPayloads > 0 && s.queuedPayloads.Len() >= s.conf.MaxQueuedPayloads {
log.Debugf("Dropping existing payload because max queued payloads reached: %d", s.conf.MaxQueuedPayloads)
if _, err := s.dropOldestPayload("max queued payloads reached"); err != nil {
@@ -287,12 +290,12 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
}

func (s *QueuablePayloadSender) flushQueue() error {
if s.NumQueuedPayloads() == 0 {
return nil
}

log.Debugf("Attempting to flush queue with %d payloads", s.NumQueuedPayloads())

// Start by discarding payloads that are too old
s.discardOldPayloads()

// For the remaining ones, try to send them one by one
var next *list.Element
for e := s.queuedPayloads.Front(); e != nil; e = next {
payload := e.Value.(*Payload)
@@ -306,7 +309,6 @@ func (s *QueuablePayloadSender) flushQueue() error {
retryNum, delay := s.backoffTimer.ScheduleRetry(err)
log.Debugf("Got retriable error. Retrying flush later: retry=%d, delay=%s, err=%v",
retryNum, delay, err)
s.discardOldPayloads()
s.notifyRetry(payload, err, delay, retryNum)
// Don't try to send following. We'll flush all later.
return err
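discardOldPayloads itself is not part of this diff, so the following is only a hedged sketch of what an age-based discard pass over a container/list queue typically looks like (the payload type and its createdAt field are assumptions, not the agent's definitions). Calling it from enqueue, as the first hunk now does, keeps memory bounded even while flushes keep failing.

// Hedged sketch of an age-based discard pass; not the repository's code.
package main

import (
    "container/list"
    "fmt"
    "time"
)

type payload struct {
    createdAt time.Time // assumed field name for when the payload was created
    bytes     []byte
}

// discardOld removes queue entries older than maxAge and reports how many were
// dropped. Save e.Next() before Remove(e), since removal invalidates the element.
func discardOld(queue *list.List, maxAge time.Duration, now time.Time) int {
    dropped := 0
    var next *list.Element
    for e := queue.Front(); e != nil; e = next {
        next = e.Next()
        p := e.Value.(*payload)
        if maxAge > 0 && now.Sub(p.createdAt) > maxAge {
            queue.Remove(e)
            dropped++
        }
    }
    return dropped
}

func main() {
    q := list.New()
    q.PushBack(&payload{createdAt: time.Now().Add(-time.Hour)})
    q.PushBack(&payload{createdAt: time.Now()})
    fmt.Println("dropped:", discardOld(q, 30*time.Minute, time.Now())) // dropped: 1
}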
70 changes: 70 additions & 0 deletions writer/payload_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/DataDog/datadog-trace-agent/backoff"
"github.com/DataDog/datadog-trace-agent/fixtures"
writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
"github.com/stretchr/testify/assert"
@@ -468,3 +469,72 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
"Monitor failure event should mention correct reason for error")
}

func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
assert := assert.New(t)

// Given an endpoint that continuously throws out retriable errors
flakyEndpoint := &testEndpoint{}
flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

// And a backoff timer that triggers every 100ms
testBackoffTimer := backoff.NewCustomTimer(func(numRetries int, err error) time.Duration {
return 100 * time.Millisecond
})

// And a queuable sender using said endpoint and timer and with a meager max age of 200ms
conf := writerconfig.DefaultQueuablePayloadSenderConf()
conf.MaxAge = 200 * time.Millisecond
queuableSender := NewCustomQueuablePayloadSender(flakyEndpoint, conf)
queuableSender.backoffTimer = testBackoffTimer
syncBarrier := make(chan interface{})
queuableSender.syncBarrier = syncBarrier

// And a test monitor for that sender
monitor := newTestPayloadSenderMonitor(queuableSender)

monitor.Start()
queuableSender.Start()

// When sending two payloads one after the other
payload1 := RandomPayload()
queuableSender.Send(payload1)
payload2 := RandomPayload()
queuableSender.Send(payload2)

// And then sleeping for 600ms (three times the 200ms max age)
time.Sleep(600 * time.Millisecond)

// Then, eventually, during one of the retries those 2 payloads should get discarded for exceeding max age, leaving
// the queue at size 0, and a flush call will still be made for that empty queue

// Then send a third payload
payload3 := RandomPayload()
queuableSender.Send(payload3)

// Wait for payload to be queued
syncBarrier <- nil

// Then, when the endpoint finally works
flakyEndpoint.SetError(nil)

// Wait for a retry
time.Sleep(200 * time.Millisecond)

// Then we should have no queued payloads
assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

// When we stop the sender
queuableSender.Stop()
monitor.Stop()

// Then the endpoint should have received only payload3, because payload1 and payload2 exceeded max age while the
// endpoint was still failing
assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")

// And monitor should have received failed events for payload1 and payload2 with correct reason
assert.Equal([]Payload{*payload1, *payload2}, monitor.FailurePayloads(),
"Monitor should agree with endpoint on failed payloads")
assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
"Monitor failure event should mention correct reason for error")
}
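
The test above pushes nil into queuableSender.syncBarrier to know that payload3 has been processed before making the endpoint healthy again. The sender's internals are outside this diff, so here is only a hedged sketch of the general sync-barrier pattern being relied on: the worker goroutine also receives from the barrier channel, so a send on it returns only after everything sent earlier has been handled.

// Hedged sketch of a sync-barrier between a test and a worker goroutine; illustrative names only.
package main

import "fmt"

type worker struct {
    in      chan string
    barrier chan interface{}
    done    chan struct{}
    seen    []string
}

func (w *worker) run() {
    for {
        select {
        case msg := <-w.in:
            w.seen = append(w.seen, msg) // "process" the message
        case <-w.barrier:
            // A test sent a token: everything sent before it has been processed.
        case <-w.done:
            return
        }
    }
}

func main() {
    w := &worker{in: make(chan string), barrier: make(chan interface{}), done: make(chan struct{})}
    go w.run()
    w.in <- "payload1"
    w.in <- "payload2"
    w.barrier <- nil         // returns only after payload1 and payload2 were handled
    fmt.Println(len(w.seen)) // 2
    close(w.done)
}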
1 change: 1 addition & 0 deletions writer/service_writer.go
@@ -134,6 +134,7 @@ func (w *ServiceWriter) flush() {
data, err := model.EncodeServicesPayload(w.serviceBuffer)
if err != nil {
log.Errorf("error while encoding service payload: %v", err)
w.serviceBuffer = make(model.ServicesMetadata)
return
}

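The single added line above resets the map-backed service buffer when encoding fails. A minimal, self-contained sketch of that pattern follows, with toy types and JSON standing in for model.EncodeServicesPayload (assumptions for illustration, not the agent's code): if the buffer kept its unserializable contents, every later flush would hit the same error and nothing would ever be sent.

// Hedged sketch of "reset the buffer when encoding fails" for a map-backed buffer.
package main

import (
    "encoding/json"
    "fmt"
)

type servicesMetadata map[string]interface{}

type serviceWriter struct {
    serviceBuffer servicesMetadata
}

func (w *serviceWriter) flush() {
    data, err := json.Marshal(w.serviceBuffer) // stand-in for the real encoder
    if err != nil {
        fmt.Println("error while encoding service payload:", err)
        // Mirrors the added line above: start over with an empty buffer so the
        // next flush is not doomed to repeat the same encoding error.
        w.serviceBuffer = make(servicesMetadata)
        return
    }
    fmt.Println("flushed", len(data), "bytes")
    w.serviceBuffer = make(servicesMetadata)
}

func main() {
    w := &serviceWriter{serviceBuffer: make(servicesMetadata)}
    w.serviceBuffer["web"] = func() {} // functions cannot be JSON-encoded, so Marshal fails
    w.flush()                          // drops the poisoned buffer
    w.serviceBuffer["web"] = map[string]string{"app_type": "web"}
    w.flush() // succeeds because the bad entry is gone
}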
14 changes: 9 additions & 5 deletions writer/service_writer_test.go
@@ -43,12 +43,16 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
// When sending it
serviceChannel <- metadata1

// And then immediately sending another set of service metadata
metadata2 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata2

// And then waiting for more than flush period
time.Sleep(2 * serviceWriter.conf.FlushPeriod)

// And then sending another set of service metadata
metadata2 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata2
// And then sending a third set of service metadata
metadata3 := fixtures.RandomServices(10, 10)
serviceChannel <- metadata3

// And stopping service writer before flush ticker ticks (should still flush on exit though)
close(serviceChannel)
@@ -64,8 +68,8 @@ func TestServiceWriter_ServiceHandling(t *testing.T) {
successPayloads := testEndpoint.SuccessPayloads()

assert.Len(successPayloads, 2, "There should be 2 payloads")
assertMetadata(assert, expectedHeaders, metadata1, successPayloads[0])
assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[1])
assertMetadata(assert, expectedHeaders, mergedMetadata, successPayloads[0])
assertMetadata(assert, expectedHeaders, metadata3, successPayloads[1])
}

func TestServiceWriter_UpdateInfoHandling(t *testing.T) {
4 changes: 4 additions & 0 deletions writer/trace_writer.go
@@ -210,6 +210,7 @@ func (w *TraceWriter) flush() {
serialized, err := proto.Marshal(&tracePayload)
if err != nil {
log.Errorf("failed to serialize trace payload, data got dropped, err: %s", err)
w.resetBuffer()
return
}

@@ -244,7 +245,10 @@ func (w *TraceWriter) flush() {

log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions))
w.payloadSender.Send(payload)
w.resetBuffer()
}

func (w *TraceWriter) resetBuffer() {
// Reset traces
w.traces = w.traces[:0]
w.transactions = w.transactions[:0]
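Same idea on the trace side, where the buffers are slices and resetBuffer truncates them in place. Below is a hedged, self-contained sketch (toy types, with JSON standing in for the agent's protobuf marshalling) of why resetting on a failed serialization matters: without it, the poisoned buffer would be retried on every flush and no progress would ever be made; with it, only the bad batch is dropped.

// Hedged sketch of "reset the buffer even when serialization fails"; not the agent's code.
package main

import (
    "encoding/json"
    "fmt"
    "math"
)

type traceWriter struct {
    traces []float64 // stand-in for the buffered traces
    sent   int
}

func (w *traceWriter) flush() {
    data, err := json.Marshal(w.traces) // the agent uses protobuf; JSON keeps this runnable
    if err != nil {
        // Without this reset, the same unserializable buffer would fail again
        // on every subsequent flush.
        w.resetBuffer()
        fmt.Println("encode failed, dropped buffer:", err)
        return
    }
    w.sent += len(data)
    w.resetBuffer()
}

func (w *traceWriter) resetBuffer() {
    w.traces = w.traces[:0] // keep the backing array, drop the contents
}

func main() {
    w := &traceWriter{}
    w.traces = append(w.traces, math.Inf(1)) // +Inf is not representable in JSON, so Marshal fails
    w.flush()                                // drops the bad batch
    w.traces = append(w.traces, 1, 2, 3)
    w.flush() // succeeds because the poisoned data is gone
    fmt.Println("bytes sent:", w.sent)
}

Truncating with [:0] presumably lets the writer reuse the slices' capacity across flushes instead of reallocating each time.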
