Skip to content

Commit

Permalink
Avro instrumentation for schema tracking(#7236)
Browse files Browse the repository at this point in the history
  • Loading branch information
nayeem-kamal committed Jun 27, 2024
1 parent 2f63f03 commit 001c89e
Show file tree
Hide file tree
Showing 6 changed files with 525 additions and 0 deletions.
26 changes: 26 additions & 0 deletions dd-java-agent/instrumentation/avro/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
muzzle {
pass {
group = 'org.apache.avro'
module = 'avro'
versions = "[1.11.3,)"
extraDependency 'org.apache.avro:avro-mapred:1.11.3'
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest','test')

dependencies {
compileOnly group: 'org.apache.avro', name: 'avro', version: '1.11.3'
testImplementation group: 'org.apache.avro', name: 'avro', version: '1.11.3'
latestDepTestImplementation group: 'org.apache.avro', name: 'avro', version: '1.+'

compileOnly group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.1'
testImplementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.3.1'
latestDepTestImplementation group: 'org.apache.hadoop', name: 'hadoop-common', version: '3.+'

compileOnly group: 'org.apache.avro', name: 'avro-mapred', version: '1.11.3'
testImplementation group: 'org.apache.avro', name: 'avro-mapred', version: '1.11.3'
latestDepTestImplementation group: 'org.apache.avro', name: 'avro-mapred', version: '1.11.3'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package datadog.trace.instrumentation.avro;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.avro.generic.GenericDatumReader;

@AutoService(InstrumenterModule.class)
public final class GenericDatumReaderInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy {

public GenericDatumReaderInstrumentation() {
super("avro");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("read")),
GenericDatumReaderInstrumentation.class.getName() + "$GenericDatumReaderAdvice");
}

@Override
public String hierarchyMarkerType() {
return "org.apache.avro.generic.GenericDatumReader";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

public static class GenericDatumReaderAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(@Advice.This GenericDatumReader reader) {
if (!Config.get().isDataStreamsEnabled()) {
return;
}
AgentSpan span = activeSpan();
if (span == null) {
return;
}
if (reader != null) {
SchemaExtractor.attachSchemaOnSpan(
reader.getSchema(), span, SchemaExtractor.DESERIALIZATION);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package datadog.trace.instrumentation.avro;

import static datadog.trace.agent.tooling.bytebuddy.matcher.HierarchyMatchers.extendsClass;
import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.avro.Schema;

@AutoService(InstrumenterModule.class)
public final class GenericDatumWriterInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForTypeHierarchy {

public GenericDatumWriterInstrumentation() {
super("avro");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SchemaExtractor", packageName + ".SchemaExtractor$1",
};
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("write")).and(takesArguments(2)),
GenericDatumWriterInstrumentation.class.getName() + "$GenericDatumWriterAdvice");
}

@Override
public String hierarchyMarkerType() {
return "org.apache.avro.generic.GenericDatumWriter";
}

@Override
public ElementMatcher<TypeDescription> hierarchyMatcher() {
return extendsClass(named(hierarchyMarkerType()));
}

public static class GenericDatumWriterAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(@Advice.FieldValue("root") Schema root) {
if (!Config.get().isDataStreamsEnabled()) {
return;
}
AgentSpan span = activeSpan();
if (span == null) {
return;
}

if (root != null) {
SchemaExtractor.attachSchemaOnSpan(root, span, SchemaExtractor.SERIALIZATION);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
package datadog.trace.instrumentation.avro;

import datadog.trace.api.DDTags;
import datadog.trace.bootstrap.instrumentation.api.AgentDataStreamsMonitoring;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Schema;
import datadog.trace.bootstrap.instrumentation.api.SchemaBuilder;
import datadog.trace.bootstrap.instrumentation.api.SchemaIterator;
import java.util.List;
import org.apache.avro.Schema.Field;

public class SchemaExtractor implements SchemaIterator {
public static final String SERIALIZATION = "serialization";
public static final String DESERIALIZATION = "deserialization";
private static final String AVRO = "avro";

private final org.apache.avro.Schema schema;

public SchemaExtractor(org.apache.avro.Schema schema) {
this.schema = schema;
}

public static boolean extractProperty(
Field field, String schemaName, String fieldName, SchemaBuilder builder, int depth) {
boolean array = false;
String type = null;
String format = null;
String description = null;
String ref = null;
List<String> enumValues = null;

switch (field.schema().getType()) {
case RECORD:
type = "object";
break;
case ENUM:
type = "string";
enumValues = field.schema().getEnumSymbols();
break;
case ARRAY:
array = true;
type = getType(field.schema().getElementType().getType().getName());
break;
case MAP:
type = "object";
description = "Map type";
break;
case STRING:
type = "string";
break;
case BYTES:
type = "string";
format = "byte";
break;
case INT:
type = "integer";
format = "int32";
break;
case LONG:
type = "integer";
format = "int64";
break;
case FLOAT:
type = "number";
format = "float";
break;
case DOUBLE:
type = "number";
format = "double";
break;
case BOOLEAN:
type = "boolean";
break;
case NULL:
type = "null";
break;
case FIXED:
type = "string";
break;
default:
type = "string";
description = "Unknown type";
break;
}

return builder.addProperty(
schemaName, fieldName, array, type, description, ref, format, enumValues);
}

public static boolean extractSchema(
org.apache.avro.Schema schema, SchemaBuilder builder, int depth) {
depth++;
String schemaName = schema.getFullName();
if (!builder.shouldExtractSchema(schemaName, depth)) {
return false;
}
try {
for (Field field : schema.getFields()) {
if (!extractProperty(field, schemaName, field.name(), builder, depth)) {
return false;
}
}
} catch (Exception e) {
return false;
}
return true;
}

public static Schema extractSchemas(org.apache.avro.Schema schema) {
return AgentTracer.get()
.getDataStreamsMonitoring()
.getSchema(schema.getFullName(), new SchemaExtractor(schema));
}

@Override
public void iterateOverSchema(SchemaBuilder builder) {
extractSchema(schema, builder, 0);
}

public static void attachSchemaOnSpan(
org.apache.avro.Schema schema, AgentSpan span, String operation) {
if (schema == null || span == null) {
return;
}
AgentDataStreamsMonitoring dsm = AgentTracer.get().getDataStreamsMonitoring();
span.setTag(DDTags.SCHEMA_TYPE, AVRO);
span.setTag(DDTags.SCHEMA_NAME, schema.getFullName());
span.setTag(DDTags.SCHEMA_OPERATION, operation);

if (!dsm.canSampleSchema(operation)) {
return;
}

Integer prio = span.forceSamplingDecision();
if (prio == null || prio <= 0) {
return;
}

int weight = dsm.trySampleSchema(operation);
if (weight == 0) {
return;
}

Schema schemaData = SchemaExtractor.extractSchemas(schema);
span.setTag(DDTags.SCHEMA_DEFINITION, schemaData.definition);
span.setTag(DDTags.SCHEMA_WEIGHT, weight);
span.setTag(DDTags.SCHEMA_ID, schemaData.id);
}

private static String getType(String type) {
switch (type) {
case "string":
return "string";
case "int":
return "integer";
case "long":
return "integer";
case "float":
return "number";
case "double":
return "number";
case "boolean":
return "boolean";
case "null":
return "null";
default:
return "string";
}
}
}
Loading

0 comments on commit 001c89e

Please sign in to comment.