Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ public class TracingIterable implements Iterable<ConsumerRecord> {
private final Iterable<ConsumerRecord> delegate;
private final String operationName;
private final KafkaDecorator decorator;
private boolean firstIterator = true;

public TracingIterable(
final Iterable<ConsumerRecord> delegate,
Expand All @@ -19,6 +20,17 @@ public TracingIterable(

@Override
public Iterator<ConsumerRecord> iterator() {
return new TracingIterator(delegate.iterator(), operationName, decorator);
Iterator<ConsumerRecord> it;
// We should only return one iterator with tracing.
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// ConsumerRecords is performed in the same thread that called poll()
if (this.firstIterator) {
it = new TracingIterator(delegate.iterator(), operationName, decorator);
firstIterator = false;
} else {
it = delegate.iterator();
}

return it;
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,19 @@
package datadog.trace.instrumentation.kafka_clients;

import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TracingList implements List<ConsumerRecord> {
public class TracingList extends TracingIterable implements List<ConsumerRecord> {
private final List<ConsumerRecord> delegate;
private final String operationName;
private final KafkaDecorator decorator;

public TracingList(
final List<ConsumerRecord> delegate,
final String operationName,
final KafkaDecorator decorator) {
super(delegate, operationName, decorator);
this.delegate = delegate;
this.operationName = operationName;
this.decorator = decorator;
}

@Override
Expand All @@ -35,11 +31,6 @@ public boolean contains(final Object o) {
return delegate.contains(o);
}

@Override
public Iterator<ConsumerRecord> iterator() {
return new TracingIterator(delegate.iterator(), operationName, decorator);
}

@Override
public Object[] toArray() {
return delegate.toArray();
Expand Down Expand Up @@ -137,6 +128,10 @@ public ListIterator<ConsumerRecord> listIterator(final int index) {

@Override
public List<ConsumerRecord> subList(final int fromIndex, final int toIndex) {
return new TracingList(delegate.subList(fromIndex, toIndex), operationName, decorator);
// TODO: the API for subList is not really good to instrument it in context of Kafka
// Consumer so we will not do that for now
// Kafka is essentially a sequential commit log. We should only enable tracing when traversing
// sequentially with an iterator
return delegate.subList(fromIndex, toIndex);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import datadog.trace.agent.test.AgentTestRunner
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.junit.ClassRule
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.junit.Rule
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
Expand All @@ -9,16 +14,14 @@ import org.springframework.kafka.listener.MessageListener
import org.springframework.kafka.test.rule.KafkaEmbedded
import org.springframework.kafka.test.utils.ContainerTestUtils
import org.springframework.kafka.test.utils.KafkaTestUtils
import spock.lang.Shared

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

class KafkaClientTest extends AgentTestRunner {
static final SHARED_TOPIC = "shared.topic"

@Shared
@ClassRule
@Rule
KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SHARED_TOPIC)

def "test kafka produce and consume"() {
Expand Down Expand Up @@ -121,4 +124,83 @@ class KafkaClientTest extends AgentTestRunner {
container?.stop()
}

def "test records(TopicPartition) kafka consume"() {
setup:

// set up the Kafka consumer properties
def kafkaPartition = 0
def consumerProperties = KafkaTestUtils.consumerProps("sender", "false", embeddedKafka)
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
def consumer = new KafkaConsumer<String,String>(consumerProperties)

def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
def producer = new KafkaProducer(senderProps)

consumer.assign(Arrays.asList(new TopicPartition(SHARED_TOPIC, kafkaPartition)))

when:
def greeting = "Hello from MockConsumer!"
producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))

then:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you may be able to simplify this to something like:

    when:
    def greeting = "Hello from MockConsumer!"
    producer.send(new ProducerRecord<Integer, String>(SHARED_TOPIC, kafkaPartition, null, greeting))

    then:
    TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
    def pollResult = KafkaTestUtils.getRecords(consumer)
    def records = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()

    def first
    while (records.hasNext()) {
      first = records.next()
      break
    }
    records.hasNext() == false
    first.value() == greeting
    first.key() == null

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will change the WAIT_TRACES of producer to the then block:

On other note, I don't need to wait for traces when consuming?

Copy link
Copy Markdown
Contributor

@mar-kolya mar-kolya Jul 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertTraces(2) { waits to see exactly two traces. If I understand things correctly you need TEST_WRITER.waitForTraces(1) only once right before consuming - to make sure producer traces have been shipped - so two traces do not appear out of order at assertion time.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep makes sense. I have updated the PR as per your comments

TEST_WRITER.waitForTraces(1)
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
def pollResult = KafkaTestUtils.getRecords(consumer)

def recs = pollResult.records(new TopicPartition(SHARED_TOPIC, kafkaPartition)).iterator()

def first = null
if (recs.hasNext()) {
first = recs.next()
}

then:
recs.hasNext() == false
first.value() == greeting
first.key() == null

assertTraces(2) {
trace(0, 1) {
// PRODUCER span 0
span(0) {
serviceName "kafka"
operationName "kafka.produce"
resourceName "Produce Topic $SHARED_TOPIC"
spanType "queue"
errored false
parent()
tags {
"component" "java-kafka"
"span.kind" "producer"
"kafka.partition" { it >= 0 }
defaultTags(true)
}
}
}
trace(1, 1) {
// CONSUMER span 0
span(0) {
serviceName "kafka"
operationName "kafka.consume"
resourceName "Consume Topic $SHARED_TOPIC"
spanType "queue"
errored false
childOf TEST_WRITER[0][0]
tags {
"component" "java-kafka"
"span.kind" "consumer"
"partition" { it >= 0 }
"offset" 0
defaultTags(true)
}
}
}
}

cleanup:
consumer.close()
producer.close()

}

}