Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,16 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest<AkkaHttpClientDec

def response
try {
response = Http.get(system).singleRequest(request, materializer).toCompletableFuture().get()
response = Http.get(system)
.singleRequest(request, materializer)
//.whenComplete { result, error ->
// FIXME: Callback should be here instead.
// callback?.call()
//}
.toCompletableFuture()
.get()
} finally {
// FIXME: remove this when callback above works.
blockUntilChildSpansFinished(1)
}
callback?.call()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.instrumentation.apachehttpasyncclient.ApacheHttpAsyncClientDecorator
import io.opentracing.util.GlobalTracer
import org.apache.http.HttpResponse
import org.apache.http.concurrent.FutureCallback
import org.apache.http.impl.nio.client.HttpAsyncClients
Expand All @@ -22,8 +21,6 @@ class ApacheHttpAsyncClientCallbackTest extends HttpClientTest<ApacheHttpAsyncCl

@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def hasParent = GlobalTracer.get().activeSpan() != null

def request = new HttpUriRequest(method, uri)
headers.entrySet().each {
request.addHeader(new BasicHeader(it.key, it.value))
Expand All @@ -35,21 +32,13 @@ class ApacheHttpAsyncClientCallbackTest extends HttpClientTest<ApacheHttpAsyncCl

@Override
void completed(HttpResponse result) {
if (hasParent && GlobalTracer.get().activeSpan() == null) {
responseFuture.completeExceptionally(new Exception("Missing span in scope"))
} else {
responseFuture.complete(result.statusLine.statusCode)
}
responseFuture.complete(result.statusLine.statusCode)
callback?.call()
}

@Override
void failed(Exception ex) {
if (hasParent && GlobalTracer.get().activeSpan() == null) {
responseFuture.completeExceptionally(new Exception("Missing span in scope"))
} else {
responseFuture.completeExceptionally(ex)
}
responseFuture.completeExceptionally(ex)
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import javax.ws.rs.client.AsyncInvoker
import javax.ws.rs.client.Client
import javax.ws.rs.client.ClientBuilder
import javax.ws.rs.client.Entity
import javax.ws.rs.client.InvocationCallback
import javax.ws.rs.client.WebTarget
import javax.ws.rs.core.MediaType
import javax.ws.rs.core.Response
Expand All @@ -22,8 +23,16 @@ abstract class JaxRsClientAsyncTest extends HttpClientTest<JaxRsClientDecorator>
AsyncInvoker request = builder.async()

def body = BODY_METHODS.contains(method) ? Entity.text("") : null
Response response = request.method(method, (Entity) body).get()
callback?.call()
Response response = request.method(method, (Entity) body, new InvocationCallback<Response>(){
@Override
void completed(Response s) {
callback?.call()
}

@Override
void failed(Throwable throwable) {
}
}).get()

return response.status
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@ public class AttributeKeys {

public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpClientTracingHandler.class.getName() + ".span");

public static final AttributeKey<Span> CLIENT_PARENT_ATTRIBUTE_KEY =
new AttributeKey<>(HttpClientTracingHandler.class.getName() + ".parent");
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
Expand All @@ -34,19 +35,19 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann

final HttpRequest request = (HttpRequest) msg;

final Span span = GlobalTracer.get().buildSpan("netty.client.request").start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final Tracer tracer = GlobalTracer.get();
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).set(tracer.activeSpan());

final Span span = tracer.buildSpan("netty.client.request").start();
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
DECORATE.afterStart(span);
DECORATE.onRequest(span, request);
DECORATE.onPeerConnection(span, (InetSocketAddress) ctx.channel().remoteAddress());

// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!request.headers().contains("amz-sdk-invocation-id")) {
GlobalTracer.get()
.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new NettyResponseInjectAdapter(request));
tracer.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
}

ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,38 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.Attribute;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;

public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Attribute<Span> parentAttr =
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY);
parentAttr.setIfAbsent(NoopSpan.INSTANCE);
final Span parent = parentAttr.get();
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}

try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;

if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
Tags.HTTP_STATUS.set(span, 500);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure, here the reasoning is instead of setting it explicitly I let HttpDecorator.onResponse() to do that!

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, I think we were previously setting the client span's status to error if the callback threw an error, which I don't think is correct.

span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
final boolean finishSpan = msg instanceof HttpResponse;

if (finishSpan) {
if (span != null && finishSpan) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
}
}

// We want the callback in the scope of the parent, not the client span
try (final Scope scope = GlobalTracer.get().scopeManager().activate(parent, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
ctx.fireChannelRead(msg);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.instrumentation.netty40.client.NettyHttpClientDecorator
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.asynchttpclient.Response
import spock.lang.Shared

import java.util.concurrent.ExecutionException
Expand All @@ -25,8 +27,13 @@ class Netty40ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
def methodName = "prepare" + method.toLowerCase().capitalize()
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute().get()
callback?.call()
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
}
}).get()
return response.statusCode
}

