diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java index efe80cb1a38..31704720628 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java @@ -7,6 +7,7 @@ public class TracingIterable implements Iterable { private final Iterable delegate; private final String operationName; private final KafkaDecorator decorator; + private boolean firstIterator = true; public TracingIterable( final Iterable delegate, @@ -19,6 +20,17 @@ public TracingIterable( @Override public Iterator iterator() { - return new TracingIterator(delegate.iterator(), operationName, decorator); + Iterator it; + // We should only return one iterator with tracing. + // However, this is not thread-safe, but usually the first (hopefully only) traversal of + // ConsumerRecords is performed in the same thread that called poll() + if (this.firstIterator) { + it = new TracingIterator(delegate.iterator(), operationName, decorator); + firstIterator = false; + } else { + it = delegate.iterator(); + } + + return it; } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java index f7e12cb233a..cd8bddbbfd5 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java @@ -1,23 +1,19 @@ package datadog.trace.instrumentation.kafka_clients; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingList implements List { +public class TracingList extends TracingIterable implements List { private final List delegate; - private final String operationName; - private final KafkaDecorator decorator; public TracingList( final List delegate, final String operationName, final KafkaDecorator decorator) { + super(delegate, operationName, decorator); this.delegate = delegate; - this.operationName = operationName; - this.decorator = decorator; } @Override @@ -35,11 +31,6 @@ public boolean contains(final Object o) { return delegate.contains(o); } - @Override - public Iterator iterator() { - return new TracingIterator(delegate.iterator(), operationName, decorator); - } - @Override public Object[] toArray() { return delegate.toArray(); @@ -137,6 +128,10 @@ public ListIterator listIterator(final int index) { @Override public List subList(final int fromIndex, final int toIndex) { - return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator); + // TODO: the API for subList is not really good to instrument it in context of Kafka + // Consumer so we will not do that for now + // Kafka is essentially a sequential commit log. We should only enable tracing when traversing + // sequentially with an iterator + return delegate.subList(fromIndex, toIndex); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index b22b6c30e48..3ea04550aa3 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -1,6 +1,11 @@ import datadog.trace.agent.test.AgentTestRunner +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord -import org.junit.ClassRule +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition +import org.junit.Rule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate @@ -9,7 +14,6 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Shared import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -17,8 +21,7 @@ import java.util.concurrent.TimeUnit class KafkaClientTest extends AgentTestRunner { static final SHARED_TOPIC = "shared.topic" - @Shared - @ClassRule + @Rule KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) def "test kafka produce and consume"() { @@ -121,4 +124,83 @@ class KafkaClientTest extends AgentTestRunner { container?.stop() } + def "test records(TopicPartition) kafka consume"() { + setup: + + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + + then: + TEST_WRITER.waitForTraces(1) + def records = new LinkedBlockingQueue>() + def pollResult = KafkaTestUtils.getRecords(consumer) + + def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() + + def first = null + if (recs.hasNext()) { + first = recs.next() + } + + then: + recs.hasNext() == false + first.value() == greeting + first.key() == null + + assertTraces(2) { + trace(0, 1) { + // PRODUCER span 0 + span(0) { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $SHARED_TOPIC" + spanType "queue" + errored false + parent() + tags { + "component" "java-kafka" + "span.kind" "producer" + "kafka.partition" { it >= 0 } + defaultTags(true) + } + } + } + trace(1, 1) { + // CONSUMER span 0 + span(0) { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $SHARED_TOPIC" + spanType "queue" + errored false + childOf TEST_WRITER[0][0] + tags { + "component" "java-kafka" + "span.kind" "consumer" + "partition" { it >= 0 } + "offset" 0 + defaultTags(true) + } + } + } + } + + cleanup: + consumer.close() + producer.close() + + } + }