diff --git a/.gitignore b/.gitignore index c0dcdb32d1b..686905af989 100644 --- a/.gitignore +++ b/.gitignore @@ -46,6 +46,10 @@ out/ ###################### .vscode +# Cursor # +########## +.cursor + # Others # ########## /logs/* diff --git a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy index b963a0a08bc..9feb48f3a98 100644 --- a/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy +++ b/dd-java-agent/instrumentation-testing/src/main/groovy/datadog/trace/agent/test/datastreams/RecordingDatastreamsPayloadWriter.groovy @@ -19,6 +19,9 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { @SuppressWarnings('UnusedPrivateField') private final Set backlogs = [] + @SuppressWarnings('UnusedPrivateField') + private final List schemaRegistryUsages = [] + private final Set serviceNameOverrides = [] @Override @@ -33,6 +36,11 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { this.@backlogs.add(backlog.getKey()) } } + if (bucket.schemaRegistryUsages != null) { + for (Map.Entry usage : bucket.schemaRegistryUsages) { + this.@schemaRegistryUsages.add(usage.getKey()) + } + } } } @@ -52,10 +60,15 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { Collections.unmodifiableList(new ArrayList<>(this.@backlogs)) } + synchronized List getSchemaRegistryUsages() { + Collections.unmodifiableList(new ArrayList<>(this.@schemaRegistryUsages)) + } + synchronized void clear() { this.@payloads.clear() this.@groups.clear() this.@backlogs.clear() + this.@schemaRegistryUsages.clear() } void waitForPayloads(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) { @@ -70,6 +83,10 @@ class RecordingDatastreamsPayloadWriter implements DatastreamsPayloadWriter { waitFor(count, timeout, this.@backlogs) } + void waitForSchemaRegistryUsages(int count, long timeout = TimeUnit.SECONDS.toMillis(3)) { + waitFor(count, timeout, this.@schemaRegistryUsages) + } + private static void waitFor(int count, long timeout, Collection collection) { long deadline = System.currentTimeMillis() + timeout while (System.currentTimeMillis() < deadline) { diff --git a/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/build.gradle b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/build.gradle new file mode 100644 index 00000000000..8e0a847fb6d --- /dev/null +++ b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/build.gradle @@ -0,0 +1,26 @@ +apply from: "$rootDir/gradle/java.gradle" + +muzzle { + pass { + group = "io.confluent" + module = "kafka-schema-registry-client" + versions = "[7.0.0,)" + assertInverse = true + } +} + +dependencies { + compileOnly project(':dd-java-agent:instrumentation:kafka:kafka-common') + compileOnly group: 'io.confluent', name: 'kafka-schema-registry-client', version: '7.0.0' + compileOnly group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.0.0' + compileOnly group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.0.0' + compileOnly group: 'org.apache.kafka', name: 'kafka-clients', version: '3.0.0' + + testImplementation project(':dd-java-agent:instrumentation:kafka:kafka-common') + testImplementation group: 
'io.confluent', name: 'kafka-schema-registry-client', version: '7.5.2' + testImplementation group: 'io.confluent', name: 'kafka-avro-serializer', version: '7.5.2' + testImplementation group: 'io.confluent', name: 'kafka-protobuf-serializer', version: '7.5.1' + testImplementation group: 'org.apache.kafka', name: 'kafka-clients', version: '3.5.0' + testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.0' +} + diff --git a/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaDeserializerInstrumentation.java b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaDeserializerInstrumentation.java new file mode 100644 index 00000000000..825bd0db51b --- /dev/null +++ b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaDeserializerInstrumentation.java @@ -0,0 +1,115 @@ +package datadog.trace.instrumentation.confluentschemaregistry; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.Instrumenter.MethodTransformer; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.common.serialization.Deserializer; + +/** + * Instruments Confluent Schema Registry deserializers (Avro, Protobuf, and JSON) to capture + * deserialization operations. 
+ */ +@AutoService(InstrumenterModule.class) +public class KafkaDeserializerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + public KafkaDeserializerInstrumentation() { + super("confluent-schema-registry", "kafka"); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "io.confluent.kafka.serializers.KafkaAvroDeserializer", + "io.confluent.kafka.serializers.json.KafkaJsonSchemaDeserializer", + "io.confluent.kafka.serializers.protobuf.KafkaProtobufDeserializer" + }; + } + + @Override + public String[] helperClassNames() { + return new String[] { + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", + packageName + ".SchemaIdExtractor" + }; + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.common.serialization.Deserializer", "java.lang.Boolean"); + return contextStores; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument configure to capture isKey value + transformer.applyAdvice( + isMethod() + .and(named("configure")) + .and(isPublic()) + .and(takesArguments(2)) + .and(takesArgument(1, boolean.class)), + getClass().getName() + "$ConfigureAdvice"); + + // Instrument deserialize(String topic, Headers headers, byte[] data) + // The 2-arg version calls this one, so we only need to instrument this to avoid duplicates + transformer.applyAdvice( + isMethod() + .and(named("deserialize")) + .and(isPublic()) + .and(takesArguments(3)) + .and(takesArgument(0, String.class)) + .and(takesArgument(2, byte[].class)), + getClass().getName() + "$DeserializeAdvice"); + } + + public static class ConfigureAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This Deserializer deserializer, @Advice.Argument(1) boolean isKey) { + // Store the isKey value in InstrumentationContext for later use + InstrumentationContext.get(Deserializer.class, Boolean.class).put(deserializer, isKey); + } + } + + public static class DeserializeAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.This Deserializer deserializer, + @Advice.Argument(0) String topic, + @Advice.Argument(2) byte[] data, + @Advice.Return Object result, + @Advice.Thrown Throwable throwable) { + + // Get isKey from InstrumentationContext (stored during configure) + Boolean isKeyObj = + InstrumentationContext.get(Deserializer.class, Boolean.class).get(deserializer); + boolean isKey = isKeyObj != null && isKeyObj; + + // Get cluster ID from thread-local (set by Kafka consumer instrumentation) + String clusterId = ClusterIdHolder.get(); + + boolean isSuccess = throwable == null; + int schemaId = isSuccess ? 
SchemaIdExtractor.extractSchemaId(data) : -1; + + // Record the schema registry usage + AgentTracer.get() + .getDataStreamsMonitoring() + .reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "deserialize"); + } + } +} diff --git a/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaSerializerInstrumentation.java b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaSerializerInstrumentation.java new file mode 100644 index 00000000000..7ad15382669 --- /dev/null +++ b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/KafkaSerializerInstrumentation.java @@ -0,0 +1,114 @@ +package datadog.trace.instrumentation.confluentschemaregistry; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.isPublic; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.Instrumenter.MethodTransformer; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; +import java.util.HashMap; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import org.apache.kafka.common.serialization.Serializer; + +/** + * Instruments Confluent Schema Registry serializers (Avro, Protobuf, and JSON) to capture + * serialization operations. 
+ */ +@AutoService(InstrumenterModule.class) +public class KafkaSerializerInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForKnownTypes, Instrumenter.HasMethodAdvice { + + public KafkaSerializerInstrumentation() { + super("confluent-schema-registry", "kafka"); + } + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "io.confluent.kafka.serializers.KafkaAvroSerializer", + "io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer", + "io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer" + }; + } + + @Override + public String[] helperClassNames() { + return new String[] { + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", + packageName + ".SchemaIdExtractor" + }; + } + + @Override + public Map contextStore() { + Map contextStores = new HashMap<>(); + contextStores.put("org.apache.kafka.common.serialization.Serializer", "java.lang.Boolean"); + return contextStores; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + // Instrument configure to capture isKey value + transformer.applyAdvice( + isMethod() + .and(named("configure")) + .and(isPublic()) + .and(takesArguments(2)) + .and(takesArgument(1, boolean.class)), + getClass().getName() + "$ConfigureAdvice"); + + // Instrument both serialize(String topic, Object data) + // and serialize(String topic, Headers headers, Object data) for Kafka 2.1+ + transformer.applyAdvice( + isMethod() + .and(named("serialize")) + .and(isPublic()) + .and(takesArgument(0, String.class)) + .and(returns(byte[].class)), + getClass().getName() + "$SerializeAdvice"); + } + + public static class ConfigureAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onExit( + @Advice.This Serializer serializer, @Advice.Argument(1) boolean isKey) { + // Store the isKey value in InstrumentationContext for later use + InstrumentationContext.get(Serializer.class, Boolean.class).put(serializer, isKey); + } + } + + public static class SerializeAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit( + @Advice.This Serializer serializer, + @Advice.Argument(0) String topic, + @Advice.Return byte[] result, + @Advice.Thrown Throwable throwable) { + + // Get isKey from InstrumentationContext (stored during configure) + Boolean isKeyObj = + InstrumentationContext.get(Serializer.class, Boolean.class).get(serializer); + boolean isKey = isKeyObj != null && isKeyObj; + + // Get cluster ID from thread-local (set by Kafka producer instrumentation) + String clusterId = ClusterIdHolder.get(); + + boolean isSuccess = throwable == null; + int schemaId = isSuccess ? 
SchemaIdExtractor.extractSchemaId(result) : -1; + + // Record the schema registry usage + AgentTracer.get() + .getDataStreamsMonitoring() + .reportSchemaRegistryUsage(topic, clusterId, schemaId, isSuccess, isKey, "serialize"); + } + } +} diff --git a/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/SchemaIdExtractor.java b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/SchemaIdExtractor.java new file mode 100644 index 00000000000..786ff25bc21 --- /dev/null +++ b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/main/java/datadog/trace/instrumentation/confluentschemaregistry/SchemaIdExtractor.java @@ -0,0 +1,23 @@ +package datadog.trace.instrumentation.confluentschemaregistry; + +/** + * Helper class to extract schema ID from Confluent Schema Registry wire format. Wire format: + * [magic_byte][4-byte schema id][data] + */ +public class SchemaIdExtractor { + public static int extractSchemaId(byte[] data) { + if (data == null || data.length < 5 || data[0] != 0) { + return -1; + } + + try { + // Confluent wire format: [magic_byte][4-byte schema id][data] + return ((data[1] & 0xFF) << 24) + | ((data[2] & 0xFF) << 16) + | ((data[3] & 0xFF) << 8) + | (data[4] & 0xFF); + } catch (Throwable ignored) { + return -1; + } + } +} diff --git a/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/test/groovy/ConfluentSchemaRegistryDataStreamsTest.groovy b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/test/groovy/ConfluentSchemaRegistryDataStreamsTest.groovy new file mode 100644 index 00000000000..25806cc42a4 --- /dev/null +++ b/dd-java-agent/instrumentation/confluent-schema-registry/confluent-schema-registry-7.0/src/test/groovy/ConfluentSchemaRegistryDataStreamsTest.groovy @@ -0,0 +1,122 @@ +import datadog.trace.agent.test.InstrumentationSpecification +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder +import io.confluent.kafka.serializers.KafkaAvroDeserializer +import io.confluent.kafka.serializers.KafkaAvroSerializer +import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericRecord +import spock.lang.Shared + +import java.util.concurrent.TimeUnit + +/** + * Tests that Schema Registry usage is tracked in Data Streams Monitoring + * for both serialization and deserialization operations. 
+ */ +class ConfluentSchemaRegistryDataStreamsTest extends InstrumentationSpecification { + @Shared + SchemaRegistryClient schemaRegistryClient + + @Shared + Schema testSchema + + @Override + protected boolean isDataStreamsEnabled() { + return true + } + + @Override + protected long dataStreamsBucketDuration() { + return TimeUnit.SECONDS.toNanos(1) + } + + void setup() { + schemaRegistryClient = new MockSchemaRegistryClient() + testSchema = new Schema.Parser().parse(""" + { + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "field1", "type": "string"}, + {"name": "field2", "type": "int"} + ] + } + """) + } + + def "test schema registry tracks both serialize and deserialize operations"() { + setup: + def topicName = "test-topic" + def testClusterId = "test-cluster" + def config = [ + "schema.registry.url": "mock://test-url", + "auto.register.schemas": "true" + ] + + // Create serializer and deserializer + def serializer = new KafkaAvroSerializer(schemaRegistryClient) + serializer.configure(config, false) // false = value serializer + + def deserializer = new KafkaAvroDeserializer(schemaRegistryClient) + deserializer.configure(config, false) // false = value deserializer + + // Create a test record + GenericRecord record = new GenericData.Record(testSchema) + record.put("field1", "test-value") + record.put("field2", 42) + + when: "we produce a message (serialize)" + ClusterIdHolder.set(testClusterId) + byte[] serialized = serializer.serialize(topicName, record) + + and: "we consume the message (deserialize)" + ClusterIdHolder.set(testClusterId) + def deserialized = deserializer.deserialize(topicName, serialized) + + and: "we wait for DSM to flush" + Thread.sleep(1200) // Wait for bucket duration + buffer + TEST_DATA_STREAMS_MONITORING.report() + TEST_DATA_STREAMS_WRITER.waitForSchemaRegistryUsages(2, TimeUnit.SECONDS.toMillis(5)) + + then: "the message was serialized and deserialized successfully" + serialized != null + serialized.length > 0 + deserialized != null + deserialized.get("field1").toString() == "test-value" + deserialized.get("field2") == 42 + + and: "two schema registry usages were tracked" + def usages = TEST_DATA_STREAMS_WRITER.schemaRegistryUsages + usages.size() >= 2 + + and: "one is a serialize operation" + def serializeUsage = usages.find { u -> + u.topic == topicName && u.operation == "serialize" + } + serializeUsage != null + serializeUsage.schemaId > 0 // Valid schema ID + serializeUsage.isSuccess() == true + serializeUsage.isKey() == false + serializeUsage.clusterId == testClusterId + + and: "one is a deserialize operation" + def deserializeUsage = usages.find { u -> + u.topic == topicName && u.operation == "deserialize" + } + deserializeUsage != null + deserializeUsage.schemaId > 0 // Valid schema ID + deserializeUsage.isSuccess() == true + deserializeUsage.isKey() == false + deserializeUsage.clusterId == testClusterId + + and: "both operations used the same schema ID" + serializeUsage.schemaId == deserializeUsage.schemaId + + cleanup: + serializer.close() + deserializer.close() + ClusterIdHolder.clear() + } +} diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java index 33056572e87..ada3a013d74 100644 --- 
a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaConsumerInfoInstrumentation.java @@ -22,6 +22,7 @@ import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -78,7 +79,9 @@ public String instrumentedType() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo", + packageName + ".KafkaDecorator", + packageName + ".KafkaConsumerInfo", + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", }; } @@ -204,7 +207,21 @@ public static void muzzleCheck(ConsumerRecord record) { */ public static class RecordsAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope onEnter() { + public static AgentScope onEnter(@Advice.This KafkaConsumer consumer) { + // Set cluster ID in ClusterIdHolder for Schema Registry instrumentation + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(KafkaConsumer.class, KafkaConsumerInfo.class).get(consumer); + if (kafkaConsumerInfo != null && Config.get().isDataStreamsEnabled()) { + Metadata consumerMetadata = kafkaConsumerInfo.getClientMetadata(); + if (consumerMetadata != null) { + String clusterId = + InstrumentationContext.get(Metadata.class, String.class).get(consumerMetadata); + if (clusterId != null) { + ClusterIdHolder.set(clusterId); + } + } + } + if (traceConfig().isDataStreamsEnabled()) { final AgentSpan span = startSpan(KAFKA_POLL); return activateSpan(span); @@ -227,6 +244,9 @@ public static void captureGroup( } recordsCount = records.count(); } + // Clear cluster ID from Schema Registry instrumentation + ClusterIdHolder.clear(); + if (scope == null) { return; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java index ba66a01b6da..2dc6059f7fc 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/KafkaProducerInstrumentation.java @@ -35,6 +35,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import java.util.Map; import net.bytebuddy.asm.Advice; import net.bytebuddy.matcher.ElementMatcher; @@ -78,6 +79,7 @@ public String[] helperClassNames() { packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", packageName + ".AvroSchemaExtractor", }; } @@ -118,6 +120,11 @@ public static AgentScope onEnter( 
@Advice.Argument(value = 1, readOnly = false) Callback callback) { String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata); + // Set cluster ID for Schema Registry instrumentation + if (clusterId != null) { + ClusterIdHolder.set(clusterId); + } + final AgentSpan parent = activeSpan(); final AgentSpan span = startSpan(KAFKA_PRODUCE); PRODUCER_DECORATE.afterStart(span); @@ -184,6 +191,9 @@ record = @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + // Clear cluster ID from Schema Registry instrumentation + ClusterIdHolder.clear(); + PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 9870627fda3..686c550a8b6 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,4 +1,5 @@ import datadog.trace.api.datastreams.DataStreamsTags +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder import static datadog.trace.agent.test.utils.TraceUtils.basicSpan import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace @@ -250,6 +251,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { received.value() == greeting received.key() == null + // verify ClusterIdHolder was properly cleaned up after produce and consume + ClusterIdHolder.get() == null + int nTraces = isDataStreamsEnabled() ? 
3 : 2 int produceTraceIdx = nTraces - 1 TEST_WRITER.waitForTraces(nTraces) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java index 576982b9da4..334aa6c9aac 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaConsumerInfoInstrumentation.java @@ -67,7 +67,9 @@ public ElementMatcher hierarchyMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo", + packageName + ".KafkaDecorator", + packageName + ".KafkaConsumerInfo", + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", }; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java index bb0af11f3db..a06469fb494 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/KafkaProducerInstrumentation.java @@ -41,6 +41,7 @@ public String[] helperClassNames() { packageName + ".NoopTextMapInjectAdapter", packageName + ".KafkaProducerCallback", "datadog.trace.instrumentation.kafka_common.StreamingContext", + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", packageName + ".AvroSchemaExtractor", }; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java index 73aa4534816..4b415b0bc97 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java/datadog/trace/instrumentation/kafka_clients38/LegacyKafkaConsumerInfoInstrumentation.java @@ -67,7 +67,9 @@ public ElementMatcher hierarchyMatcher() { @Override public String[] helperClassNames() { return new String[] { - packageName + ".KafkaDecorator", packageName + ".KafkaConsumerInfo", + packageName + ".KafkaDecorator", + packageName + ".KafkaConsumerInfo", + "datadog.trace.instrumentation.kafka_common.ClusterIdHolder", }; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java index 8d86dca9ea8..be6bdc2500b 100644 --- 
a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/ProducerAdvice.java @@ -22,6 +22,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import net.bytebuddy.asm.Advice; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.producer.Callback; @@ -39,6 +40,12 @@ public static AgentScope onEnter( @Advice.Argument(value = 0, readOnly = false) ProducerRecord record, @Advice.Argument(value = 1, readOnly = false) Callback callback) { String clusterId = InstrumentationContext.get(Metadata.class, String.class).get(metadata); + + // Set cluster ID for Schema Registry instrumentation + if (clusterId != null) { + ClusterIdHolder.set(clusterId); + } + final AgentSpan parent = activeSpan(); final AgentSpan span = startSpan(KAFKA_PRODUCE); PRODUCER_DECORATE.afterStart(span); @@ -106,6 +113,9 @@ record = @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) public static void stopSpan( @Advice.Enter final AgentScope scope, @Advice.Thrown final Throwable throwable) { + // Clear cluster ID from Schema Registry instrumentation + ClusterIdHolder.clear(); + PRODUCER_DECORATE.onError(scope, throwable); PRODUCER_DECORATE.beforeFinish(scope); scope.close(); diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java index 1afd4db7cb0..12e7089a562 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/RecordsAdvice.java @@ -6,10 +6,13 @@ import static datadog.trace.bootstrap.instrumentation.api.InstrumentationTags.KAFKA_RECORDS_COUNT; import static datadog.trace.instrumentation.kafka_clients38.KafkaDecorator.KAFKA_POLL; +import datadog.trace.api.Config; import datadog.trace.bootstrap.InstrumentationContext; import datadog.trace.bootstrap.instrumentation.api.AgentScope; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder; import net.bytebuddy.asm.Advice; +import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.internals.ConsumerDelegate; @@ -20,7 +23,19 @@ */ public class RecordsAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) - public static AgentScope onEnter() { + public static AgentScope onEnter(@Advice.This ConsumerDelegate consumer) { + // Set cluster ID in ClusterIdHolder for Schema Registry instrumentation + KafkaConsumerInfo kafkaConsumerInfo = + InstrumentationContext.get(ConsumerDelegate.class, KafkaConsumerInfo.class).get(consumer); + if (kafkaConsumerInfo != null && Config.get().isDataStreamsEnabled()) { + String clusterId = + KafkaConsumerInstrumentationHelper.extractClusterId( + kafkaConsumerInfo, 
InstrumentationContext.get(Metadata.class, String.class)); + if (clusterId != null) { + ClusterIdHolder.set(clusterId); + } + } + if (traceConfig().isDataStreamsEnabled()) { final AgentSpan span = startSpan(KAFKA_POLL); return activateSpan(span); @@ -45,6 +60,9 @@ public static void captureGroup( } recordsCount = records.count(); } + // Clear cluster ID from Schema Registry instrumentation + ClusterIdHolder.clear(); + if (scope == null) { return; } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 78d1d8f70de..c0a2c9c2625 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -8,6 +8,7 @@ import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.common.writer.ListWriter import datadog.trace.core.DDSpan import datadog.trace.core.datastreams.StatsGroup +import datadog.trace.instrumentation.kafka_common.ClusterIdHolder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.clients.consumer.KafkaConsumer @@ -217,6 +218,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase { def received = records.poll(10, TimeUnit.SECONDS) received.value() == greeting received.key() == null + + // verify ClusterIdHolder was properly cleaned up after produce and consume + ClusterIdHolder.get() == null int nTraces = isDataStreamsEnabled() ? 3 : 2 int produceTraceIdx = nTraces - 1 TEST_WRITER.waitForTraces(nTraces) diff --git a/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/ClusterIdHolder.java b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/ClusterIdHolder.java new file mode 100644 index 00000000000..fa0b90eebce --- /dev/null +++ b/dd-java-agent/instrumentation/kafka/kafka-common/src/main/java/datadog/trace/instrumentation/kafka_common/ClusterIdHolder.java @@ -0,0 +1,22 @@ +package datadog.trace.instrumentation.kafka_common; + +/** + * Thread-local holder for Kafka cluster ID to be used during schema registry operations. The Kafka + * producer/consumer instrumentation sets this before serialization/deserialization, and the schema + * registry serializer/deserializer instrumentation reads it. 
+ */ +public class ClusterIdHolder { + private static final ThreadLocal CLUSTER_ID = new ThreadLocal<>(); + + public static void set(String clusterId) { + CLUSTER_ID.set(clusterId); + } + + public static String get() { + return CLUSTER_ID.get(); + } + + public static void clear() { + CLUSTER_ID.set(null); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java index c86b9402081..931d3583afe 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultDataStreamsMonitoring.java @@ -17,6 +17,7 @@ import datadog.trace.api.Config; import datadog.trace.api.TraceConfig; import datadog.trace.api.datastreams.*; +import datadog.trace.api.datastreams.SchemaRegistryUsage; import datadog.trace.api.experimental.DataStreamsContextCarrier; import datadog.trace.api.time.TimeSource; import datadog.trace.bootstrap.instrumentation.api.AgentSpan; @@ -217,6 +218,26 @@ public void trackBacklog(DataStreamsTags tags, long value) { inbox.offer(new Backlog(tags, value, timeSource.getCurrentTimeNanos(), getThreadServiceName())); } + @Override + public void reportSchemaRegistryUsage( + String topic, + String clusterId, + int schemaId, + boolean isSuccess, + boolean isKey, + String operation) { + inbox.offer( + new SchemaRegistryUsage( + topic, + clusterId, + schemaId, + isSuccess, + isKey, + operation, + timeSource.getCurrentTimeNanos(), + getThreadServiceName())); + } + @Override public void setCheckpoint(AgentSpan span, DataStreamsContext context) { PathwayContext pathwayContext = span.context().getPathwayContext(); @@ -358,6 +379,11 @@ public void run() { StatsBucket statsBucket = getStatsBucket(backlog.getTimestampNanos(), backlog.getServiceNameOverride()); statsBucket.addBacklog(backlog); + } else if (payload instanceof SchemaRegistryUsage) { + SchemaRegistryUsage usage = (SchemaRegistryUsage) payload; + StatsBucket statsBucket = + getStatsBucket(usage.getTimestampNanos(), usage.getServiceNameOverride()); + statsBucket.addSchemaRegistryUsage(usage); } } } catch (Exception e) { diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java index 0df8e7291d7..277820af4e9 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java @@ -35,6 +35,14 @@ public class MsgPackDatastreamsPayloadWriter implements DatastreamsPayloadWriter private static final byte[] PARENT_HASH = "ParentHash".getBytes(ISO_8859_1); private static final byte[] BACKLOG_VALUE = "Value".getBytes(ISO_8859_1); private static final byte[] BACKLOG_TAGS = "Tags".getBytes(ISO_8859_1); + private static final byte[] SCHEMA_REGISTRY_USAGES = "SchemaRegistryUsages".getBytes(ISO_8859_1); + private static final byte[] TOPIC = "Topic".getBytes(ISO_8859_1); + private static final byte[] KAFKA_CLUSTER_ID = "KafkaClusterId".getBytes(ISO_8859_1); + private static final byte[] SCHEMA_ID = "SchemaId".getBytes(ISO_8859_1); + private static final byte[] COUNT = "Count".getBytes(ISO_8859_1); + private static final byte[] IS_SUCCESS = "IsSuccess".getBytes(ISO_8859_1); + private static final byte[] IS_KEY = 
"IsKey".getBytes(ISO_8859_1); + private static final byte[] OPERATION = "Operation".getBytes(ISO_8859_1); private static final byte[] PRODUCTS_MASK = "ProductMask".getBytes(ISO_8859_1); private static final byte[] PROCESS_TAGS = "ProcessTags".getBytes(ISO_8859_1); @@ -121,7 +129,15 @@ public void writePayload(Collection data, String serviceNameOverrid for (StatsBucket bucket : data) { boolean hasBacklogs = !bucket.getBacklogs().isEmpty(); - writer.startMap(3 + (hasBacklogs ? 1 : 0)); + boolean hasSchemaRegistryUsages = !bucket.getSchemaRegistryUsages().isEmpty(); + int mapSize = 3; + if (hasBacklogs) { + mapSize++; + } + if (hasSchemaRegistryUsages) { + mapSize++; + } + writer.startMap(mapSize); /* 1 */ writer.writeUTF8(START); @@ -139,6 +155,11 @@ public void writePayload(Collection data, String serviceNameOverrid /* 4 */ writeBacklogs(bucket.getBacklogs(), writer); } + + if (hasSchemaRegistryUsages) { + /* 5 */ + writeSchemaRegistryUsages(bucket.getSchemaRegistryUsages(), writer); + } } /* 8 */ @@ -207,6 +228,40 @@ private void writeBacklogs( } } + private void writeSchemaRegistryUsages( + Collection> usages, Writable packer) { + packer.writeUTF8(SCHEMA_REGISTRY_USAGES); + packer.startArray(usages.size()); + for (Map.Entry entry : usages) { + StatsBucket.SchemaKey key = entry.getKey(); + long count = entry.getValue(); + + packer.startMap( + 7); // 7 fields: Topic, KafkaClusterId, SchemaId, IsSuccess, IsKey, Operation, Count + + packer.writeUTF8(TOPIC); + packer.writeString(key.getTopic() != null ? key.getTopic() : "", null); + + packer.writeUTF8(KAFKA_CLUSTER_ID); + packer.writeString(key.getClusterId() != null ? key.getClusterId() : "", null); + + packer.writeUTF8(SCHEMA_ID); + packer.writeInt(key.getSchemaId()); + + packer.writeUTF8(IS_SUCCESS); + packer.writeBoolean(key.isSuccess()); + + packer.writeUTF8(IS_KEY); + packer.writeBoolean(key.isKey()); + + packer.writeUTF8(OPERATION); + packer.writeString(key.getOperation() != null ? key.getOperation() : "", null); + + packer.writeUTF8(COUNT); + packer.writeLong(count); + } + } + private void writeDataStreamsTags(DataStreamsTags tags, Writable packer) { packer.startArray(tags.nonNullSize()); diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java index c61550d0e3e..1ddbffd94a4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/StatsBucket.java @@ -2,6 +2,7 @@ import datadog.trace.api.datastreams.Backlog; import datadog.trace.api.datastreams.DataStreamsTags; +import datadog.trace.api.datastreams.SchemaRegistryUsage; import datadog.trace.api.datastreams.StatsPoint; import java.util.Collection; import java.util.HashMap; @@ -12,6 +13,7 @@ public class StatsBucket { private final long bucketDurationNanos; private final Map hashToGroup = new HashMap<>(); private final Map backlogs = new HashMap<>(); + private final Map schemaRegistryUsages = new HashMap<>(); public StatsBucket(long startTimeNanos, long bucketDurationNanos) { this.startTimeNanos = startTimeNanos; @@ -40,6 +42,18 @@ public void addBacklog(Backlog backlog) { (k, v) -> (v == null) ? 
backlog.getValue() : Math.max(v, backlog.getValue())); } + public void addSchemaRegistryUsage(SchemaRegistryUsage usage) { + SchemaKey key = + new SchemaKey( + usage.getTopic(), + usage.getClusterId(), + usage.getSchemaId(), + usage.isSuccess(), + usage.isKey(), + usage.getOperation()); + schemaRegistryUsages.merge(key, 1L, Long::sum); + } + public long getStartTimeNanos() { return startTimeNanos; } @@ -55,4 +69,84 @@ public Collection getGroups() { public Collection> getBacklogs() { return backlogs.entrySet(); } + + public Collection> getSchemaRegistryUsages() { + return schemaRegistryUsages.entrySet(); + } + + /** + * Key for aggregating schema registry usage by topic, cluster, schema ID, success, key/value + * type, and operation. + */ + public static class SchemaKey { + private final String topic; + private final String clusterId; + private final int schemaId; + private final boolean isSuccess; + private final boolean isKey; + private final String operation; + + public SchemaKey( + String topic, + String clusterId, + int schemaId, + boolean isSuccess, + boolean isKey, + String operation) { + this.topic = topic; + this.clusterId = clusterId; + this.schemaId = schemaId; + this.isSuccess = isSuccess; + this.isKey = isKey; + this.operation = operation; + } + + public String getTopic() { + return topic; + } + + public String getClusterId() { + return clusterId; + } + + public int getSchemaId() { + return schemaId; + } + + public boolean isSuccess() { + return isSuccess; + } + + public boolean isKey() { + return isKey; + } + + public String getOperation() { + return operation; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SchemaKey that = (SchemaKey) o; + return schemaId == that.schemaId + && isSuccess == that.isSuccess + && isKey == that.isKey + && java.util.Objects.equals(topic, that.topic) + && java.util.Objects.equals(clusterId, that.clusterId) + && java.util.Objects.equals(operation, that.operation); + } + + @Override + public int hashCode() { + int result = topic != null ? topic.hashCode() : 0; + result = 31 * result + (clusterId != null ? clusterId.hashCode() : 0); + result = 31 * result + schemaId; + result = 31 * result + (isSuccess ? 1 : 0); + result = 31 * result + (isKey ? 1 : 0); + result = 31 * result + (operation != null ? 
operation.hashCode() : 0); + return result; + } + } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy index 1a2a4c680c8..1119e540ac9 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/core/datastreams/DefaultDataStreamsMonitoringTest.groovy @@ -4,6 +4,7 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery import datadog.trace.api.Config import datadog.trace.api.TraceConfig import datadog.trace.api.datastreams.DataStreamsTags +import datadog.trace.api.datastreams.SchemaRegistryUsage import datadog.trace.api.datastreams.StatsPoint import datadog.trace.api.experimental.DataStreamsContextCarrier import datadog.trace.api.time.ControllableTimeSource @@ -872,6 +873,151 @@ class DefaultDataStreamsMonitoringTest extends DDCoreSpecification { payloadWriter.close() dataStreams.close() } + + def "Schema registry usages are aggregated by operation"() { + given: + def conditions = new PollingConditions(timeout: 2) + def features = Stub(DDAgentFeaturesDiscovery) { + supportsDataStreams() >> true + } + def timeSource = new ControllableTimeSource() + def payloadWriter = new CapturingPayloadWriter() + def sink = Mock(Sink) + def traceConfig = Mock(TraceConfig) { + isDataStreamsEnabled() >> true + } + + when: + def dataStreams = new DefaultDataStreamsMonitoring(sink, features, timeSource, { traceConfig }, payloadWriter, DEFAULT_BUCKET_DURATION_NANOS) + dataStreams.start() + + // Record serialize and deserialize operations + dataStreams.reportSchemaRegistryUsage("test-topic", "test-cluster", 123, true, false, "serialize") + dataStreams.reportSchemaRegistryUsage("test-topic", "test-cluster", 123, true, false, "serialize") // duplicate serialize + dataStreams.reportSchemaRegistryUsage("test-topic", "test-cluster", 123, true, false, "deserialize") + dataStreams.reportSchemaRegistryUsage("test-topic", "test-cluster", 456, true, true, "serialize") // different schema/key + + timeSource.advance(DEFAULT_BUCKET_DURATION_NANOS) + dataStreams.report() + + then: + conditions.eventually { + assert dataStreams.inbox.isEmpty() + assert dataStreams.thread.state != Thread.State.RUNNABLE + assert payloadWriter.buckets.size() == 1 + } + + with(payloadWriter.buckets.get(0)) { + schemaRegistryUsages.size() == 3 // 3 unique combinations + + // Find serialize operation for schema 123 (should have count 2) + def serializeUsage = schemaRegistryUsages.find { e -> + e.key.schemaId == 123 && e.key.operation == "serialize" && !e.key.isKey + } + serializeUsage != null + serializeUsage.value == 2L // Aggregated 2 serialize operations + + // Find deserialize operation for schema 123 (should have count 1) + def deserializeUsage = schemaRegistryUsages.find { e -> + e.key.schemaId == 123 && e.key.operation == "deserialize" && !e.key.isKey + } + deserializeUsage != null + deserializeUsage.value == 1L + + // Find serialize operation for schema 456 with isKey=true (should have count 1) + def keySerializeUsage = schemaRegistryUsages.find { e -> + e.key.schemaId == 456 && e.key.operation == "serialize" && e.key.isKey + } + keySerializeUsage != null + keySerializeUsage.value == 1L + } + + cleanup: + payloadWriter.close() + dataStreams.close() + } + + def "SchemaKey equals and hashCode work correctly"() { + given: + def key1 = new 
StatsBucket.SchemaKey("topic1", "cluster1", 123, true, false, "serialize") + def key2 = new StatsBucket.SchemaKey("topic1", "cluster1", 123, true, false, "serialize") + def key3 = new StatsBucket.SchemaKey("topic2", "cluster1", 123, true, false, "serialize") // different topic + def key4 = new StatsBucket.SchemaKey("topic1", "cluster2", 123, true, false, "serialize") // different cluster + def key5 = new StatsBucket.SchemaKey("topic1", "cluster1", 456, true, false, "serialize") // different schema + def key6 = new StatsBucket.SchemaKey("topic1", "cluster1", 123, false, false, "serialize") // different success + def key7 = new StatsBucket.SchemaKey("topic1", "cluster1", 123, true, true, "serialize") // different isKey + def key8 = new StatsBucket.SchemaKey("topic1", "cluster1", 123, true, false, "deserialize") // different operation + + expect: + // Reflexive + key1.equals(key1) + key1.hashCode() == key1.hashCode() + + // Symmetric + key1.equals(key2) + key2.equals(key1) + key1.hashCode() == key2.hashCode() + + // Different topic + !key1.equals(key3) + !key3.equals(key1) + + // Different cluster + !key1.equals(key4) + !key4.equals(key1) + + // Different schema ID + !key1.equals(key5) + !key5.equals(key1) + + // Different success + !key1.equals(key6) + !key6.equals(key1) + + // Different isKey + !key1.equals(key7) + !key7.equals(key1) + + // Different operation + !key1.equals(key8) + !key8.equals(key1) + + // Null check + !key1.equals(null) + + // Different class + !key1.equals("not a schema key") + } + + def "StatsBucket aggregates schema registry usages correctly"() { + given: + def bucket = new StatsBucket(1000L, 10000L) + def usage1 = new SchemaRegistryUsage("topic1", "cluster1", 123, true, false, "serialize", 1000L, null) + def usage2 = new SchemaRegistryUsage("topic1", "cluster1", 123, true, false, "serialize", 2000L, null) + def usage3 = new SchemaRegistryUsage("topic1", "cluster1", 123, true, false, "deserialize", 3000L, null) + + when: + bucket.addSchemaRegistryUsage(usage1) + bucket.addSchemaRegistryUsage(usage2) // should increment count for same key + bucket.addSchemaRegistryUsage(usage3) // different operation, new key + + def usages = bucket.getSchemaRegistryUsages() + def usageMap = usages.collectEntries { [(it.key): it.value] } + + then: + usages.size() == 2 + + // Check serialize count + def serializeKey = new StatsBucket.SchemaKey("topic1", "cluster1", 123, true, false, "serialize") + usageMap[serializeKey] == 2L + + // Check deserialize count + def deserializeKey = new StatsBucket.SchemaKey("topic1", "cluster1", 123, true, false, "deserialize") + usageMap[deserializeKey] == 1L + + // Check that different operations create different keys + serializeKey != deserializeKey + } } class CapturingPayloadWriter implements DatastreamsPayloadWriter { diff --git a/internal-api/build.gradle.kts b/internal-api/build.gradle.kts index f05ab8f0b2a..cac64359668 100644 --- a/internal-api/build.gradle.kts +++ b/internal-api/build.gradle.kts @@ -68,6 +68,7 @@ val excludedClassesCoverage by extra( "datadog.trace.api.datastreams.InboxItem", "datadog.trace.api.datastreams.NoopDataStreamsMonitoring", "datadog.trace.api.datastreams.NoopPathwayContext", + "datadog.trace.api.datastreams.SchemaRegistryUsage", "datadog.trace.api.datastreams.StatsPoint", // Debugger "datadog.trace.api.debugger.DebuggerConfigUpdate", diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java 
b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java index b7c51bd36ec..c11fab3fc28 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/AgentDataStreamsMonitoring.java @@ -8,6 +8,24 @@ public interface AgentDataStreamsMonitoring extends DataStreamsCheckpointer { void trackBacklog(DataStreamsTags tags, long value); + /** + * Tracks Schema Registry usage for Data Streams Monitoring. + * + * @param topic Kafka topic name + * @param clusterId Kafka cluster ID (important: schema IDs are only unique per cluster) + * @param schemaId Schema ID from Schema Registry + * @param isSuccess Whether the schema operation succeeded + * @param isKey Whether this is for the key (true) or value (false) + * @param operation The operation type: "serialize" or "deserialize" + */ + void reportSchemaRegistryUsage( + String topic, + String clusterId, + int schemaId, + boolean isSuccess, + boolean isKey, + String operation); + /** * Sets data streams checkpoint, used for both produce and consume operations. * diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java index f5cdcb0c82f..bd8e19fe6dc 100644 --- a/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/NoopDataStreamsMonitoring.java @@ -11,6 +11,15 @@ public class NoopDataStreamsMonitoring implements AgentDataStreamsMonitoring { @Override public void trackBacklog(DataStreamsTags tags, long value) {} + @Override + public void reportSchemaRegistryUsage( + String topic, + String clusterId, + int schemaId, + boolean isSuccess, + boolean isKey, + String operation) {} + @Override public void setCheckpoint(AgentSpan span, DataStreamsContext context) {} diff --git a/internal-api/src/main/java/datadog/trace/api/datastreams/SchemaRegistryUsage.java b/internal-api/src/main/java/datadog/trace/api/datastreams/SchemaRegistryUsage.java new file mode 100644 index 00000000000..814f8a05f09 --- /dev/null +++ b/internal-api/src/main/java/datadog/trace/api/datastreams/SchemaRegistryUsage.java @@ -0,0 +1,67 @@ +package datadog.trace.api.datastreams; + +/** + * SchemaRegistryUsage tracks usage of Confluent Schema Registry for data streams. This allows + * monitoring schema compatibility checks, registrations, and failures. 
+ */ +public class SchemaRegistryUsage implements InboxItem { + private final String topic; + private final String clusterId; + private final int schemaId; + private final boolean isSuccess; + private final boolean isKey; + private final String operation; + private final long timestampNanos; + private final String serviceNameOverride; + + public SchemaRegistryUsage( + String topic, + String clusterId, + int schemaId, + boolean isSuccess, + boolean isKey, + String operation, + long timestampNanos, + String serviceNameOverride) { + this.topic = topic; + this.clusterId = clusterId; + this.schemaId = schemaId; + this.isSuccess = isSuccess; + this.isKey = isKey; + this.operation = operation; + this.timestampNanos = timestampNanos; + this.serviceNameOverride = serviceNameOverride; + } + + public String getTopic() { + return topic; + } + + public String getClusterId() { + return clusterId; + } + + public int getSchemaId() { + return schemaId; + } + + public boolean isSuccess() { + return isSuccess; + } + + public boolean isKey() { + return isKey; + } + + public String getOperation() { + return operation; + } + + public long getTimestampNanos() { + return timestampNanos; + } + + public String getServiceNameOverride() { + return serviceNameOverride; + } +} diff --git a/settings.gradle.kts b/settings.gradle.kts index c6ef830426e..ada18787303 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -301,6 +301,7 @@ include( ":dd-java-agent:instrumentation:commons-lang:commons-lang-2.1", ":dd-java-agent:instrumentation:commons-lang:commons-lang-3.5", ":dd-java-agent:instrumentation:commons-text-1.0", + ":dd-java-agent:instrumentation:confluent-schema-registry:confluent-schema-registry-7.0", ":dd-java-agent:instrumentation:couchbase:couchbase-2.0", ":dd-java-agent:instrumentation:couchbase:couchbase-2.6", ":dd-java-agent:instrumentation:couchbase:couchbase-3.1",
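
For reference, a minimal standalone sketch of the Confluent wire-format parsing that the new SchemaIdExtractor performs. The payload bytes below are made up for illustration; a real payload is whatever the Confluent serializer emitted, with the magic byte 0x0 followed by a 4-byte big-endian schema id and then the encoded record.

    public class WireFormatExample {
      public static void main(String[] args) {
        // [magic byte 0x0][4-byte big-endian schema id][serialized data]
        byte[] payload = {0x0, 0x00, 0x00, 0x00, 0x2A, 0x10, 0x20}; // schema id 42, then data
        int schemaId = -1;
        if (payload.length >= 5 && payload[0] == 0) {
          schemaId =
              ((payload[1] & 0xFF) << 24)
                  | ((payload[2] & 0xFF) << 16)
                  | ((payload[3] & 0xFF) << 8)
                  | (payload[4] & 0xFF);
        }
        System.out.println(schemaId); // prints 42
      }
    }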
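
The cluster-id handoff added here relies purely on call ordering within a single thread: the producer/consumer advice sets the holder before (de)serialization runs and clears it on exit, and the schema-registry advice reads it in between. Below is a simplified sketch of that lifecycle, assuming the ClusterIdHolder class introduced in this change is on the classpath; serializeValue is a hypothetical stand-in for the instrumented Confluent serializer call that runs inside KafkaProducer.send(...).

    import datadog.trace.instrumentation.kafka_common.ClusterIdHolder;

    public class ClusterIdHandoffSketch {
      // Stand-in for the instrumented serializer: in the real flow, SerializeAdvice reads
      // ClusterIdHolder.get() when KafkaAvroSerializer.serialize(...) returns.
      static void serializeValue(String topic) {
        String clusterId = ClusterIdHolder.get(); // "my-cluster" while inside send()
        System.out.println("reporting usage for topic=" + topic + " cluster=" + clusterId);
      }

      public static void main(String[] args) {
        // ProducerAdvice.onEnter: cluster id resolved from the producer Metadata context store
        ClusterIdHolder.set("my-cluster");
        try {
          serializeValue("orders"); // key/value serialization happens inside send()
        } finally {
          // ProducerAdvice.stopSpan: always clear so the value cannot leak to the next task on this thread
          ClusterIdHolder.clear();
        }
        System.out.println(ClusterIdHolder.get()); // null again after send() returns
      }
    }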
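
Aggregation works per bucket: every reportSchemaRegistryUsage call with the same (topic, cluster id, schema id, success, isKey, operation) tuple increments one counter instead of emitting a new record, which is what the SchemaKey equals/hashCode implementation exists for. A small sketch of that merge using a plain string key; StatsBucket.addSchemaRegistryUsage does the equivalent with SchemaKey and the same Map.merge call.

    import java.util.HashMap;
    import java.util.Map;

    public class SchemaUsageAggregationSketch {
      public static void main(String[] args) {
        // key = topic|clusterId|schemaId|isSuccess|isKey|operation, value = count in the bucket
        Map<String, Long> usages = new HashMap<>();
        String[] reports = {
          "orders|cluster-1|123|true|false|serialize",
          "orders|cluster-1|123|true|false|serialize",   // same tuple: merged into count=2
          "orders|cluster-1|123|true|false|deserialize", // different operation: separate entry
        };
        for (String key : reports) {
          usages.merge(key, 1L, Long::sum); // same merge StatsBucket uses per SchemaKey
        }
        usages.forEach((k, v) -> System.out.println(k + " -> " + v));
      }
    }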