diff --git a/communication/src/main/java/datadog/communication/serialization/Mapper.java b/communication/src/main/java/datadog/communication/serialization/Mapper.java index 549391cbc33..f0c07681ccf 100644 --- a/communication/src/main/java/datadog/communication/serialization/Mapper.java +++ b/communication/src/main/java/datadog/communication/serialization/Mapper.java @@ -1,6 +1,8 @@ package datadog.communication.serialization; -// TODO @FunctionalInterface +@FunctionalInterface public interface Mapper { void map(T data, Writable packer); + + default void reset() {} } diff --git a/communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java b/communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java index 9d3d5154e74..107e1f94efd 100644 --- a/communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java +++ b/communication/src/main/java/datadog/communication/serialization/msgpack/MsgPackWriter.java @@ -90,6 +90,7 @@ public boolean format(T message, Mapper mapper) { // max capacity, then reject the message if (buffer.flush()) { try { + mapper.reset(); mapper.map(message, this); buffer.mark(); return true; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java index cd54b1441a0..0db05e55f3a 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/RemoteMapper.java @@ -36,8 +36,6 @@ public interface RemoteMapper extends Mapper>> { int messageBufferSize(); - void reset(); - String endpoint(); class NoopRemoteMapper implements RemoteMapper { @@ -55,9 +53,6 @@ public int messageBufferSize() { return 0; } - @Override - public void reset() {} - @Override public String endpoint() { return null; diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java index 6c9fca64112..2dd66fe78cc 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_4.java @@ -7,7 +7,6 @@ import datadog.communication.serialization.Writable; import datadog.communication.serialization.msgpack.MsgPackWriter; import datadog.trace.api.Config; -import datadog.trace.api.ProcessTags; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.writer.Payload; @@ -35,6 +34,7 @@ public final class TraceMapperV0_4 implements TraceMapper { : null; private final int size; + private boolean firstSpanWritten; public TraceMapperV0_4(int size) { this.size = size; @@ -47,21 +47,19 @@ public TraceMapperV0_4() { private static final class MetaWriter implements MetadataConsumer { private Writable writable; - private boolean firstSpanInChunk; - private boolean lastSpanInChunk; + private boolean firstSpanInTrace; + private boolean lastSpanInTrace; + private boolean firstSpanInPayload; MetaWriter withWritable(Writable writable) { this.writable = writable; return this; } - MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) { - this.firstSpanInChunk = firstSpanInChunk; - return this; - } - - MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) { - this.lastSpanInChunk = lastSpanInChunk; + MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) { + this.firstSpanInTrace = firstInTrace; + this.lastSpanInTrace = lastInTrace; + this.firstSpanInPayload = firstInPayload; return this; } @@ -70,9 +68,8 @@ public void accept(Metadata metadata) { if (TAG_CACHE != null) TAG_CACHE.recalibrate(); if (VALUE_CACHE != null) VALUE_CACHE.recalibrate(); - final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk; - final UTF8BytesString processTags = - firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null; + final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace; + final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null; int metaSize = metadata.getBaggage().size() + metadata.getTags().size() @@ -301,12 +298,12 @@ public void map(List> trace, final Writable writable) { span.processTagsAndBaggage( metaWriter .withWritable(writable) - .forFirstSpanInChunk(i == 0) - .forLastSpanInChunk(i == trace.size() - 1)); + .forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten)); if (!metaStruct.isEmpty()) { /* 13 */ metaStructWriter.withWritable(writable).write(metaStruct); } + firstSpanWritten = true; } } @@ -321,7 +318,9 @@ public int messageBufferSize() { } @Override - public void reset() {} + public void reset() { + firstSpanWritten = false; + } @Override public String endpoint() { diff --git a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java index c89582e71ec..4e564c0019c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/writer/ddagent/TraceMapperV0_5.java @@ -7,7 +7,6 @@ import datadog.communication.serialization.Writable; import datadog.communication.serialization.WritableFormatter; import datadog.communication.serialization.msgpack.MsgPackWriter; -import datadog.trace.api.ProcessTags; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.writer.Payload; @@ -33,6 +32,7 @@ public final class TraceMapperV0_5 implements TraceMapper { private final MetaWriter metaWriter = new MetaWriter(); private final int size; + private boolean firstSpanWritten; public TraceMapperV0_5() { this(2 << 20); @@ -79,10 +79,10 @@ public void map(final List> trace, final Writable writable span.processTagsAndBaggage( metaWriter .withWritable(writable) - .forFirstSpanInChunk(i == 0) - .forLastSpanInChunk(i == trace.size() - 1)); + .forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten)); /* 12 */ writeDictionaryEncoded(writable, span.getType()); + firstSpanWritten = true; } } @@ -115,6 +115,7 @@ public int messageBufferSize() { public void reset() { dictionary.reset(); encoding.clear(); + firstSpanWritten = false; } @Override @@ -181,29 +182,26 @@ private List toList() { private final class MetaWriter implements MetadataConsumer { private Writable writable; - private boolean firstSpanInChunk; - private boolean lastSpanInChunk; + private boolean firstSpanInTrace; + private boolean lastSpanInTrace; + private boolean firstSpanInPayload; MetaWriter withWritable(final Writable writable) { this.writable = writable; return this; } - MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) { - this.firstSpanInChunk = firstSpanInChunk; - return this; - } - - MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) { - this.lastSpanInChunk = lastSpanInChunk; + MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) { + this.firstSpanInTrace = firstInTrace; + this.lastSpanInTrace = lastInTrace; + this.firstSpanInPayload = firstInPayload; return this; } @Override public void accept(Metadata metadata) { - final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk; - final UTF8BytesString processTags = - firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null; + final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace; + final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null; int metaSize = metadata.getBaggage().size() + metadata.getTags().size() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy index 1b3a94b42a1..d70c78edb95 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/DDAgentWriterCombinedTest.groovy @@ -1,7 +1,10 @@ package datadog.trace.common.writer +import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED + import datadog.trace.api.DDSpanId import datadog.trace.api.DDTraceId +import datadog.trace.api.ProcessTags import datadog.trace.api.StatsDClient import datadog.trace.api.sampling.PrioritySampling import datadog.trace.api.datastreams.NoopPathwayContext @@ -194,6 +197,9 @@ class DDAgentWriterCombinedTest extends DDCoreSpecification { @Timeout(30) def "test default buffer size for #agentVersion"() { setup: + // disable process tags since they are only written on the first span and it will break the trace size estimation + injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false") + ProcessTags.reset() def api = Mock(DDAgentApi) def discovery = Mock(DDAgentFeaturesDiscovery) def writer = DDAgentWriter.builder() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy index 6f0d685668a..b655510d102 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV04PayloadTest.groovy @@ -55,6 +55,9 @@ class TraceMapperV04PayloadTest extends DDSpecification { if (!packer.format(trace, traceMapper)) { verifier.skipLargeTrace() tracesFitInBuffer = false + // in the real like the mapper is always reset each trace. + // here we need to force it when we fail since the buffer will be reset as well + traceMapper.reset() } } packer.flush() @@ -239,7 +242,7 @@ class TraceMapperV04PayloadTest extends DDSpecification { if (expectedTraces.isEmpty() && messageCount == 0) { return } - boolean hasProcessTags = false + int processTagsCount = 0 try { Payload payload = mapper.newPayload().withBody(messageCount, buffer) payload.writeTo(this) @@ -351,7 +354,7 @@ class TraceMapperV04PayloadTest extends DDSpecification { assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled()) assertEquals(0, k) assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue()) - hasProcessTags = true + processTagsCount++ } else { Object tag = expectedSpan.getTag(entry.getKey()) if (null != tag) { @@ -379,10 +382,10 @@ class TraceMapperV04PayloadTest extends DDSpecification { } catch (IOException e) { Assertions.fail(e.getMessage()) } finally { - assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled() mapper.reset() captured.position(0) captured.limit(captured.capacity()) + assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0) } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy index 30cf95e1353..bc39ad6640d 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/writer/ddagent/TraceMapperV05PayloadTest.groovy @@ -216,7 +216,7 @@ class TraceMapperV05PayloadTest extends DDSpecification { @Override void accept(int messageCount, ByteBuffer buffer) { - def hasProcessTags = false + def processTagsCount = 0 try { Payload payload = mapper.newPayload().withBody(messageCount, buffer) payload.writeTo(this) @@ -268,7 +268,7 @@ class TraceMapperV05PayloadTest extends DDSpecification { } else if(DDTags.ORIGIN_KEY.equals(entry.getKey())) { assertEquals(expectedSpan.getOrigin(), entry.getValue()) } else if (DDTags.PROCESS_TAGS.equals(entry.getKey())) { - hasProcessTags = true + processTagsCount++ assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled()) assertEquals(0, k) assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue()) @@ -338,7 +338,7 @@ class TraceMapperV05PayloadTest extends DDSpecification { } catch (IOException e) { Assert.fail(e.getMessage()) } finally { - assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled() + assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0) mapper.reset() captured.position(0) captured.limit(captured.capacity())