kafka: Add poll span when DSM is enabled (#6969)
* kafka: Add poll span when DSM is enabled

* update poll method
piochelepiotr committed May 9, 2024
1 parent e1d7174 commit 6df14c1
Showing 4 changed files with 132 additions and 23 deletions.
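
In short: when Data Streams Monitoring (DSM) is enabled, KafkaConsumer.poll() is now wrapped in a kafka.poll span that records how many records the poll returned. Below is a minimal sketch of the application-side consumer loop this instrumentation applies to; the topic, group id, and broker address are illustrative assumptions, not part of this commit.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollLoopSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed broker address
    props.put("group.id", "example-group");           // assumed group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("example-topic")); // assumed topic
      while (!Thread.currentThread().isInterrupted()) {
        // With the Datadog agent attached and DSM enabled, this call is now
        // traced as a kafka.poll span tagged with kafka.records_count.
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println(record.value());
        }
      }
    }
  }
}

Note that polls returning zero records still produce a span with kafka.records_count set to 0, which is why the tests below add a filter for empty kafka.poll traces.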
@@ -1,6 +1,11 @@
package datadog.trace.instrumentation.kafka_clients;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_POLL;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -13,6 +18,8 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -130,18 +137,43 @@ public static void muzzleCheck(ConsumerRecord record) {
* KafkaConsumer class.
*/
  public static class RecordsAdvice {
    @Advice.OnMethodEnter(suppress = Throwable.class)
    public static AgentScope onEnter() {
      boolean dataStreamsEnabled;
      if (activeSpan() != null) {
        dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
      } else {
        dataStreamsEnabled = Config.get().isDataStreamsEnabled();
      }
      if (dataStreamsEnabled) {
        final AgentSpan span = startSpan(KAFKA_POLL);
        return activateSpan(span);
      }
      return null;
    }

    @Advice.OnMethodExit(suppress = Throwable.class)
    public static void captureGroup(
        @Advice.Enter final AgentScope scope,
        @Advice.This KafkaConsumer consumer,
        @Advice.Return ConsumerRecords records) {
      int recordsCount = 0;
      if (records != null) {
        KafkaConsumerInfo kafkaConsumerInfo =
            InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer);
        if (kafkaConsumerInfo != null) {
          InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class)
              .put(records, kafkaConsumerInfo);
        }
        recordsCount = records.count();
      }
      if (scope == null) {
        return;
      }
      AgentSpan span = scope.span();
      span.setTag(KAFKA_RECORDS_COUNT, recordsCount);
      span.finish();
      scope.close();
    }
  }
}
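
How the two advice methods compose: ByteBuddy passes the value returned by onEnter into the exit advice via the @Advice.Enter parameter. Roughly, the instrumented method behaves like the hand-written sketch below on the normal return path; this is a conceptual illustration, not the actual generated bytecode, and pollBody is a hypothetical stand-in for the original method body.

// Conceptual sketch only -- the agent inlines RecordsAdvice around KafkaConsumer.poll().
public ConsumerRecords<String, String> poll(Duration timeout) {
  AgentScope scope = RecordsAdvice.onEnter();        // starts a kafka.poll span if DSM is enabled, else null
  ConsumerRecords<String, String> records = pollBody(timeout); // original poll() logic (hypothetical name)
  // on normal return: propagate consumer info, tag the record count, finish the span
  RecordsAdvice.captureGroup(scope, this, records);
  return records;
}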
@@ -29,6 +29,8 @@ public class KafkaDecorator extends MessagingClientDecorator {
public static final CharSequence KAFKA_CONSUME =
UTF8BytesString.create(
SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA));

public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll");
public static final CharSequence KAFKA_PRODUCE =
UTF8BytesString.create(
SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA));
@@ -1,3 +1,5 @@
import datadog.trace.common.writer.ListWriter

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
@@ -53,13 +55,57 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

public static final LinkedHashMap<String, String> PRODUCER_PATHWAY_EDGE_TAGS

  // filter out kafka.poll traces: poll() is called in a loop, so the number of such traces is inconsistent across runs
  final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() {
    @Override
    boolean accept(List<DDSpan> trace) {
      return !(trace.size() == 1 &&
        trace.get(0).getResourceName().toString().equals("kafka.poll"))
    }
  }

  final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() {
    @Override
    boolean accept(List<DDSpan> trace) {
      return !(trace.size() == 1 &&
        trace.get(0).getResourceName().toString().equals("kafka.poll") &&
        trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0))
    }
  }

  // TraceID, start times & names change based on the configuration, so we override the sort to get consistent test results
  private static class SortKafkaTraces implements Comparator<List<DDSpan>> {
    @Override
    int compare(List<DDSpan> o1, List<DDSpan> o2) {
      return rootSpanTrace(o1) - rootSpanTrace(o2)
    }

    int rootSpanTrace(List<DDSpan> trace) {
      assert !trace.isEmpty()
      def rootSpan = trace.get(0).localRootSpan
      switch (rootSpan.operationName.toString()) {
        case "parent":
          return 3
        case "kafka.poll":
          return 2
        default:
          return 1
      }
    }
  }


  static {
    PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3)
    PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out")
    PRODUCER_PATHWAY_EDGE_TAGS.put("topic", SHARED_TOPIC)
    PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka")
  }

  def setup() {
    TEST_WRITER.setFilter(dropKafkaPoll)
  }

@Override
int version() {
0
@@ -124,9 +170,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
}

cleanup:
producer.close()
}
@@ -137,6 +180,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
if (isDataStreamsEnabled()) {
senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000)
}
TEST_WRITER.setFilter(dropEmptyKafkaPoll)
KafkaProducer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
String clusterId = ""
if (isDataStreamsEnabled()) {
@@ -203,28 +247,37 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
received.value() == greeting
received.key() == null

    int nTraces = isDataStreamsEnabled() ? 3 : 2
    int produceTraceIdx = nTraces - 1
    TEST_WRITER.waitForTraces(nTraces)
    def traces = (Arrays.asList(TEST_WRITER.toArray()) as List<List<DDSpan>>)
    Collections.sort(traces, new SortKafkaTraces())
    assertTraces(nTraces, new SortKafkaTraces()) {
      if (hasQueueSpan()) {
        trace(2) {
          consumerSpan(it, consumerProperties, span(1))
          queueSpan(it, trace(produceTraceIdx)[2])
        }
      } else {
        trace(1) {
          consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2])
        }
      }
      if (isDataStreamsEnabled()) {
        trace(1, {
          pollSpan(it)
        })
      }
      trace(3) {
        basicSpan(it, "parent")
        basicSpan(it, "producer callback", span(0))
        producerSpan(it, senderProps, span(0), false)
      }
    }

    def headers = received.headers()
    headers.iterator().hasNext()
    new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}"
    new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}"

if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
@@ -1069,6 +1122,27 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
}

  def pollSpan(
    TraceAssert trace,
    int recordCount = 1,
    DDSpan parentSpan = null,
    Range offset = 0..0,
    boolean tombstone = false,
    boolean distributedRootSpan = !hasQueueSpan()
  ) {
    trace.span {
      serviceName Config.get().getServiceName()
      operationName "kafka.poll"
      resourceName "kafka.poll"
      errored false
      measured false
      tags {
        "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount
        defaultTags(true)
      }
    }
  }

def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) {
kafkaTemplate.flush()
Producer<String, String> wrappedProducer = kafkaTemplate.getTheProducer()
@@ -14,6 +14,7 @@ public class InstrumentationTags {
public static final String PROCESSOR_NAME = "processor.name";
public static final String RECORD_QUEUE_TIME_MS = "record.queue_time_ms";
public static final String RECORD_END_TO_END_DURATION_MS = "record.e2e_duration_ms";
public static final String KAFKA_RECORDS_COUNT = "kafka.records_count";
public static final String TOMBSTONE = "tombstone";
public static final String AWS_AGENT = "aws.agent";
public static final String AWS_SERVICE = "aws.service";