Expand Down Expand Up @@ -66,7 +73,7 @@ class Netty40ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
and:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent", thrownException)
basicSpan(it, 0, "parent", null, thrownException)

span(1) {
operationName "netty.connect"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,7 @@ public class AttributeKeys {

public static final AttributeKey<Span> CLIENT_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName() + ".span");

public static final AttributeKey<Span> CLIENT_PARENT_ATTRIBUTE_KEY =
AttributeKey.valueOf(HttpClientTracingHandler.class.getName() + ".parent");
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.netty.handler.codec.http.HttpRequest;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.Tracer;
import io.opentracing.propagation.Format;
import io.opentracing.util.GlobalTracer;
import java.net.InetSocketAddress;
Expand All @@ -34,19 +35,19 @@ public void write(final ChannelHandlerContext ctx, final Object msg, final Chann

final HttpRequest request = (HttpRequest) msg;

final Span span = GlobalTracer.get().buildSpan("netty.client.request").start();
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final Tracer tracer = GlobalTracer.get();
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY).set(tracer.activeSpan());

final Span span = tracer.buildSpan("netty.client.request").start();
try (final Scope scope = tracer.scopeManager().activate(span, false)) {
DECORATE.afterStart(span);
DECORATE.onRequest(span, request);
DECORATE.onPeerConnection(span, (InetSocketAddress) ctx.channel().remoteAddress());

// AWS calls are often signed, so we can't add headers without breaking the signature.
if (!request.headers().contains("amz-sdk-invocation-id")) {
GlobalTracer.get()
.inject(
span.context(),
Format.Builtin.HTTP_HEADERS,
new NettyResponseInjectAdapter(request));
tracer.inject(
span.context(), Format.Builtin.HTTP_HEADERS, new NettyResponseInjectAdapter(request));
}

ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).set(span);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,44 +7,38 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.Attribute;
import io.opentracing.Scope;
import io.opentracing.Span;
import io.opentracing.tag.Tags;
import io.opentracing.noop.NoopSpan;
import io.opentracing.util.GlobalTracer;

public class HttpClientResponseTracingHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
final Attribute<Span> parentAttr =
ctx.channel().attr(AttributeKeys.CLIENT_PARENT_ATTRIBUTE_KEY);
parentAttr.setIfAbsent(NoopSpan.INSTANCE);
final Span parent = parentAttr.get();
final Span span = ctx.channel().attr(AttributeKeys.CLIENT_ATTRIBUTE_KEY).get();
if (span == null) {
ctx.fireChannelRead(msg);
return;
}

try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
final boolean finishSpan = msg instanceof HttpResponse;

if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
try {
ctx.fireChannelRead(msg);
} catch (final Throwable throwable) {
if (finishSpan) {
DECORATE.onError(span, throwable);
DECORATE.beforeFinish(span);
Tags.HTTP_STATUS.set(span, 500);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as above

span.finish(); // Finish the span manually since finishSpanOnClose was false
throw throwable;
}
}
final boolean finishSpan = msg instanceof HttpResponse;

if (finishSpan) {
if (span != null && finishSpan) {
try (final Scope scope = GlobalTracer.get().scopeManager().activate(span, false)) {
DECORATE.onResponse(span, (HttpResponse) msg);
DECORATE.beforeFinish(span);
span.finish(); // Finish the span manually since finishSpanOnClose was false
span.finish();
}
}

// We want the callback in the scope of the parent, not the client span
try (final Scope scope = GlobalTracer.get().scopeManager().activate(parent, false)) {
if (scope instanceof TraceScope) {
((TraceScope) scope).setAsyncPropagation(true);
}
ctx.fireChannelRead(msg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import io.netty.channel.ChannelInitializer
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.handler.codec.http.HttpClientCodec
import io.opentracing.tag.Tags
import org.asynchttpclient.AsyncCompletionHandler
import org.asynchttpclient.AsyncHttpClient
import org.asynchttpclient.DefaultAsyncHttpClientConfig
import org.asynchttpclient.Response
import spock.lang.Shared

import java.util.concurrent.ExecutionException
Expand All @@ -33,8 +35,13 @@ class Netty41ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
def methodName = "prepare" + method.toLowerCase().capitalize()
def requestBuilder = asyncHttpClient."$methodName"(uri.toString())
headers.each { requestBuilder.setHeader(it.key, it.value) }
def response = requestBuilder.execute().get()
callback?.call()
def response = requestBuilder.execute(new AsyncCompletionHandler() {
@Override
Object onCompleted(Response response) throws Exception {
callback?.call()
return response
}
}).get()
return response.statusCode
}

Expand Down Expand Up @@ -75,7 +82,7 @@ class Netty41ClientTest extends HttpClientTest<NettyHttpClientDecorator> {
and:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent", thrownException)
basicSpan(it, 0, "parent", null, thrownException)

span(1) {
operationName "netty.connect"
Expand Down
Loading