diff --git a/dd-java-agent/instrumentation/avro/build.gradle b/dd-java-agent/instrumentation/avro/build.gradle new file mode 100644 index 00000000000..b6039d0c2b6 --- /dev/null +++ b/dd-java-agent/instrumentation/avro/build.gradle @@ -0,0 +1,26 @@ +muzzle { + pass { + group = 'org.apache.avro' + module = 'avro' + versions = "[1.11.3,)" + extraDependency 'org.apache.avro:avro-mapred:1.11.3' + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest','test') + +dependencies { + compileOnly group: 'org.apache.avro', name: 'avro', version: '1.11.3' + testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.3' + latestDepTestImplementation group: 'org.apache.avro', name: 'avro', version: '1.+' + + compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.1' + testImplementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.1' + latestDepTestImplementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.+' + + compileOnly group: 'org.apache.avro', name: 'avro-mapred', version: '1.11.3' + testImplementation group: 'org.apache.avro', name: 'avro-mapred', version: '1.11.3' + latestDepTestImplementation group: 'org.apache.avro', name: 'avro-mapred', version: '1.+' +} diff --git a/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumReaderInstrumentation.java b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumReaderInstrumentation.java new file mode 100644 index 00000000000..f55459bee24 --- /dev/null +++ b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumReaderInstrumentation.java @@ -0,0 +1,66 @@ +package datadog.trace.instrumentation.avro; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static 
datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.avro.generic.GenericDatumReader; + +@AutoService(InstrumenterModule.class) +public final class GenericDatumReaderInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + public GenericDatumReaderInstrumentation() { + super("avro"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("read")), + GenericDatumReaderInstrumentation.class.getName() + "$GenericDatumReaderAdvice"); + } + + @Override + public String hierarchyMarkerType() { + return "org.apache.avro.generic.GenericDatumReader"; + } + + @Override + public ElementMatcher<TypeDescription> hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + public static class GenericDatumReaderAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan(@Advice.This GenericDatumReader reader) { + if (!Config.get().isDataStreamsEnabled()) { + return; + } + AgentSpan span = activeSpan(); + if (span == null) { + return; + } + if (reader != null) { + SchemaExtractor.attachSchemaOnSpan( + reader.getSchema(), span, SchemaExtractor.DESERIALIZATION); + } + } + } +} diff --git 
a/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumWriterInstrumentation.java b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumWriterInstrumentation.java new file mode 100644 index 00000000000..b97967a79e6 --- /dev/null +++ b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/GenericDatumWriterInstrumentation.java @@ -0,0 +1,67 @@ +package datadog.trace.instrumentation.avro; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass; +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.api.Config; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.avro.Schema; + +@AutoService(InstrumenterModule.class) +public final class GenericDatumWriterInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForTypeHierarchy { + + public GenericDatumWriterInstrumentation() { + super("avro"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("write")).and(takesArguments(2)), + GenericDatumWriterInstrumentation.class.getName() + "$GenericDatumWriterAdvice"); + } + + @Override + public String 
hierarchyMarkerType() { + return "org.apache.avro.generic.GenericDatumWriter"; + } + + @Override + public ElementMatcher<TypeDescription> hierarchyMatcher() { + return extendsClass(named(hierarchyMarkerType())); + } + + public static class GenericDatumWriterAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void stopSpan(@Advice.FieldValue("root") Schema root) { + if (!Config.get().isDataStreamsEnabled()) { + return; + } + AgentSpan span = activeSpan(); + if (span == null) { + return; + } + + if (root != null) { + SchemaExtractor.attachSchemaOnSpan(root, span, SchemaExtractor.SERIALIZATION); + } + } + } +} diff --git a/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/SchemaExtractor.java b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/SchemaExtractor.java new file mode 100644 index 00000000000..115e9296fb7 --- /dev/null +++ b/dd-java-agent/instrumentation/avro/src/main/java/datadog/trace/instrumentation/avro/SchemaExtractor.java @@ -0,0 +1,171 @@ +package datadog.trace.instrumentation.avro; + +import datadog.trace.api.DDTags; +import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import datadog.trace.bootstrap.instrumentation.api.Schema; +import datadog.trace.bootstrap.instrumentation.api.SchemaBuilder; +import datadog.trace.bootstrap.instrumentation.api.SchemaIterator; +import java.util.List; +import org.apache.avro.Schema.Field; + +public class SchemaExtractor implements SchemaIterator { + public static final String SERIALIZATION = "serialization"; + public static final String DESERIALIZATION = "deserialization"; + private static final String AVRO = "avro"; + + private final org.apache.avro.Schema schema; + + public SchemaExtractor(org.apache.avro.Schema schema) { + this.schema = schema; + } + + public 
static boolean extractProperty( + Field field, String schemaName, String fieldName, SchemaBuilder builder, int depth) { + boolean array = false; + String type = null; + String format = null; + String description = null; + String ref = null; + List enumValues = null; + + switch (field.schema().getType()) { + case RECORD: + type = "object"; + break; + case ENUM: + type = "string"; + enumValues = field.schema().getEnumSymbols(); + break; + case ARRAY: + array = true; + type = getType(field.schema().getElementType().getType().getName()); + break; + case MAP: + type = "object"; + description = "Map type"; + break; + case STRING: + type = "string"; + break; + case BYTES: + type = "string"; + format = "byte"; + break; + case INT: + type = "integer"; + format = "int32"; + break; + case LONG: + type = "integer"; + format = "int64"; + break; + case FLOAT: + type = "number"; + format = "float"; + break; + case DOUBLE: + type = "number"; + format = "double"; + break; + case BOOLEAN: + type = "boolean"; + break; + case NULL: + type = "null"; + break; + case FIXED: + type = "string"; + break; + default: + type = "string"; + description = "Unknown type"; + break; + } + + return builder.addProperty( + schemaName, fieldName, array, type, description, ref, format, enumValues); + } + + public static boolean extractSchema( + org.apache.avro.Schema schema, SchemaBuilder builder, int depth) { + depth++; + String schemaName = schema.getFullName(); + if (!builder.shouldExtractSchema(schemaName, depth)) { + return false; + } + try { + for (Field field : schema.getFields()) { + if (!extractProperty(field, schemaName, field.name(), builder, depth)) { + return false; + } + } + } catch (Exception e) { + return false; + } + return true; + } + + public static Schema extractSchemas(org.apache.avro.Schema schema) { + return AgentTracer.get() + .getDataStreamsMonitoring() + .getSchema(schema.getFullName(), new SchemaExtractor(schema)); + } + + @Override + public void iterateOverSchema(SchemaBuilder 
builder) { + extractSchema(schema, builder, 0); + } + + public static void attachSchemaOnSpan( + org.apache.avro.Schema schema, AgentSpan span, String operation) { + if (schema == null || span == null) { + return; + } + AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring(); + span.setTag(DDTags.SCHEMA_TYPE, AVRO); + span.setTag(DDTags.SCHEMA_NAME, schema.getFullName()); + span.setTag(DDTags.SCHEMA_OPERATION, operation); + + if (!dsm.canSampleSchema(operation)) { + return; + } + + Integer prio = span.forceSamplingDecision(); + if (prio == null || prio <= 0) { + return; + } + + int weight = dsm.trySampleSchema(operation); + if (weight == 0) { + return; + } + + Schema schemaData = SchemaExtractor.extractSchemas(schema); + span.setTag(DDTags.SCHEMA_DEFINITION, schemaData.definition); + span.setTag(DDTags.SCHEMA_WEIGHT, weight); + span.setTag(DDTags.SCHEMA_ID, schemaData.id); + } + + private static String getType(String type) { + switch (type) { + case "string": + return "string"; + case "int": + return "integer"; + case "long": + return "integer"; + case "float": + return "number"; + case "double": + return "number"; + case "boolean": + return "boolean"; + case "null": + return "null"; + default: + return "string"; + } + } +} diff --git a/dd-java-agent/instrumentation/avro/src/test/groovy/AvroDatumReaderTest.groovy b/dd-java-agent/instrumentation/avro/src/test/groovy/AvroDatumReaderTest.groovy new file mode 100644 index 00000000000..e777ab6a1a8 --- /dev/null +++ b/dd-java-agent/instrumentation/avro/src/test/groovy/AvroDatumReaderTest.groovy @@ -0,0 +1,194 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.api.DDTags +import org.apache.avro.Schema +import org.apache.avro.generic.GenericData +import org.apache.avro.generic.GenericDatumReader +import org.apache.avro.generic.GenericRecord +import org.apache.avro.io.BinaryDecoder +import org.apache.avro.io.Encoder +import org.apache.avro.io.EncoderFactory +import 
org.apache.avro.io.DecoderFactory + +import org.apache.avro.specific.SpecificDatumWriter + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace +import java.nio.ByteBuffer +class AvroDatumReaderTest extends AgentTestRunner { + + + + @Override + + protected boolean isDataStreamsEnabled() { + + return true + } + String schemaID = "5493435211744749109" + String openApiSchemaDef = "{\"components\":{\"schemas\":{\"TestRecord\":{\"properties\":{\"stringField\":{\"type\":\"string\"},\"intField\":{\"format\":\"int32\",\"type\":\"integer\"},\"longField\":{\"format\":\"int64\",\"type\":\"integer\"},\"floatField\":{\"format\":\"float\",\"type\":\"number\"},\"doubleField\":{\"format\":\"double\",\"type\":\"number\"},\"booleanField\":{\"type\":\"boolean\"},\"bytesField\":{\"format\":\"byte\",\"type\":\"string\"},\"nullField\":{\"type\":\"null\"},\"enumField\":{\"enum\":[\"A\",\"B\",\"C\"],\"type\":\"string\"},\"fixedField\":{\"type\":\"string\"},\"recordField\":{\"type\":\"object\"},\"arrayField\":{\"items\":{\"type\":\"integer\"},\"type\":\"array\"},\"mapField\":{\"description\":\"Map type\",\"type\":\"object\"}},\"type\":\"object\"}}},\"openapi\":\"3.0.0\"}" + String schemaStr = ''' + { + "type": "record", + "name": "TestRecord", + "fields": [ + {"name": "stringField", "type": "string"}, + {"name": "intField", "type": "int"}, + {"name": "longField", "type": "long"}, + {"name": "floatField", "type": "float"}, + {"name": "doubleField", "type": "double"}, + {"name": "booleanField", "type": "boolean"}, + {"name": "bytesField", "type": "bytes"}, + {"name": "nullField", "type": "null"}, + {"name": "enumField", "type": {"type": "enum", "name": "TestEnum", "symbols": ["A", "B", "C"]}}, + {"name": "fixedField", "type": {"type": "fixed", "name": "TestFixed", "size": 16}}, + {"name": "recordField", "type": {"type": "record", "name": "NestedRecord", "fields": [{"name": "nestedString", "type": "string"}]}}, + {"name": "arrayField", "type": {"type": "array", "items": 
"int"}}, + {"name": "mapField", "type": {"type": "map", "values": "string"}} + ] + } + ''' + Schema schemaDef = new Schema.Parser().parse(schemaStr) + + + + void 'test extract avro schema on deserialize'() { + + setup: + // Creating the datum record + GenericRecord datum = new GenericData.Record(schemaDef) + datum.put("stringField", "testString") + datum.put("intField", 123) + datum.put("longField", 456L) + datum.put("floatField", 7.89f) + datum.put("doubleField", 1.23e2) + datum.put("booleanField", true) + datum.put("bytesField", ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03})) + datum.put("nullField", null) + datum.put("enumField", new GenericData.EnumSymbol(schemaDef.getField("enumField").schema(), "A")) + datum.put("fixedField", new GenericData.Fixed(schemaDef.getField("fixedField").schema(), new byte[16])) + + // Nested record field + GenericRecord nestedRecord = new GenericData.Record(schemaDef.getField("recordField").schema()) + nestedRecord.put("nestedString", "nestedTestString") + datum.put("recordField", nestedRecord) + + // Array field + datum.put("arrayField", Arrays.asList(1, 2, 3)) + + // Map field + Map map = new HashMap<>() + map.put("key1", "value1") + map.put("key2", "value2") + datum.put("mapField", map) + + when: + def bytes + ByteArrayOutputStream out = new ByteArrayOutputStream() + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null) + SpecificDatumWriter datumWriter = new SpecificDatumWriter<>(schemaDef) + datumWriter.write(datum, encoder) + encoder.flush() + bytes = out.toByteArray() + + GenericRecord result = null + runUnderTrace("parent_deserialize") { + ByteArrayInputStream inputStream = new ByteArrayInputStream(bytes) + BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(inputStream, null) + GenericDatumReader datumReader = new GenericDatumReader<>(schemaDef) + + result = datumReader.read(null, decoder) + } + + TEST_WRITER.waitForTraces(1) + + then: + + assertTraces(1, SORT_TRACES_BY_ID) { + trace(1) { + span { + 
hasServiceName() + operationName "parent_deserialize" + resourceName "parent_deserialize" + errored false + measured false + tags { + "$DDTags.SCHEMA_DEFINITION" openApiSchemaDef + "$DDTags.SCHEMA_WEIGHT" 1 + "$DDTags.SCHEMA_TYPE" "avro" + "$DDTags.SCHEMA_NAME" "TestRecord" + "$DDTags.SCHEMA_OPERATION" "deserialization" + "$DDTags.SCHEMA_ID" schemaID + defaultTags(false) + } + } + } + } + } + void 'test extract avro schema on serialize'() { + + setup: + // Creating the datum record + GenericRecord datum = new GenericData.Record(schemaDef) + datum.put("stringField", "testString") + datum.put("intField", 123) + datum.put("longField", 456L) + datum.put("floatField", 7.89f) + datum.put("doubleField", 1.23e2) + datum.put("booleanField", true) + datum.put("bytesField", ByteBuffer.wrap(new byte[]{0x01, 0x02, 0x03})) + datum.put("nullField", null) + datum.put("enumField", new GenericData.EnumSymbol(schemaDef.getField("enumField").schema(), "A")) + datum.put("fixedField", new GenericData.Fixed(schemaDef.getField("fixedField").schema(), new byte[16])) + + // Nested record field + GenericRecord nestedRecord = new GenericData.Record(schemaDef.getField("recordField").schema()) + nestedRecord.put("nestedString", "nestedTestString") + datum.put("recordField", nestedRecord) + + // Array field + datum.put("arrayField", Arrays.asList(1, 2, 3)) + + // Map field + Map map = new HashMap<>() + map.put("key1", "value1") + map.put("key2", "value2") + datum.put("mapField", map) + + when: + def bytes + ByteArrayOutputStream out = new ByteArrayOutputStream() + Encoder encoder = EncoderFactory.get().binaryEncoder(out, null) + SpecificDatumWriter datumWriter = new SpecificDatumWriter<>(schemaDef) + + + runUnderTrace("parent_serialize") { + datumWriter.write(datum, encoder) + encoder.flush() + bytes = out.toByteArray() + } + + TEST_WRITER.waitForTraces(1) + + then: + + assertTraces(1, SORT_TRACES_BY_ID) { + trace(1) { + span { + hasServiceName() + operationName "parent_serialize" + resourceName 
"parent_serialize" + errored false + measured false + tags { + "$DDTags.SCHEMA_DEFINITION" openApiSchemaDef + "$DDTags.SCHEMA_WEIGHT" 1 + "$DDTags.SCHEMA_TYPE" "avro" + "$DDTags.SCHEMA_NAME" "TestRecord" + "$DDTags.SCHEMA_OPERATION" "serialization" + "$DDTags.SCHEMA_ID" schemaID + defaultTags(false) + } + } + } + } + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index 0d21b007e57..477755bffa1 100644 --- a/settings.gradle +++ b/settings.gradle @@ -171,6 +171,7 @@ include ':dd-java-agent:instrumentation:apache-httpclient-5' include ':dd-java-agent:instrumentation:apache-httpcore-4' include ':dd-java-agent:instrumentation:armeria-grpc' include ':dd-java-agent:instrumentation:armeria-jetty' +include ':dd-java-agent:instrumentation:avro' include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0' include ':dd-java-agent:instrumentation:aws-java-sdk-2.2' include ':dd-java-agent:instrumentation:aws-java-sns-1.0'