From 2627a452c0a43ad0c5f9454151684e9cb27b2cd6 Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Thu, 18 Jul 2019 23:09:33 +0100 Subject: [PATCH 1/2] unit tests for records(TopicPartition) method of kafka-clients instrumentation. introduce safeguards when instrumenting iterator() so that duplication of traces is not allowed. do not allow subList() to be instrumented --- LICENSE-3rdparty.csv | 1 + .../kafka_clients/TracingIterable.java | 14 ++- .../kafka_clients/TracingList.java | 23 ++--- .../src/test/groovy/KafkaClientTest.groovy | 93 +++++++++++++++++++ 4 files changed, 119 insertions(+), 12 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 2cd62aec64b..56bc2c41cde 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -9,3 +9,4 @@ logback.xml,net.logstash.logback,Apache-2.0, import (test),org.junit,EPL-1.0,Copyright © 2002-2017 JUnit. All Rights Reserved. import (test),org.assertj,Apache-2.0,Copyright 2012-2017 the original author or authors. import (test),org.mockito,MIT,Copyright (c) 2007 Mockito contributors +kafka-clients: method records(TopicPartition) partial+test,"Aspect Software, Inc.", Apache-2.0, "Copyright (C) Aspect Software, Inc." 
diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java index efe80cb1a38..31704720628 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingIterable.java @@ -7,6 +7,7 @@ public class TracingIterable implements Iterable { private final Iterable delegate; private final String operationName; private final KafkaDecorator decorator; + private boolean firstIterator = true; public TracingIterable( final Iterable delegate, @@ -19,6 +20,17 @@ public TracingIterable( @Override public Iterator iterator() { - return new TracingIterator(delegate.iterator(), operationName, decorator); + Iterator it; + // We should only return one iterator with tracing. 
+ // Note that this is not thread-safe; however, the first (hopefully only) traversal of + // ConsumerRecords is usually performed in the same thread that called poll() + if (this.firstIterator) { + it = new TracingIterator(delegate.iterator(), operationName, decorator); + firstIterator = false; + } else { + it = delegate.iterator(); + } + + return it; } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java index f7e12cb233a..2c6fb8c86f5 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java @@ -6,18 +6,15 @@ import java.util.ListIterator; import org.apache.kafka.clients.consumer.ConsumerRecord; -public class TracingList implements List { +public class TracingList extends TracingIterable implements List { private final List delegate; - private final String operationName; - private final KafkaDecorator decorator; public TracingList( final List delegate, final String operationName, final KafkaDecorator decorator) { + super(delegate, operationName, decorator); this.delegate = delegate; - this.operationName = operationName; - this.decorator = decorator; } @Override @@ -35,11 +32,6 @@ public boolean contains(final Object o) { return delegate.contains(o); } - @Override - public Iterator iterator() { - return new TracingIterator(delegate.iterator(), operationName, decorator); - } - @Override public Object[] toArray() { return delegate.toArray(); @@ -121,6 +113,11 @@ public int lastIndexOf(final Object o) { return delegate.lastIndexOf(o); } + @Override + public Iterator iterator() { + return super.iterator(); + } + @Override public ListIterator listIterator() { // 
TODO: the API for ListIterator is not really good to instrument it in context of Kafka @@ -137,6 +134,10 @@ public ListIterator listIterator(final int index) { @Override public List subList(final int fromIndex, final int toIndex) { - return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator); + // TODO: the API for subList is not really good to instrument it in context of Kafka + // Consumer so we will not do that for now + // Kafka is essentially a sequential commit log. We should only enable tracing when traversing + // sequentially with an iterator + return delegate.subList(fromIndex, toIndex); } } diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index b22b6c30e48..094e670b7bc 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -1,5 +1,10 @@ import datadog.trace.agent.test.AgentTestRunner +import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.clients.consumer.KafkaConsumer +import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.common.TopicPartition import org.junit.ClassRule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory @@ -119,6 +124,94 @@ class KafkaClientTest extends AgentTestRunner { cleanup: producerFactory.stop() container?.stop() + embeddedKafka.after() + } + + def "test records(TopicPartition) kafka consume"() { + setup: + embeddedKafka.before() + + // set up the Kafka consumer properties + def kafkaPartition = 0 + def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", 
embeddedKafka) + consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest") + def consumer = new KafkaConsumer(consumerProperties) + + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps) + + consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition))) + + when: + def greeting = "Hello from MockConsumer!" + producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) + TEST_WRITER.waitForTraces(1) + + then: + + def records = new LinkedBlockingQueue>() + def pollResult = KafkaTestUtils.getRecords(consumer) + + def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() + + def isFirst = true + while (recs.hasNext()) { + if(!isFirst) { + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + } + records.add(recs.next()) + isFirst = false + } + TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces + + then: + def first = records.poll(5, TimeUnit.SECONDS) + first.value() == greeting + first.key() == null + + assertTraces(2) { + trace(0, 1) { + // PRODUCER span 0 + span(0) { + serviceName "kafka" + operationName "kafka.produce" + resourceName "Produce Topic $SHARED_TOPIC" + spanType "queue" + errored false + parent() + tags { + "component" "java-kafka" + "span.kind" "producer" + "kafka.partition" { it >= 0 } + defaultTags(true) + } + } + } + trace(1, 1) { + // CONSUMER span 0 + span(0) { + serviceName "kafka" + operationName "kafka.consume" + resourceName "Consume Topic $SHARED_TOPIC" + spanType "queue" + errored false + childOf TEST_WRITER[0][0] + tags { + "component" "java-kafka" + "span.kind" "consumer" + "partition" { it >= 0 } + "offset" 0 + defaultTags(true) + } + } + } + } + + cleanup: + consumer.close() + producer.close() + embeddedKafka.after() + } } From ef5a006df2ccef508e2455f2e92baa6797977b3c Mon Sep 17 00:00:00 2001 From: Pedro Silva Date: Fri, 19 Jul 2019 16:45:34 
+0100 Subject: [PATCH 2/2] changes requested: remove iterator() implementation from TracingList. refactor unit tests to expect only one element to be consumed. Kafka embedded instance as a Rule --- LICENSE-3rdparty.csv | 1 - .../kafka_clients/TracingList.java | 6 ----- .../src/test/groovy/KafkaClientTest.groovy | 25 ++++++------------- 3 files changed, 7 insertions(+), 25 deletions(-) diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 56bc2c41cde..2cd62aec64b 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -9,4 +9,3 @@ logback.xml,net.logstash.logback,Apache-2.0, import (test),org.junit,EPL-1.0,Copyright © 2002-2017 JUnit. All Rights Reserved. import (test),org.assertj,Apache-2.0,Copyright 2012-2017 the original author or authors. import (test),org.mockito,MIT,Copyright (c) 2007 Mockito contributors -kafka-clients: method records(TopicPartition) partial+test,"Aspect Software, Inc.", Apache-2.0, "Copyright (C) Aspect Software, Inc." diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java index 2c6fb8c86f5..cd8bddbbfd5 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TracingList.java @@ -1,7 +1,6 @@ package datadog.trace.instrumentation.kafka_clients; import java.util.Collection; -import java.util.Iterator; import java.util.List; import java.util.ListIterator; import org.apache.kafka.clients.consumer.ConsumerRecord; @@ -113,11 +112,6 @@ public int lastIndexOf(final Object o) { return delegate.lastIndexOf(o); } - @Override - public Iterator iterator() { - return super.iterator(); - } - @Override public ListIterator listIterator() { // TODO: 
the API for ListIterator is not really good to instrument it in context of Kafka diff --git a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy index 094e670b7bc..3ea04550aa3 100644 --- a/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy +++ b/dd-java-agent/instrumentation/kafka-clients-0.11/src/test/groovy/KafkaClientTest.groovy @@ -5,7 +5,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.KafkaProducer import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.TopicPartition -import org.junit.ClassRule +import org.junit.Rule import org.springframework.kafka.core.DefaultKafkaConsumerFactory import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.KafkaTemplate @@ -14,7 +14,6 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils -import spock.lang.Shared import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.TimeUnit @@ -22,8 +21,7 @@ import java.util.concurrent.TimeUnit class KafkaClientTest extends AgentTestRunner { static final SHARED_TOPIC = "shared.topic" - @Shared - @ClassRule + @Rule KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC) def "test kafka produce and consume"() { @@ -124,12 +122,10 @@ class KafkaClientTest extends AgentTestRunner { cleanup: producerFactory.stop() container?.stop() - embeddedKafka.after() } def "test records(TopicPartition) kafka consume"() { setup: - embeddedKafka.before() // set up the Kafka consumer properties def kafkaPartition = 0 @@ -145,27 +141,21 @@ class KafkaClientTest extends AgentTestRunner { when: def greeting 
= "Hello from MockConsumer!" producer.send(new ProducerRecord(SHARED_TOPIC, kafkaPartition, null, greeting)) - TEST_WRITER.waitForTraces(1) then: - + TEST_WRITER.waitForTraces(1) def records = new LinkedBlockingQueue>() def pollResult = KafkaTestUtils.getRecords(consumer) def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator() - def isFirst = true - while (recs.hasNext()) { - if(!isFirst) { - TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces - } - records.add(recs.next()) - isFirst = false + def first = null + if (recs.hasNext()) { + first = recs.next() } - TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces then: - def first = records.poll(5, TimeUnit.SECONDS) + recs.hasNext() == false first.value() == greeting first.key() == null @@ -210,7 +200,6 @@ class KafkaClientTest extends AgentTestRunner { cleanup: consumer.close() producer.close() - embeddedKafka.after() }