diff --git a/dd-trace-core/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
index d3e01fa36f5..8893c6a4d99 100644
--- a/dd-trace-core/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
+++ b/dd-trace-core/src/test/groovy/datadog/trace/api/writer/DDAgentWriterTest.groovy
@@ -3,7 +3,8 @@ package datadog.trace.api.writer
 import com.timgroup.statsd.StatsDClient
 import datadog.trace.api.sampling.PrioritySampling
 import datadog.trace.common.writer.DDAgentWriter
-import datadog.trace.common.writer.ddagent.DDAgentApi
+import datadog.trace.common.writer.ddagent.DDAgentApi
+import datadog.trace.common.writer.ddagent.DispatchingDisruptor
 import datadog.trace.common.writer.ddagent.Monitor
 import datadog.trace.common.writer.ddagent.MsgPackStatefulSerializer
 import datadog.trace.common.writer.ddagent.TraceBuffer
@@ -18,6 +19,7 @@ import org.msgpack.core.buffer.ArrayBufferOutput
 import spock.lang.Retry
 import spock.lang.Timeout
 
+import java.util.concurrent.Executors
 import java.util.concurrent.Phaser
 import java.util.concurrent.Semaphore
 import java.util.concurrent.TimeUnit
@@ -672,6 +674,50 @@ class DDAgentWriterTest extends DDSpecification {
     writer.close()
   }
 
+  def "test backed up buffer"() {
+    setup:
+    System.gc() // seems to reduce false negatives
+    def minimalTrace = createMinimalTrace()
+    AtomicInteger backedUp = new AtomicInteger(0)
+    def serializer = Spy(new MsgPackStatefulSerializer())
+    def monitor = Mock(Monitor)
+    def api = Mock(DDAgentApi)
+    def arbitrator = Executors.newSingleThreadScheduledExecutor()
+    def dispatcher = new DispatchingDisruptor(2,
+      DDAgentWriter.toEventFactory(serializer),
+      api,
+      monitor,
+      Mock(DDAgentWriter))
+    dispatcher.start()
+
+    when:
+    api.sendSerializedTraces(_) >> {
+      // wait to be unblocked, by which time the test aims to have created a backlog
+      Thread.sleep(500)
+      DDAgentApi.Response.success(200)
+    }
+    // if the code below can't run within a second,
+    // the test will fail, but this behaviour is hard to provoke with latches
+    monitor.onBackedUpTraceBuffer() >> { backedUp.incrementAndGet() }
+    long txnId1 = dispatcher.beginTransaction()
+    serializer.reset(dispatcher.getTraceBuffer(txnId1))
+    serializer.serialize(minimalTrace)
+    dispatcher.commit(txnId1)
+    long txnId2 = dispatcher.beginTransaction()
+    serializer.reset(dispatcher.getTraceBuffer(txnId2))
+    serializer.serialize(minimalTrace)
+    dispatcher.commit(txnId2)
+    long txnId3 = dispatcher.beginTransaction()
+
+    then:
+    backedUp.get() >= 1 // reported the congestion
+    txnId3 == 2 // made progress eventually
+
+    cleanup:
+    dispatcher.close()
+    arbitrator.shutdownNow()
+  }
+
   static int calculateSize(List trace) {
     def buffer = new ArrayBufferOutput()
     def packer = MessagePack.DEFAULT_PACKER_CONFIG