Skip to content

Commit

Permalink
Merge pull request #451 from DataDog/tyler/netty-client-fixes
Browse files Browse the repository at this point in the history
Allow trace to persist across netty connect.
  • Loading branch information
tylerbenson committed Aug 23, 2018
2 parents 4d91cf1 + 898647e commit 36b49c7
Show file tree
Hide file tree
Showing 25 changed files with 498 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ abstract class Default implements Instrumenter {
private final String instrumentationPrimaryName;
protected final boolean enabled;

protected final String packageName =
getClass().getPackage() == null ? "" : getClass().getPackage().getName();

public Default(final String instrumentationName, final String... additionalNames) {
instrumentationNames = new HashSet<>(Arrays.asList(additionalNames));
instrumentationNames.add(instrumentationName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,18 @@ public final class ExecutorInstrumentation extends Instrumenter.Default {
"akka.dispatch.PinnedDispatcher",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"akka.dispatch.ExecutionContexts$sameThreadExecutionContext$",
"play.api.libs.streams.Execution$trampoline$"
"play.api.libs.streams.Execution$trampoline$",
"io.netty.channel.MultithreadEventLoopGroup",
"io.netty.util.concurrent.MultithreadEventExecutorGroup",
"io.netty.util.concurrent.AbstractEventExecutorGroup",
"io.netty.channel.epoll.EpollEventLoopGroup",
"io.netty.channel.nio.NioEventLoopGroup",
"io.netty.util.concurrent.GlobalEventExecutor",
"io.netty.util.concurrent.AbstractScheduledEventExecutor",
"io.netty.util.concurrent.AbstractEventExecutor",
"io.netty.util.concurrent.SingleThreadEventExecutor",
"io.netty.channel.nio.NioEventLoop",
"io.netty.channel.SingleThreadEventLoop",
};
WHITELISTED_EXECUTORS = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(whitelist)));

Expand Down
5 changes: 3 additions & 2 deletions dd-java-agent/instrumentation/netty-4.0/netty-4.0.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ dependencies {
implementation deps.autoservice

testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:java-concurrent')

// testCompile group: 'io.netty', name: 'netty-all', version: '4.0.0.Final'
testCompile group: 'io.netty', name: 'netty-codec-http', version: '4.0.0.Final'
testCompile group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.0'
}

Expand All @@ -50,7 +51,7 @@ configurations.testCompile {

configurations.latestDepTestCompile {
resolutionStrategy {
force group: 'io.netty', name: 'netty-all', version: '4.0.56.Final'
force group: 'io.netty', name: 'netty-codec-http', version: '4.0.56.Final'
force group: 'org.asynchttpclient', name: 'async-http-client', version: '2.0.+'
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package datadog.trace.instrumentation.netty40;

import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.server.HttpServerTracingHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;

public class AttributeKeys {
public static final AttributeKey<TraceScope.Continuation>
PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY =
new AttributeKey<>("datadog.trace.instrumentation.netty40.parent.connect.continuation");

public static final AttributeKey<Span> SERVER_ATTRIBUTE_KEY =
new AttributeKey<>(HttpServerTracingHandler.class.getName() + ".span");

public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpServerTracingHandler.class.getName() + ".span");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package datadog.trace.instrumentation.netty40;

import static datadog.trace.agent.tooling.ByteBuddyElementMatchers.safeHasSuperType;
import static datadog.trace.agent.tooling.ClassLoaderMatcher.classLoaderHasClasses;
import static io.opentracing.log.Fields.ERROR_OBJECT;
import static net.bytebuddy.matcher.ElementMatchers.isInterface;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.context.TraceScope;
import io.netty.channel.ChannelFuture;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(Instrumenter.class)
public class ChannelFutureListenerInstrumentation extends Instrumenter.Default {

public ChannelFutureListenerInstrumentation() {
super("netty", "netty-4.0");
}

@Override
protected boolean defaultEnabled() {
return false;
}

@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return not(isInterface())
.and(safeHasSuperType(named("io.netty.channel.ChannelFutureListener")));
}

@Override
public ElementMatcher<ClassLoader> classLoaderMatcher() {
return classLoaderHasClasses("io.netty.handler.codec.spdy.SpdyOrHttpChooser");
}

@Override
public String[] helperClassNames() {
return new String[] {packageName + ".AttributeKeys"};
}

@Override
public Map<ElementMatcher, String> transformers() {
final Map<ElementMatcher, String> transformers = new HashMap<>();
transformers.put(
isMethod()
.and(named("operationComplete"))
.and(takesArgument(0, named("io.netty.channel.ChannelFuture"))),
OperationCompleteAdvice.class.getName());
return transformers;
}

public static class OperationCompleteAdvice {
@Advice.OnMethodEnter
public static TraceScope activateScope(@Advice.Argument(0) final ChannelFuture future) {
final TraceScope.Continuation continuation =
future.channel().attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY).get();

if (continuation == null) {
return null;
}
final TraceScope scope = continuation.activate();

final Throwable cause = future.cause();
if (cause != null) {
final Span errorSpan =
GlobalTracer.get()
.buildSpan("netty.connect")
.withTag(Tags.COMPONENT.getKey(), "netty")
.start();
Tags.ERROR.set(errorSpan, true);
errorSpan.log(Collections.singletonMap(ERROR_OBJECT, cause));
errorSpan.finish();
}

return scope;
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void deactivateScope(
@Advice.Enter final TraceScope scope, @Advice.Thrown final Throwable throwable) {
if (scope != null) {
((Scope) scope).close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.not;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.client.HttpClientRequestTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientResponseTracingHandler;
import datadog.trace.instrumentation.netty40.client.HttpClientTracingHandler;
Expand All @@ -26,6 +28,8 @@
import io.netty.handler.codec.http.HttpResponseDecoder;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpServerCodec;
import io.opentracing.Scope;
import io.opentracing.util.GlobalTracer;
import java.util.HashMap;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand All @@ -35,11 +39,8 @@
@AutoService(Instrumenter.class)
public class NettyChannelPipelineInstrumentation extends Instrumenter.Default {

private static final String PACKAGE =
NettyChannelPipelineInstrumentation.class.getPackage().getName();

public NettyChannelPipelineInstrumentation() {
super("netty", "netty-4.1");
super("netty", "netty-4.0");
}

@Override
Expand All @@ -60,16 +61,17 @@ public ElementMatcher<ClassLoader> classLoaderMatcher() {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AttributeKeys",
// client helpers
PACKAGE + ".client.NettyResponseInjectAdapter",
PACKAGE + ".client.HttpClientRequestTracingHandler",
PACKAGE + ".client.HttpClientResponseTracingHandler",
PACKAGE + ".client.HttpClientTracingHandler",
packageName + ".client.NettyResponseInjectAdapter",
packageName + ".client.HttpClientRequestTracingHandler",
packageName + ".client.HttpClientResponseTracingHandler",
packageName + ".client.HttpClientTracingHandler",
// server helpers
PACKAGE + ".server.NettyRequestExtractAdapter",
PACKAGE + ".server.HttpServerRequestTracingHandler",
PACKAGE + ".server.HttpServerResponseTracingHandler",
PACKAGE + ".server.HttpServerTracingHandler"
packageName + ".server.NettyRequestExtractAdapter",
packageName + ".server.HttpServerRequestTracingHandler",
packageName + ".server.HttpServerResponseTracingHandler",
packageName + ".server.HttpServerTracingHandler"
};
}

Expand All @@ -81,6 +83,9 @@ public Map<ElementMatcher, String> transformers() {
.and(nameStartsWith("add"))
.and(takesArgument(2, named("io.netty.channel.ChannelHandler"))),
ChannelPipelineAddAdvice.class.getName());
transformers.put(
isMethod().and(named("connect")).and(returns(named("io.netty.channel.ChannelFuture"))),
ChannelPipelineConnectAdvice.class.getName());
return transformers;
}

Expand Down Expand Up @@ -138,4 +143,18 @@ public static void addHandler(
}
}
}

public static class ChannelPipelineConnectAdvice {
@Advice.OnMethodEnter
public static void addParentSpan(@Advice.This final ChannelPipeline pipeline) {
final Scope scope = GlobalTracer.get().scopeManager().active();

if (scope instanceof TraceScope) {
pipeline
.channel()
.attr(AttributeKeys.PARENT_CONNECT_CONTINUATION_ATTRIBUTE_KEY)
.set(((TraceScope) scope).capture());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann
.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));

ctx.channel().attr(HttpClientTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);

try {
ctx.write(msg, prm);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,36 +2,53 @@

import static io.opentracing.log.Fields.ERROR_OBJECT;

import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.util.GlobalTracer;
import java.util.Collections;

public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Span span = ctx.channel().attr(HttpClientTracingHandler.attributeKey).get();
if (span == null || !(msg instanceof HttpResponse)) {
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}

final HttpResponse response = (HttpResponse) msg;
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;

try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
Tags.ERROR.set(span, Boolean.TRUE);
span.log(Collections.singletonMap(ERROR_OBJECT, throwable));
Tags.HTTP_STATUS.set(span, 500);
span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}

Tags.HTTP_STATUS.set(span, response.getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(false);
}

if (finishSpan) {
Tags.HTTP_STATUS.set(span, ((HttpResponse) msg).getStatus().code());
span.finish(); // Finish the span manually since finishSpanOnClose was false
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
package datadog.trace.instrumentation.netty40.client;

import io.netty.channel.CombinedChannelDuplexHandler;
import io.netty.util.AttributeKey;
import io.opentracing.Span;

public class HttpClientTracingHandler
extends CombinedChannelDuplexHandler<
HttpClientResponseTracingHandler, HttpClientRequestTracingHandler> {

static final AttributeKey<Span> attributeKey =
new AttributeKey<>(HttpClientTracingHandler.class.getName());

public HttpClientTracingHandler() {
super(new HttpClientResponseTracingHandler(), new HttpClientRequestTracingHandler());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import datadog.trace.api.DDSpanTypes;
import datadog.trace.api.DDTags;
import datadog.trace.context.TraceScope;
import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpRequest;
Expand Down Expand Up @@ -55,7 +56,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
}

final Span span = scope.span();
ctx.channel().attr(HttpServerTracingHandler.attributeKey).set(span);
ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).set(span);

try {
ctx.fireChannelRead(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import static io.opentracing.log.Fields.ERROR_OBJECT;

import datadog.trace.instrumentation.netty40.AttributeKeys;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
Expand All @@ -14,7 +15,7 @@ public class HttpServerResponseTracingHandler extends ChannelOutboundHandlerAdap

@Override
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise prm) {
final Span span = ctx.channel().attr(HttpServerTracingHandler.attributeKey).get();
final Span span = ctx.channel().attr(AttributeKeys.SERVER_ATTRIBUTE_KEY).get();
if (span == null || !(msg instanceof HttpResponse)) {
ctx.write(msg, prm);
return;
Expand Down

0 comments on commit 36b49c7

Please sign in to comment.