4 changes: 4 additions & 0 deletions .gitignore
@@ -46,6 +46,10 @@ out/
######################
.vscode

# Cursor #
##########
.cursor

# Others #
##########
/logs/*
@@ -19,6 +19,9 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
@SuppressWarnings('UnusedPrivateField')
private final Set<DataStreamsTags> backlogs = []

@SuppressWarnings('UnusedPrivateField')
private final List<StatsBucket.SchemaKey> schemaRegistryUsages = []

private final Set<String> serviceNameOverrides = []

@Override
@@ -33,6 +36,11 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
this.@backlogs.add(backlog.getKey())
}
}
if (bucket.schemaRegistryUsages != null) {
for (Map.Entry<StatsBucket.SchemaKey, Long> usage : bucket.schemaRegistryUsages) {
this.@schemaRegistryUsages.add(usage.getKey())
}
}
}
}

@@ -52,10 +60,15 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
Collections.unmodifiableList(new ArrayList<>(this.@backlogs))
}

synchronized List<StatsBucket.SchemaKey> getSchemaRegistryUsages() {
Collections.unmodifiableList(new ArrayList<>(this.@schemaRegistryUsages))
}

synchronized void clear() {
this.@payloads.clear()
this.@groups.clear()
this.@backlogs.clear()
this.@schemaRegistryUsages.clear()
}

void waitForPayloads(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
@@ -70,6 +83,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter {
waitFor(count, timeout, this.@backlogs)
}

void waitForSchemaRegistryUsages(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) {
waitFor(count, timeout, this.@schemaRegistryUsages)
}

private static void waitFor(int count, long timeout, Collection collection) {
long deadline = System.currentTimeMillis() + timeout
while (System.currentTimeMillis() < deadline) {
@@ -0,0 +1,26 @@
apply from: "$rootDir/gradle/java.gradle"

muzzle {
pass {
group = "io.confluent"
module = "kafka-schema-registry-client"
versions = "[7.0.0,)"
assertInverse = true
}
}

dependencies {
compileOnly project(':dd-java-agent:instrumentation:kafka:kafka-common')
compileOnly group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.0.0'
compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.0.0'
compileOnly group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.0.0'
compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0'

testImplementation project(':dd-java-agent:instrumentation:kafka:kafka-common')
testImplementation group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.5.2'
testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.5.2'
testImplementation group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.5.1'
testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.5.0'
testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.0'
}

@@ -0,0 +1,115 @@
package datadog.trace.instrumentation.confluentschemaregistry;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.Instrumenter.MethodTransformer;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.common.serialization.Deserializer;

/**
* Instruments Confluent Schema Registry deserializers (Avro, Protobuf, and JSON) to capture
* deserialization operations.
*/
@AutoService(InstrumenterModule.class)
public class KafkaDeserializerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

public KafkaDeserializerInstrumentation() {
super("confluent-schema-registry", "kafka");
}

@Override
public String[] knownMatchingTypes() {
return new String[] {
"io.confluent.kafka.serializers.KafkaAvroDeserializer",
"io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer",
"io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer"
};
}

@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
packageName + ".SchemaIdExtractor"
};
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>();
contextStores.put("org.apache.kafka.common.serialization.Deserializer", "java.lang.Boolean");
return contextStores;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
// Instrument configure to capture isKey value
transformer.applyAdvice(
isMethod()
.and(named("configure"))
.and(isPublic())
.and(takesArguments(2))
.and(takesArgument(1, boolean.class)),
getClass().getName() + "$ConfigureAdvice");

// Instrument deserialize(String topic, Headers headers, byte[] data)
// The 2-arg version calls this one, so we only need to instrument this to avoid duplicates
transformer.applyAdvice(
isMethod()
.and(named("deserialize"))
.and(isPublic())
.and(takesArguments(3))
.and(takesArgument(0, String.class))
.and(takesArgument(2, byte[].class)),
getClass().getName() + "$DeserializeAdvice");
}

public static class ConfigureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Deserializer deserializer, @Advice.Argument(1) boolean isKey) {
// Store the isKey value in InstrumentationContext for later use
InstrumentationContext.get(Deserializer.class, Boolean.class).put(deserializer, isKey);
}
}

public static class DeserializeAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.This Deserializer deserializer,
@Advice.Argument(0) String topic,
@Advice.Argument(2) byte[] data,
@Advice.Return Object result,
@Advice.Thrown Throwable throwable) {

// Get isKey from InstrumentationContext (stored during configure)
Boolean isKeyObj =
InstrumentationContext.get(Deserializer.class, Boolean.class).get(deserializer);
boolean isKey = isKeyObj != null && isKeyObj;

// Get cluster ID from thread-local (set by Kafka consumer instrumentation)
String clusterId = ClusterIdHolder.get();

boolean isSuccess = throwable == null;
int schemaId = isSuccess ? SchemaIdExtractor.extractSchemaId(data) : -1;

// Record the schema registry usage
AgentTracer.get()
.getDataStreamsMonitoring()
.reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "deserialize");
}
}
}
@@ -0,0 +1,114 @@
package datadog.trace.instrumentation.confluentschemaregistry;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.Instrumenter.MethodTransformer;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import org.apache.kafka.common.serialization.Serializer;

/**
* Instruments Confluent Schema Registry serializers (Avro, Protobuf, and JSON) to capture
* serialization operations.
*/
@AutoService(InstrumenterModule.class)
public class KafkaSerializerInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice {

public KafkaSerializerInstrumentation() {
super("confluent-schema-registry", "kafka");
}

@Override
public String[] knownMatchingTypes() {
return new String[] {
"io.confluent.kafka.serializers.KafkaAvroSerializer",
"io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer",
"io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer"
};
}

@Override
public String[] helperClassNames() {
return new String[] {
"datadog.trace.instrumentation.kafka_common.ClusterIdHolder",
packageName + ".SchemaIdExtractor"
};
}

@Override
public Map<String, String> contextStore() {
Map<String, String> contextStores = new HashMap<>();
contextStores.put("org.apache.kafka.common.serialization.Serializer", "java.lang.Boolean");
return contextStores;
}

@Override
public void methodAdvice(MethodTransformer transformer) {
// Instrument configure to capture isKey value
transformer.applyAdvice(
isMethod()
.and(named("configure"))
.and(isPublic())
.and(takesArguments(2))
.and(takesArgument(1, boolean.class)),
getClass().getName() + "$ConfigureAdvice");

// Instrument both serialize(String topic, Object data)
// and serialize(String topic, Headers headers, Object data) for Kafka 2.1+
transformer.applyAdvice(
isMethod()
.and(named("serialize"))
.and(isPublic())
.and(takesArgument(0, String.class))
.and(returns(byte[].class)),
getClass().getName() + "$SerializeAdvice");
}

public static class ConfigureAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(
@Advice.This Serializer serializer, @Advice.Argument(1) boolean isKey) {
// Store the isKey value in InstrumentationContext for later use
InstrumentationContext.get(Serializer.class, Boolean.class).put(serializer, isKey);
}
}

public static class SerializeAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void onExit(
@Advice.This Serializer serializer,
@Advice.Argument(0) String topic,
@Advice.Return byte[] result,
@Advice.Thrown Throwable throwable) {

// Get isKey from InstrumentationContext (stored during configure)
Boolean isKeyObj =
InstrumentationContext.get(Serializer.class, Boolean.class).get(serializer);
boolean isKey = isKeyObj != null && isKeyObj;

// Get cluster ID from thread-local (set by Kafka producer instrumentation)
String clusterId = ClusterIdHolder.get();
Member:

[nit] Should we clear the cluster ID from the holder? Just to be extra careful.
As of today, I do not see a path that can bring us here without having passed through poll (which sets the new value)...but just in case of API changes in the future.

Contributor Author:

We are clearing it in the place we are setting it.
For the producer, we could clear it after using it, but for the consumer that is not possible, because you can poll a batch of messages; if we cleared it after using it, we would only have the Kafka cluster ID for the first message.

So for consistency, I was thinking of only setting the Kafka cluster ID from the Kafka instrumentation. What do you think?

Member (@labbati, Nov 19, 2025):

Makes total sense about the issue with batching.

> So for consistency, I was thinking of only setting the Kafka cluster ID from the Kafka instrumentation. What do you think?

Unsure what you mean, but the current implementation (setting it in the poll enter advice plus the ClusterIdHolder.clear() that you just added to the poll exit) feels like enough to me. Do you have a way in mind to improve on it?

Contributor Author:

No, that's what I was thinking of 👍
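
For context, a minimal sketch of the holder pattern discussed above, assuming ClusterIdHolder is a plain thread-local that the Kafka consumer/producer instrumentation sets and clears and this advice only reads (the real class lives in kafka-common and its API may differ):

// Sketch under the assumptions stated above; not the actual kafka-common source.
public final class ClusterIdHolder {
  private static final ThreadLocal<String> CLUSTER_ID = new ThreadLocal<>();

  private ClusterIdHolder() {}

  // Called by the Kafka instrumentation on poll/send enter.
  public static void set(String clusterId) {
    CLUSTER_ID.set(clusterId);
  }

  // Read by the schema registry serializer/deserializer advice.
  public static String get() {
    return CLUSTER_ID.get();
  }

  // Called by the Kafka instrumentation on poll exit, per the discussion above.
  public static void clear() {
    CLUSTER_ID.remove();
  }
}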


boolean isSuccess = throwable == null;
int schemaId = isSuccess ? SchemaIdExtractor.extractSchemaId(result) : -1;

// Record the schema registry usage
AgentTracer.get()
.getDataStreamsMonitoring()
.reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "serialize");
}
}
}
@@ -0,0 +1,23 @@
package datadog.trace.instrumentation.confluentschemaregistry;

/**
* Helper class to extract schema ID from Confluent Schema Registry wire format. Wire format:
* [magic_byte][4-byte schema id][data]
*/
public class SchemaIdExtractor {
public static int extractSchemaId(byte[] data) {
if (data == null || data.length < 5 || data[0] != 0) {
return -1;
}

try {
// Confluent wire format: [magic_byte][4-byte schema id][data]
return ((data[1] & 0xFF) << 24)
| ((data[2] & 0xFF) << 16)
| ((data[3] & 0xFF) << 8)
| (data[4] & 0xFF);
} catch (Throwable ignored) {
return -1;
}
}
}
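
For illustration, a usage sketch of the extractor above; the payload bytes and schema ID (100) are made up and only show the expected shape of the Confluent wire format:

// Hypothetical payload: magic byte 0x00, big-endian schema id 100, then record bytes.
byte[] framed = new byte[] {0, 0, 0, 0, 100, 0x12, 0x34};
int schemaId = SchemaIdExtractor.extractSchemaId(framed); // 100

// Payloads without the magic byte, or shorter than 5 bytes, yield -1.
int missing = SchemaIdExtractor.extractSchemaId(new byte[] {1, 2, 3}); // -1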