Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avro instrumentation for schema tracking #7236

Merged
merged 18 commits into from
Jun 27, 2024
Merged
26 changes: 26 additions & 0 deletions dd-java-agent/instrumentation/avro/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Muzzle configuration: declares the library coordinates and version range this
// instrumentation is expected to pass against.
muzzle {
pass {
group = 'org.apache.avro'
module = 'avro'
// Open-ended range: 1.11.3 and every later version.
versions = "[1.11.3,)"
// avro-mapred is supplied as an additional dependency for the check.
extraDependency 'org.apache.avro:avro-mapred:1.11.3'
}
}

// Apply the build script located at gradle/java.gradle under the root project directory.
apply from: "$rootDir/gradle/java.gradle"

// Presumably registers a 'latestDepTest' suite that reuses the sources in the 'test'
// directory (it is what the latestDepTest* configurations below rely on) — confirm in the
// shared build scripts.
addTestSuiteForDir('latestDepTest','test')

dependencies {
  // Avro core: compile and test against 1.11.3; latestDepTest floats on the newest 1.x.
  compileOnly 'org.apache.avro:avro:1.11.3'
  testImplementation 'org.apache.avro:avro:1.11.3'
  latestDepTestImplementation 'org.apache.avro:avro:1.+'

  // Hadoop common: compile and test against 3.3.1; latestDepTest floats on the newest 3.x.
  compileOnly 'org.apache.hadoop:hadoop-common:3.3.1'
  testImplementation 'org.apache.hadoop:hadoop-common:3.3.1'
  latestDepTestImplementation 'org.apache.hadoop:hadoop-common:3.+'

  // Avro MapReduce bindings.
  compileOnly 'org.apache.avro:avro-mapred:1.11.3'
  testImplementation 'org.apache.avro:avro-mapred:1.11.3'
  // NOTE(review): unlike the two latestDep entries above, this one is pinned to 1.11.3
  // rather than a floating version — confirm that this is intentional.
  latestDepTestImplementation 'org.apache.avro:avro-mapred:1.11.3'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.trace.instrumentation.avro;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.avro.generic.GenericDatumReader;

@AutoService(InstrumenterModule.class)
public final class GenericDatumReaderInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy {

public GenericDatumReaderInstrumentation() {
super("avro");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("read")),
GenericDatumReaderInstrumentation.class.getName() + "$GenericDatumReaderAdvice");
}

@Override
public String hierarchyMarkerType() {
return "org.apache.avro.generic.GenericDatumReader";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

public static class GenericDatumReaderAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(@Advice.This GenericDatumReader reader) {
nayeem-kamal marked this conversation as resolved.
Show resolved Hide resolved
if (!Config.get().isDataStreamsEnabled()) {
return;
}
AgentSpan span = activeSpan();
if (span == null) {
return;
}
if (reader != null) {
SchemaExtractor.attachSchemaOnSpan(
reader.getSchema(), span, SchemaExtractor.DESERIALIZATION);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.instrumentation.avro;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.avro.Schema;

@AutoService(InstrumenterModule.class)
public final class GenericDatumWriterInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy {

public GenericDatumWriterInstrumentation() {
super("avro");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("write")).and(takesArguments(2)),
GenericDatumWriterInstrumentation.class.getName() + "$GenericDatumWriterAdvice");
}

@Override
public String hierarchyMarkerType() {
return "org.apache.avro.generic.GenericDatumWriter";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

public static class GenericDatumWriterAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(@Advice.FieldValue("root") Schema root) {
nayeem-kamal marked this conversation as resolved.
Show resolved Hide resolved
if (!Config.get().isDataStreamsEnabled()) {
return;
}
AgentSpan span = activeSpan();
if (span == null) {
return;
}

if (root != null) {
SchemaExtractor.attachSchemaOnSpan(root, span, SchemaExtractor.SERIALIZATION);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package datadog.trace.instrumentation.avro;

import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Schema;
import datadog.trace.bootstrap.instrumentation.api.SchemaBuilder;
import datadog.trace.bootstrap.instrumentation.api.SchemaIterator;
import java.util.List;
import org.apache.avro.Schema.Field;

/**
 * Walks an Avro schema, reports its fields to a {@link SchemaBuilder}, and attaches schema tags
 * to spans.
 *
 * <p>NOTE: the enum {@code switch} in {@link #extractProperty} is kept as a plain switch
 * statement on purpose; the instrumentations list {@code SchemaExtractor$1} as a helper class,
 * which is presumably the class the compiler generates for that switch.
 */
public class SchemaExtractor implements SchemaIterator {
  public static final String SERIALIZATION = "serialization";
  public static final String DESERIALIZATION = "deserialization";
  private static final String AVRO = "avro";

  private final org.apache.avro.Schema schema;

  public SchemaExtractor(org.apache.avro.Schema schema) {
    this.schema = schema;
  }

  /**
   * Reports a single field to the builder, translating its Avro type into a type name plus an
   * optional format, description and list of enum values.
   *
   * @param field the Avro field to describe
   * @param schemaName name of the schema the field belongs to
   * @param fieldName name under which the property is reported
   * @param builder receiver of the property
   * @param depth current nesting depth (not used by this method)
   * @return whatever {@code builder.addProperty} returns
   */
  public static boolean extractProperty(
      Field field, String schemaName, String fieldName, SchemaBuilder builder, int depth) {
    final org.apache.avro.Schema fieldSchema = field.schema();

    boolean array = false;
    String type;
    String format = null;
    String description = null;
    // No reference to another schema is ever produced here.
    final String ref = null;
    List<String> enumValues = null;

    switch (fieldSchema.getType()) {
      case RECORD:
        type = "object";
        break;
      case ENUM:
        type = "string";
        enumValues = fieldSchema.getEnumSymbols();
        break;
      case ARRAY:
        array = true;
        // The reported type is the element type, not "array" itself.
        type = getType(fieldSchema.getElementType().getType().getName());
        break;
      case MAP:
        type = "object";
        description = "Map type";
        break;
      case STRING:
      case FIXED:
        type = "string";
        break;
      case BYTES:
        type = "string";
        format = "byte";
        break;
      case INT:
        type = "integer";
        format = "int32";
        break;
      case LONG:
        type = "integer";
        format = "int64";
        break;
      case FLOAT:
        type = "number";
        format = "float";
        break;
      case DOUBLE:
        type = "number";
        format = "double";
        break;
      case BOOLEAN:
        type = "boolean";
        break;
      case NULL:
        type = "null";
        break;
      default:
        // Every type without a dedicated case above is reported as a string.
        type = "string";
        description = "Unknown type";
        break;
    }

    return builder.addProperty(
        schemaName, fieldName, array, type, description, ref, format, enumValues);
  }

  /**
   * Reports every field of {@code schema} to the builder.
   *
   * @param schema the Avro schema to walk
   * @param builder receiver of the properties; also decides whether extraction should proceed
   * @param depth depth of the caller; incremented before it is handed on
   * @return {@code true} when all fields were reported; {@code false} when the builder declined
   *     the schema or a property, or when walking the fields failed
   */
  public static boolean extractSchema(
      org.apache.avro.Schema schema, SchemaBuilder builder, int depth) {
    final int currentDepth = depth + 1;
    final String schemaName = schema.getFullName();
    if (!builder.shouldExtractSchema(schemaName, currentDepth)) {
      return false;
    }
    try {
      for (Field field : schema.getFields()) {
        if (!extractProperty(field, schemaName, field.name(), builder, currentDepth)) {
          return false;
        }
      }
    } catch (Exception ignored) {
      // Best effort: this runs inside instrumentation, so a failure while walking the fields
      // (presumably e.g. a schema that has no fields to list) only aborts the extraction.
      return false;
    }
    return true;
  }

  /**
   * Obtains the tracer-side schema for an Avro schema from data streams monitoring, keyed by the
   * schema's full name and using a new {@code SchemaExtractor} as the iterator.
   *
   * @param schema the Avro schema
   * @return the schema returned by data streams monitoring
   */
  public static Schema extractSchemas(org.apache.avro.Schema schema) {
    return AgentTracer.get()
        .getDataStreamsMonitoring()
        .getSchema(schema.getFullName(), new SchemaExtractor(schema));
  }

  @Override
  public void iterateOverSchema(SchemaBuilder builder) {
    extractSchema(schema, builder, 0);
  }

  /**
   * Tags {@code span} with information about {@code schema}.
   *
   * <p>The schema type, name and operation tags are always set. The definition, weight and id
   * tags are only set when data streams monitoring allows sampling for the operation, the span's
   * forced sampling priority is positive, and the sampling attempt yields a non-zero weight.
   *
   * @param schema the Avro schema; nothing happens when {@code null}
   * @param span the span to tag; nothing happens when {@code null}
   * @param operation {@link #SERIALIZATION} or {@link #DESERIALIZATION}
   */
  public static void attachSchemaOnSpan(
      org.apache.avro.Schema schema, AgentSpan span, String operation) {
    if (schema == null || span == null) {
      return;
    }
    AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
    span.setTag(DDTags.SCHEMA_TYPE, AVRO);
    span.setTag(DDTags.SCHEMA_NAME, schema.getFullName());
    span.setTag(DDTags.SCHEMA_OPERATION, operation);

    if (!dsm.canSampleSchema(operation)) {
      return;
    }

    Integer prio = span.forceSamplingDecision();
    if (prio == null || prio <= 0) {
      return;
    }

    int weight = dsm.trySampleSchema(operation);
    if (weight == 0) {
      return;
    }

    Schema schemaData = SchemaExtractor.extractSchemas(schema);
    span.setTag(DDTags.SCHEMA_DEFINITION, schemaData.definition);
    span.setTag(DDTags.SCHEMA_WEIGHT, weight);
    span.setTag(DDTags.SCHEMA_ID, schemaData.id);
  }

  /**
   * Maps the name of an array element's Avro type to the reported type name.
   *
   * <p>{@code record} and {@code map} elements are reported as {@code "object"}, matching how
   * {@link #extractProperty} reports non-array RECORD and MAP fields. Any name without a
   * dedicated case is reported as {@code "string"}.
   *
   * @param type the Avro type name of the array's element type
   * @return the reported type name
   */
  private static String getType(String type) {
    switch (type) {
      case "record":
      case "map":
        return "object";
      case "int":
      case "long":
        return "integer";
      case "float":
      case "double":
        return "number";
      case "boolean":
        return "boolean";
      case "null":
        return "null";
      case "string":
      default:
        return "string";
    }
  }
}
Loading
Loading