Skip to content

Commit

Permalink
add a test for backoff behaviour
Browse files Browse the repository at this point in the history
  • Loading branch information
richardstartin committed Jun 30, 2020
1 parent 9dc0378 commit e2f46a9
Showing 1 changed file with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ package datadog.trace.api.writer
import com.timgroup.statsd.StatsDClient
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.common.writer.DDAgentWriter
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DDAgentApi
import datadog.trace.common.writer.ddagent.DispatchingDisruptor
import datadog.trace.common.writer.ddagent.Monitor
import datadog.trace.common.writer.ddagent.MsgPackStatefulSerializer
import datadog.trace.common.writer.ddagent.TraceBuffer
Expand All @@ -18,6 +19,7 @@ import org.msgpack.core.buffer.ArrayBufferOutput
import spock.lang.Retry
import spock.lang.Timeout

import java.util.concurrent.Executors
import java.util.concurrent.Phaser
import java.util.concurrent.Semaphore
import java.util.concurrent.TimeUnit
Expand Down Expand Up @@ -672,6 +674,50 @@ class DDAgentWriterTest extends DDSpecification {
writer.close()
}

def "test backed up buffer"() {
setup:
System.gc() // seems to reduce false negatives
def minimalTrace = createMinimalTrace()
AtomicInteger backedUp = new AtomicInteger(0)
def serializer = Spy(new MsgPackStatefulSerializer())
def monitor = Mock(Monitor)
def api = Mock(DDAgentApi)
def arbitrator = Executors.newSingleThreadScheduledExecutor()
def dispatcher = new DispatchingDisruptor(2,
DDAgentWriter.toEventFactory(serializer),
api,
monitor,
Mock(DDAgentWriter))
dispatcher.start()

when:
api.sendSerializedTraces(_) >> {
// wait to be unblocked, by which time the test aims to have created a backlog
Thread.sleep(500)
DDAgentApi.Response.success(200)
}
// if the code below can't run within a second,
// the test will fail, but this behaviour is hard to provoke with latches
monitor.onBackedUpTraceBuffer() >> { backedUp.incrementAndGet() }
long txnId1 = dispatcher.beginTransaction()
serializer.reset(dispatcher.getTraceBuffer(txnId1))
serializer.serialize(minimalTrace)
dispatcher.commit(txnId1)
long txnId2 = dispatcher.beginTransaction()
serializer.reset(dispatcher.getTraceBuffer(txnId2))
serializer.serialize(minimalTrace)
dispatcher.commit(txnId2)
long txnId3 = dispatcher.beginTransaction()

then:
backedUp.get() >= 1 // reported the congestion
txnId3 == 2 // made progress eventually

cleanup:
dispatcher.close()
arbitrator.shutdownNow()
}

static int calculateSize(List<DDSpan> trace) {
def buffer = new ArrayBufferOutput()
def packer = MessagePack.DEFAULT_PACKER_CONFIG
Expand Down

0 comments on commit e2f46a9

Please sign in to comment.