From 8cd94abce67fe648a35395e0d51a26ea81cacc85 Mon Sep 17 00:00:00 2001 From: Richard Startin Date: Fri, 22 May 2020 11:20:19 +0100 Subject: [PATCH 1/2] allow user to disable kafka time in queue tag --- .../instrumentation/kafka_clients/KafkaDecorator.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index 78c2d3c67e4..fbfb0814233 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -5,6 +5,7 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS; +import datadog.trace.api.Config; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -15,6 +16,7 @@ import org.apache.kafka.clients.producer.ProducerRecord; public abstract class KafkaDecorator extends ClientDecorator { + public static final KafkaDecorator PRODUCER_DECORATE = new KafkaDecorator() { @Override @@ -65,9 +67,11 @@ public void onConsume(final AgentSpan span, final ConsumerRecord record) { span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); span.setTag(PARTITION, record.partition()); span.setTag(OFFSET, record.offset()); - final long produceTime = record.timestamp(); - final long consumeTime = TimeUnit.NANOSECONDS.toMillis(span.getStartTime()); - span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime)); + if (Config.get().isKafkaClientPropagationEnabled()) { + final long produceTime = record.timestamp(); + final long consumeTime = TimeUnit.NANOSECONDS.toMillis(span.getStartTime()); + span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime)); + } } } From 2a04e739727700018f45684c398a9580286c1529 Mon Sep 17 00:00:00 2001 From: Richard Startin Date: Fri, 22 May 2020 18:54:49 +0100 Subject: [PATCH 2/2] consult ConsumerRecord.timestampType to detect whether there is no timestamp --- .../trace/instrumentation/kafka_clients/KafkaDecorator.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index fbfb0814233..a190a7743d0 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -5,7 +5,6 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.PARTITION; import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.RECORD_QUEUE_TIME_MS; -import datadog.trace.api.Config; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -14,6 +13,7 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.TimestampType; public abstract class KafkaDecorator extends ClientDecorator { @@ -67,7 +67,8 @@ public void onConsume(final AgentSpan span, final ConsumerRecord record) { span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); span.setTag(PARTITION, record.partition()); span.setTag(OFFSET, record.offset()); - if (Config.get().isKafkaClientPropagationEnabled()) { + // don't record a duration if the message was sent from an old Kafka client + if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { final long produceTime = record.timestamp(); final long consumeTime = TimeUnit.NANOSECONDS.toMillis(span.getStartTime()); span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime));