Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package datadog.communication.serialization;

// TODO @FunctionalInterface
@FunctionalInterface
public interface Mapper<T> {
void map(T data, Writable packer);

default void reset() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ public <T> boolean format(T message, Mapper<T> mapper) {
// max capacity, then reject the message
if (buffer.flush()) {
try {
mapper.reset();
mapper.map(message, this);
buffer.mark();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ public interface RemoteMapper extends Mapper<List<? extends CoreSpan<?>>> {

int messageBufferSize();

void reset();

String endpoint();

class NoopRemoteMapper implements RemoteMapper {
Expand All @@ -55,9 +53,6 @@ public int messageBufferSize() {
return 0;
}

@Override
public void reset() {}

@Override
public String endpoint() {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import datadog.communication.serialization.Writable;
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.Config;
import datadog.trace.api.ProcessTags;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.writer.Payload;
Expand Down Expand Up @@ -35,6 +34,7 @@ public final class TraceMapperV0_4 implements TraceMapper {
: null;

private final int size;
private boolean firstSpanWritten;

public TraceMapperV0_4(int size) {
this.size = size;
Expand All @@ -47,21 +47,19 @@ public TraceMapperV0_4() {
private static final class MetaWriter implements MetadataConsumer {

private Writable writable;
private boolean firstSpanInChunk;
private boolean lastSpanInChunk;
private boolean firstSpanInTrace;
private boolean lastSpanInTrace;
private boolean firstSpanInPayload;

MetaWriter withWritable(Writable writable) {
this.writable = writable;
return this;
}

MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
this.firstSpanInChunk = firstSpanInChunk;
return this;
}

MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
this.lastSpanInChunk = lastSpanInChunk;
MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) {
this.firstSpanInTrace = firstInTrace;
this.lastSpanInTrace = lastInTrace;
this.firstSpanInPayload = firstInPayload;
return this;
}

Expand All @@ -70,9 +68,8 @@ public void accept(Metadata metadata) {
if (TAG_CACHE != null) TAG_CACHE.recalibrate();
if (VALUE_CACHE != null) VALUE_CACHE.recalibrate();

final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
final UTF8BytesString processTags =
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
int metaSize =
metadata.getBaggage().size()
+ metadata.getTags().size()
Expand Down Expand Up @@ -301,12 +298,12 @@ public void map(List<? extends CoreSpan<?>> trace, final Writable writable) {
span.processTagsAndBaggage(
metaWriter
.withWritable(writable)
.forFirstSpanInChunk(i == 0)
.forLastSpanInChunk(i == trace.size() - 1));
.forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten));
if (!metaStruct.isEmpty()) {
/* 13 */
metaStructWriter.withWritable(writable).write(metaStruct);
}
firstSpanWritten = true;
}
}

Expand All @@ -321,7 +318,9 @@ public int messageBufferSize() {
}

@Override
public void reset() {}
public void reset() {
firstSpanWritten = false;
}

@Override
public String endpoint() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import datadog.communication.serialization.Writable;
import datadog.communication.serialization.WritableFormatter;
import datadog.communication.serialization.msgpack.MsgPackWriter;
import datadog.trace.api.ProcessTags;
import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags;
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.common.writer.Payload;
Expand All @@ -33,6 +32,7 @@ public final class TraceMapperV0_5 implements TraceMapper {

private final MetaWriter metaWriter = new MetaWriter();
private final int size;
private boolean firstSpanWritten;

public TraceMapperV0_5() {
this(2 << 20);
Expand Down Expand Up @@ -79,10 +79,10 @@ public void map(final List<? extends CoreSpan<?>> trace, final Writable writable
span.processTagsAndBaggage(
metaWriter
.withWritable(writable)
.forFirstSpanInChunk(i == 0)
.forLastSpanInChunk(i == trace.size() - 1));
.forSpan(i == 0, i == trace.size() - 1, !firstSpanWritten));
/* 12 */
writeDictionaryEncoded(writable, span.getType());
firstSpanWritten = true;
}
}

Expand Down Expand Up @@ -115,6 +115,7 @@ public int messageBufferSize() {
public void reset() {
dictionary.reset();
encoding.clear();
firstSpanWritten = false;
}

@Override
Expand Down Expand Up @@ -181,29 +182,26 @@ private List<ByteBuffer> toList() {
private final class MetaWriter implements MetadataConsumer {

private Writable writable;
private boolean firstSpanInChunk;
private boolean lastSpanInChunk;
private boolean firstSpanInTrace;
private boolean lastSpanInTrace;
private boolean firstSpanInPayload;

MetaWriter withWritable(final Writable writable) {
this.writable = writable;
return this;
}

MetaWriter forFirstSpanInChunk(final boolean firstSpanInChunk) {
this.firstSpanInChunk = firstSpanInChunk;
return this;
}

MetaWriter forLastSpanInChunk(final boolean lastSpanInChunk) {
this.lastSpanInChunk = lastSpanInChunk;
MetaWriter forSpan(boolean firstInTrace, boolean lastInTrace, boolean firstInPayload) {
this.firstSpanInTrace = firstInTrace;
this.lastSpanInTrace = lastInTrace;
this.firstSpanInPayload = firstInPayload;
return this;
}

@Override
public void accept(Metadata metadata) {
final boolean writeSamplingPriority = firstSpanInChunk || lastSpanInChunk;
final UTF8BytesString processTags =
firstSpanInChunk ? ProcessTags.getTagsForSerialization() : null;
final boolean writeSamplingPriority = firstSpanInTrace || lastSpanInTrace;
final UTF8BytesString processTags = firstSpanInPayload ? metadata.processTags() : null;
int metaSize =
metadata.getBaggage().size()
+ metadata.getTags().size()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package datadog.trace.common.writer

import static datadog.trace.api.config.GeneralConfig.EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED

import datadog.trace.api.DDSpanId
import datadog.trace.api.DDTraceId
import datadog.trace.api.ProcessTags
import datadog.trace.api.StatsDClient
import datadog.trace.api.sampling.PrioritySampling
import datadog.trace.api.datastreams.NoopPathwayContext
Expand Down Expand Up @@ -194,6 +197,9 @@ class DDAgentWriterCombinedTest extends DDCoreSpecification {
@Timeout(30)
def "test default buffer size for #agentVersion"() {
setup:
// disable process tags since they are only written on the first span and it will break the trace size estimation
injectSysConfig(EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED, "false")
ProcessTags.reset()
def api = Mock(DDAgentApi)
def discovery = Mock(DDAgentFeaturesDiscovery)
def writer = DDAgentWriter.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ class TraceMapperV04PayloadTest extends DDSpecification {
if (!packer.format(trace, traceMapper)) {
verifier.skipLargeTrace()
tracesFitInBuffer = false
// in the real like the mapper is always reset each trace.
// here we need to force it when we fail since the buffer will be reset as well
traceMapper.reset()
}
}
packer.flush()
Expand Down Expand Up @@ -239,7 +242,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
if (expectedTraces.isEmpty() && messageCount == 0) {
return
}
boolean hasProcessTags = false
int processTagsCount = 0
try {
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
payload.writeTo(this)
Expand Down Expand Up @@ -351,7 +354,7 @@ class TraceMapperV04PayloadTest extends DDSpecification {
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
assertEquals(0, k)
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
hasProcessTags = true
processTagsCount++
} else {
Object tag = expectedSpan.getTag(entry.getKey())
if (null != tag) {
Expand Down Expand Up @@ -379,10 +382,10 @@ class TraceMapperV04PayloadTest extends DDSpecification {
} catch (IOException e) {
Assertions.fail(e.getMessage())
} finally {
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
mapper.reset()
captured.position(0)
captured.limit(captured.capacity())
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {

@Override
void accept(int messageCount, ByteBuffer buffer) {
def hasProcessTags = false
def processTagsCount = 0
try {
Payload payload = mapper.newPayload().withBody(messageCount, buffer)
payload.writeTo(this)
Expand Down Expand Up @@ -268,7 +268,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
} else if(DDTags.ORIGIN_KEY.equals(entry.getKey())) {
assertEquals(expectedSpan.getOrigin(), entry.getValue())
} else if (DDTags.PROCESS_TAGS.equals(entry.getKey())) {
hasProcessTags = true
processTagsCount++
assertTrue(Config.get().isExperimentalPropagateProcessTagsEnabled())
assertEquals(0, k)
assertEquals(ProcessTags.tagsForSerialization.toString(), entry.getValue())
Expand Down Expand Up @@ -338,7 +338,7 @@ class TraceMapperV05PayloadTest extends DDSpecification {
} catch (IOException e) {
Assert.fail(e.getMessage())
} finally {
assert hasProcessTags == Config.get().isExperimentalPropagateProcessTagsEnabled()
assert processTagsCount == (Config.get().isExperimentalPropagateProcessTagsEnabled() ? 1 : 0)
mapper.reset()
captured.position(0)
captured.limit(captured.capacity())
Expand Down