Improvements to trace writer and payload queue behaviour.
* The payload queue could become stuck if all queued payloads expired due
  to max age before the next flush (the queue would shrink to size 0, so
  we'd never leave the queuing state, but we'd also never schedule a new
  retry); see the sketch below the commit metadata for an illustration.
* When we can't serialize a trace payload, we should reset our buffers,
  because otherwise we'd likely end up in a serialization-error loop and
  make no progress.
AlexJF committed Mar 9, 2018
1 parent 0638620 commit 80a3929
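
The first bullet is the queue fix implemented in writer/payload.go below: payloads older than the configured max age are now discarded at enqueue time, not only when a flush happens. What follows is a minimal, self-contained sketch of that idea in Go; payload, queue, enqueue and discardOldPayloads are simplified, hypothetical stand-ins, not the agent's actual QueuablePayloadSender types.

package main

import (
	"container/list"
	"fmt"
	"time"
)

// payload is a simplified stand-in for the agent's Payload type (hypothetical).
type payload struct {
	creationDate time.Time
	data         []byte
}

// queue is a simplified stand-in for the sender's payload queue (hypothetical).
type queue struct {
	maxAge   time.Duration
	payloads *list.List
}

// discardOldPayloads drops every payload older than maxAge, mirroring the
// role of discardOldPayloads in writer/payload.go (simplified).
func (q *queue) discardOldPayloads() {
	var next *list.Element
	for e := q.payloads.Front(); e != nil; e = next {
		next = e.Next()
		p := e.Value.(*payload)
		if time.Since(p.creationDate) > q.maxAge {
			q.payloads.Remove(e)
			fmt.Printf("discarded expired payload %q\n", p.data)
		}
	}
}

// enqueue discards expired payloads before adding the new one. Doing the
// discard here, rather than only at flush time, keeps the queue length the
// sender acts on up to date, so it never sits in the queuing state holding
// only payloads that have silently expired.
func (q *queue) enqueue(p *payload) {
	q.discardOldPayloads()
	q.payloads.PushBack(p)
}

func main() {
	q := &queue{maxAge: 100 * time.Millisecond, payloads: list.New()}
	q.enqueue(&payload{creationDate: time.Now(), data: []byte("p1")})
	time.Sleep(200 * time.Millisecond) // let p1 expire
	q.enqueue(&payload{creationDate: time.Now(), data: []byte("p2")})
	fmt.Println("queued payloads:", q.payloads.Len()) // 1: only p2 remains
}

The actual change in writer/payload.go below simply calls s.discardOldPayloads() at the top of enqueue, before the existing max-queued-payloads checks.
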
Showing 3 changed files with 81 additions and 5 deletions.
12 changes: 7 additions & 5 deletions writer/payload.go
@@ -257,6 +257,9 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
		s.queuing = true
	}

	// Start by discarding payloads that are too old, freeing up memory
	s.discardOldPayloads()

	for s.conf.MaxQueuedPayloads > 0 && s.queuedPayloads.Len() >= s.conf.MaxQueuedPayloads {
		log.Debugf("Dropping existing payload because max queued payloads reached: %d", s.conf.MaxQueuedPayloads)
		if _, err := s.dropOldestPayload("max queued payloads reached"); err != nil {
@@ -287,12 +290,12 @@ func (s *QueuablePayloadSender) enqueue(payload *Payload) error {
}

func (s *QueuablePayloadSender) flushQueue() error {
	if s.NumQueuedPayloads() == 0 {
		return nil
	}

	log.Debugf("Attempting to flush queue with %d payloads", s.NumQueuedPayloads())

	// Start by discarding payloads that are too old
	s.discardOldPayloads()

	// For the remaining ones, try to send them one by one
	var next *list.Element
	for e := s.queuedPayloads.Front(); e != nil; e = next {
		payload := e.Value.(*Payload)
@@ -306,7 +309,6 @@ func (s *QueuablePayloadSender) flushQueue() error {
			retryNum, delay := s.backoffTimer.ScheduleRetry(err)
			log.Debugf("Got retriable error. Retrying flush later: retry=%d, delay=%s, err=%v",
				retryNum, delay, err)
			s.discardOldPayloads()
			s.notifyRetry(payload, err, delay, retryNum)
			// Don't try to send following. We'll flush all later.
			return err
70 changes: 70 additions & 0 deletions writer/payload_test.go
@@ -5,6 +5,7 @@ import (
	"testing"
	"time"

	"github.com/DataDog/datadog-trace-agent/backoff"
	"github.com/DataDog/datadog-trace-agent/fixtures"
	writerconfig "github.com/DataDog/datadog-trace-agent/writer/config"
	"github.com/stretchr/testify/assert"
@@ -468,3 +469,72 @@ func TestQueuablePayloadSender_MaxAge(t *testing.T) {
	assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
		"Monitor failure event should mention correct reason for error")
}

func TestQueuablePayloadSender_RetryOfTooOldQueue(t *testing.T) {
	assert := assert.New(t)

	// Given an endpoint that continuously throws out retriable errors
	flakyEndpoint := &testEndpoint{}
	flakyEndpoint.SetError(&RetriableError{err: fmt.Errorf("bleh"), endpoint: flakyEndpoint})

	// And a backoff timer that triggers every 100ms
	testBackoffTimer := backoff.NewCustomTimer(func(numRetries int, err error) time.Duration {
		return 100 * time.Millisecond
	})

	// And a queuable sender using said endpoint and timer, with a meager max age of 200ms
	conf := writerconfig.DefaultQueuablePayloadSenderConf()
	conf.MaxAge = 200 * time.Millisecond
	queuableSender := NewCustomQueuablePayloadSender(flakyEndpoint, conf)
	queuableSender.backoffTimer = testBackoffTimer
	syncBarrier := make(chan interface{})
	queuableSender.syncBarrier = syncBarrier

	// And a test monitor for that sender
	monitor := newTestPayloadSenderMonitor(queuableSender)

	monitor.Start()
	queuableSender.Start()

	// When sending two payloads one after the other
	payload1 := RandomPayload()
	queuableSender.Send(payload1)
	payload2 := RandomPayload()
	queuableSender.Send(payload2)

	// And then sleeping for 600ms
	time.Sleep(600 * time.Millisecond)

	// Then, eventually, during one of the retries, those 2 payloads should end up being discarded, the queue
	// will end up with a size of 0, and a flush call will be made for a queue of size 0

	// Then send a third payload
	payload3 := RandomPayload()
	queuableSender.Send(payload3)

	// Wait for payload to be queued
	syncBarrier <- nil

	// Then, when the endpoint finally works
	flakyEndpoint.SetError(nil)

	// Wait for a retry
	time.Sleep(200 * time.Millisecond)

	// Then we should have no queued payloads
	assert.Equal(0, queuableSender.NumQueuedPayloads(), "We should have no queued payloads")

	// When we stop the sender
	queuableSender.Stop()
	monitor.Stop()

	// Then the endpoint should have received only payload3, because payload1 and payload2 were too old after
	// the failed retry (first TriggerTick)
	assert.Equal([]Payload{*payload3}, flakyEndpoint.SuccessPayloads(), "Endpoint should have received only payload 3")

	// And the monitor should have received failure events for payload1 and payload2 with the correct reason
	assert.Equal([]Payload{*payload1, *payload2}, monitor.FailurePayloads(),
		"Monitor should agree with endpoint on failed payloads")
	assert.Contains(monitor.FailureEvents[0].Error.Error(), "older than max age",
		"Monitor failure event should mention correct reason for error")
}
4 changes: 4 additions & 0 deletions writer/trace_writer.go
@@ -210,6 +210,7 @@ func (w *TraceWriter) flush() {
	serialized, err := proto.Marshal(&tracePayload)
	if err != nil {
		log.Errorf("failed to serialize trace payload, data got dropped, err: %s", err)
		w.resetBuffer()
		return
	}

@@ -244,7 +245,10 @@ func (w *TraceWriter) flush() {

	log.Debugf("flushing traces=%v transactions=%v", len(w.traces), len(w.transactions))
	w.payloadSender.Send(payload)
	w.resetBuffer()
}

func (w *TraceWriter) resetBuffer() {
	// Reset traces
	w.traces = w.traces[:0]
	w.transactions = w.transactions[:0]
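
The trace_writer.go change above addresses the second bullet of the commit message: if proto.Marshal fails, the writer now resets its buffers instead of keeping the unserializable traces around and hitting the same error on every subsequent flush. Here is a minimal sketch of that error-handling pattern using simplified, hypothetical stand-ins (traceWriter and marshal are not the agent's actual TraceWriter or proto.Marshal).

package main

import (
	"errors"
	"fmt"
)

// traceWriter is a simplified, hypothetical stand-in for the agent's TraceWriter.
type traceWriter struct {
	traces       []string
	transactions []string
}

// marshal stands in for proto.Marshal; it always fails here to exercise the error path.
func marshal(traces []string) ([]byte, error) {
	return nil, errors.New("serialization failed")
}

// flush mirrors the pattern in the diff above: if serialization fails, the data is
// dropped and the buffers are reset so the next flush starts from a clean state
// instead of retrying the same bad payload forever.
func (w *traceWriter) flush() {
	if _, err := marshal(w.traces); err != nil {
		fmt.Printf("failed to serialize trace payload, data got dropped, err: %s\n", err)
		w.resetBuffer() // without this, every later flush would fail the same way
		return
	}
	// ... send the serialized payload, then reset the buffers as usual
	w.resetBuffer()
}

func (w *traceWriter) resetBuffer() {
	w.traces = w.traces[:0]
	w.transactions = w.transactions[:0]
}

func main() {
	w := &traceWriter{traces: []string{"t1", "t2"}, transactions: []string{"x1"}}
	w.flush()
	fmt.Println("traces buffered after flush:", len(w.traces)) // 0
}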
