diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java index 78c2d3c67e4..a190a7743d0 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaDecorator.java @@ -13,8 +13,10 @@ import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.record.TimestampType; public abstract class KafkaDecorator extends ClientDecorator { + public static final KafkaDecorator PRODUCER_DECORATE = new KafkaDecorator() { @Override @@ -65,9 +67,12 @@ public void onConsume(final AgentSpan span, final ConsumerRecord record) { span.setTag(DDTags.RESOURCE_NAME, "Consume Topic " + topic); span.setTag(PARTITION, record.partition()); span.setTag(OFFSET, record.offset()); - final long produceTime = record.timestamp(); - final long consumeTime = TimeUnit.NANOSECONDS.toMillis(span.getStartTime()); - span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime)); + // don't record a duration if the message was sent from an old Kafka client + if (record.timestampType() != TimestampType.NO_TIMESTAMP_TYPE) { + final long produceTime = record.timestamp(); + final long consumeTime = TimeUnit.NANOSECONDS.toMillis(span.getStartTime()); + span.setTag(RECORD_QUEUE_TIME_MS, Math.max(0L, consumeTime - produceTime)); + } } }