Add Kafka poll span when DSM is enabled #6969

Merged · 5 commits · May 9, 2024
@@ -1,6 +1,11 @@
package datadog.trace.instrumentation.kafka_clients;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT;
import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_POLL;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
@@ -13,6 +18,8 @@
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -130,18 +137,43 @@ public static void muzzleCheck(ConsumerRecord record) {
* KafkaConsumer class.
*/
public static class RecordsAdvice {
  @Advice.OnMethodEnter(suppress = Throwable.class)
  public static AgentScope onEnter() {
    // Use the active trace's config when available; otherwise fall back to the global config.
    boolean dataStreamsEnabled;
    if (activeSpan() != null) {
      dataStreamsEnabled = activeSpan().traceConfig().isDataStreamsEnabled();
    } else {
      dataStreamsEnabled = Config.get().isDataStreamsEnabled();
    }
    if (dataStreamsEnabled) {
      final AgentSpan span = startSpan(KAFKA_POLL);
      return activateSpan(span);
    }
    return null;
  }

  @Advice.OnMethodExit(suppress = Throwable.class)
  public static void captureGroup(
      @Advice.Enter final AgentScope scope,
      @Advice.This KafkaConsumer consumer,
      @Advice.Return ConsumerRecords records) {
    // Attach the consumer info to the records even when no poll span was started.
    int recordsCount = 0;
    if (records != null) {
      KafkaConsumerInfo kafkaConsumerInfo =
          InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer);
      if (kafkaConsumerInfo != null) {
        InstrumentationContext.get(ConsumerRecords.class, KafkaConsumerInfo.class)
            .put(records, kafkaConsumerInfo);
      }
      recordsCount = records.count();
    }
    if (scope == null) {
      return;
    }
    AgentSpan span = scope.span();
    span.setTag(KAFKA_RECORDS_COUNT, recordsCount);
    span.finish();
    scope.close();
  }
}
}

Contributor: Maybe do this scope null check once at the beginning of the method?

Contributor (author): I updated the code a bit; the hard part is that I want to attach the Kafka consumer info to the records even if the scope is null.
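If you have not seen paired ByteBuddy advice before, the wiring can look magical: whatever onEnter() returns is captured by the generated bytecode and handed to the exit advice through @Advice.Enter. Here is a minimal, self-contained sketch of that pattern (all names hypothetical, not taken from this PR):

    import net.bytebuddy.asm.Advice;

    class EnterExitSketch {
      // Runs before the instrumented method; the return value is captured by ByteBuddy.
      @Advice.OnMethodEnter(suppress = Throwable.class)
      static String onEnter() {
        // Returning null signals "nothing started", like the scope above when DSM is off.
        return System.getProperty("sketch.enabled") != null ? "started" : null;
      }

      // Runs after the instrumented method; receives the enter advice's return value.
      @Advice.OnMethodExit(suppress = Throwable.class)
      static void onExit(@Advice.Enter String token) {
        if (token == null) {
          return; // mirrors the scope null check above
        }
        System.out.println("finished: " + token);
      }
    }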
@@ -29,6 +29,8 @@ public class KafkaDecorator extends MessagingClientDecorator {
  public static final CharSequence KAFKA_CONSUME =
      UTF8BytesString.create(
          SpanNaming.instance().namingSchema().messaging().inboundOperation(KAFKA));

  public static final CharSequence KAFKA_POLL = UTF8BytesString.create("kafka.poll");
  public static final CharSequence KAFKA_PRODUCE =
      UTF8BytesString.create(
          SpanNaming.instance().namingSchema().messaging().outboundOperation(KAFKA));
@@ -1,3 +1,5 @@
import datadog.trace.common.writer.ListWriter

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope
@@ -53,13 +55,57 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

public static final LinkedHashMap<String, String> PRODUCER_PATHWAY_EDGE_TAGS

// filter out Kafka poll, since the function is called in a loop, giving inconsistent results
final ListWriter.Filter dropKafkaPoll = new ListWriter.Filter() {
  @Override
  boolean accept(List<DDSpan> trace) {
    return !(trace.size() == 1 &&
      trace.get(0).getResourceName().toString().equals("kafka.poll"))
  }
}

final ListWriter.Filter dropEmptyKafkaPoll = new ListWriter.Filter() {
  @Override
  boolean accept(List<DDSpan> trace) {
    return !(trace.size() == 1 &&
      trace.get(0).getResourceName().toString().equals("kafka.poll") &&
      trace.get(0).getTag(InstrumentationTags.KAFKA_RECORDS_COUNT).equals(0))
  }
}

// Trace IDs, start times & names change based on the configuration, so we override the sort to get consistent test results
private static class SortKafkaTraces implements Comparator<List<DDSpan>> {
  @Override
  int compare(List<DDSpan> o1, List<DDSpan> o2) {
    return rootSpanTrace(o1) - rootSpanTrace(o2)
  }

  int rootSpanTrace(List<DDSpan> trace) {
    assert !trace.isEmpty()
    def rootSpan = trace.get(0).localRootSpan
    switch (rootSpan.operationName.toString()) {
      case "parent":
        return 3
      case "kafka.poll":
        return 2
      default:
        return 1
    }
  }
}

Contributor: Are the usual test sorting mechanisms not sufficient?

Contributor (author): Unfortunately, it doesn't look like it 😢
- Trace IDs are inconsistent because poll can start before produce (and finish after).
- Names don't work because the Kafka consume span has different names depending on the configs.
- Start times don't work either, for the same reason as trace IDs.
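As a rough illustration of what the comparator buys us (a hypothetical sketch, not part of the PR, with plain strings standing in for traces): sorting by the root-operation bucket always yields consumer trace(s) first, then the kafka.poll trace, then the parent trace, regardless of arrival order.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    public class SortSketch {
      // Same bucketing as rootSpanTrace above.
      static int bucket(String rootOperation) {
        switch (rootOperation) {
          case "parent": return 3;
          case "kafka.poll": return 2;
          default: return 1;
        }
      }

      public static void main(String[] args) {
        List<String> roots = new ArrayList<>(Arrays.asList("parent", "kafka.poll", "kafka.consume"));
        roots.sort(Comparator.comparingInt(SortSketch::bucket));
        System.out.println(roots); // prints [kafka.consume, kafka.poll, parent]
      }
    }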


static {
  PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3)
  PRODUCER_PATHWAY_EDGE_TAGS.put("direction", "out")
  PRODUCER_PATHWAY_EDGE_TAGS.put("topic", SHARED_TOPIC)
  PRODUCER_PATHWAY_EDGE_TAGS.put("type", "kafka")
}

def setup() {
  TEST_WRITER.setFilter(dropKafkaPoll)
}

@Override
int version() {
0
@@ -124,9 +170,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
}

cleanup:
producer.close()
}
@@ -137,6 +180,7 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
if (isDataStreamsEnabled()) {
  senderProps.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, 1000)
}
TEST_WRITER.setFilter(dropEmptyKafkaPoll)
KafkaProducer<String, String> producer = new KafkaProducer<>(senderProps, new StringSerializer(), new StringSerializer())
String clusterId = ""
if (isDataStreamsEnabled()) {
@@ -203,28 +247,37 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
received.value() == greeting
received.key() == null

int nTraces = isDataStreamsEnabled() ? 3 : 2
int produceTraceIdx = nTraces - 1
TEST_WRITER.waitForTraces(nTraces)
def traces = (Arrays.asList(TEST_WRITER.toArray()) as List<List<DDSpan>>)
Collections.sort(traces, new SortKafkaTraces())
assertTraces(nTraces, new SortKafkaTraces()) {
  if (hasQueueSpan()) {
    trace(2) {
      consumerSpan(it, consumerProperties, span(1))
      queueSpan(it, trace(produceTraceIdx)[2])
    }
  } else {
    trace(1) {
      consumerSpan(it, consumerProperties, trace(produceTraceIdx)[2])
    }
  }
  if (isDataStreamsEnabled()) {
    trace(1, {
      pollSpan(it)
    })
  }
  trace(3) {
    basicSpan(it, "parent")
    basicSpan(it, "producer callback", span(0))
    producerSpan(it, senderProps, span(0), false)
  }
}

def headers = received.headers()
headers.iterator().hasNext()
new String(headers.headers("x-datadog-trace-id").iterator().next().value()) == "${traces[produceTraceIdx][2].traceId}"
new String(headers.headers("x-datadog-parent-id").iterator().next().value()) == "${traces[produceTraceIdx][2].spanId}"

if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
@@ -1069,6 +1122,27 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
}

def pollSpan(
  TraceAssert trace,
  int recordCount = 1,
  DDSpan parentSpan = null,
  Range offset = 0..0,
  boolean tombstone = false,
  boolean distributedRootSpan = !hasQueueSpan()
) {
  trace.span {
    serviceName Config.get().getServiceName()
    operationName "kafka.poll"
    resourceName "kafka.poll"
    errored false
    measured false
    tags {
      "$InstrumentationTags.KAFKA_RECORDS_COUNT" recordCount
      defaultTags(true)
    }
  }
}

def waitForKafkaMetadataUpdate(KafkaTemplate kafkaTemplate) {
kafkaTemplate.flush()
Producer<String, String> wrappedProducer = kafkaTemplate.getTheProducer()
@@ -14,6 +14,7 @@ public class InstrumentationTags {
public static final String PROCESSOR_NAME = "processor.name";
public static final String RECORD_QUEUE_TIME_MS = "record.queue_time_ms";
public static final String RECORD_END_TO_END_DURATION_MS = "record.e2e_duration_ms";
public static final String KAFKA_RECORDS_COUNT = "kafka.records_count";
public static final String TOMBSTONE = "tombstone";
public static final String AWS_AGENT = "aws.agent";
public static final String AWS_SERVICE = "aws.service";