Skip to content

Commit

Permalink
Clean up and expand akka-http client instrumentation and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bantonsson committed Nov 9, 2020
1 parent 0bd8ca0 commit 7eac56a
Show file tree
Hide file tree
Showing 4 changed files with 126 additions and 65 deletions.
Expand Up @@ -5,17 +5,15 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.AKKA_CLIENT_REQUEST;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;
import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientHelpers.*;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.HttpExt;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.CallDepthThreadLocalMap;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.HashMap;
Expand All @@ -26,13 +24,11 @@
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import scala.concurrent.Future;
import scala.runtime.AbstractFunction1;
import scala.util.Try;

@Slf4j
@AutoService(Instrumenter.class)
public final class AkkaHttpClientInstrumentation extends Instrumenter.Default {
public AkkaHttpClientInstrumentation() {
public final class AkkaHttpSingleRequestInstrumentation extends Instrumenter.Default {
public AkkaHttpSingleRequestInstrumentation() {
super("akka-http", "akka-http-client");
}

Expand All @@ -44,8 +40,9 @@ public ElementMatcher<TypeDescription> typeMatcher() {
@Override
public String[] helperClassNames() {
return new String[] {
AkkaHttpClientInstrumentation.class.getName() + "$OnCompleteHandler",
AkkaHttpClientInstrumentation.class.getName() + "$AkkaHttpHeaders",
packageName + ".AkkaHttpClientHelpers$OnCompleteHandler",
packageName + ".AkkaHttpClientHelpers$AkkaHttpHeaders",
packageName + ".AkkaHttpClientHelpers$HasSpanHeader",
packageName + ".AkkaHttpClientDecorator",
};
}
Expand All @@ -56,12 +53,12 @@ public Map<? extends ElementMatcher<? super MethodDescription>, String> transfor
// This is mainly for compatibility with 10.0
transformers.put(
named("singleRequest").and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
AkkaHttpClientInstrumentation.class.getName() + "$SingleRequestAdvice");
AkkaHttpSingleRequestInstrumentation.class.getName() + "$SingleRequestAdvice");
// This is for 10.1+
transformers.put(
named("singleRequestImpl")
.and(takesArgument(0, named("akka.http.scaladsl.model.HttpRequest"))),
AkkaHttpClientInstrumentation.class.getName() + "$SingleRequestAdvice");
AkkaHttpSingleRequestInstrumentation.class.getName() + "$SingleRequestAdvice");
return transformers;
}

Expand All @@ -75,8 +72,8 @@ public static AgentScope methodEnter(
In the future we may want to separate these, but since lots of code is reused we would need to come up
with way of continuing to reusing it.
*/
final int callDepth = CallDepthThreadLocalMap.incrementCallDepth(HttpExt.class);
if (callDepth > 0) {
final AkkaHttpHeaders headers = new AkkaHttpHeaders(request);
if (headers.hadSpan()) {
return null;
}

Expand All @@ -86,7 +83,6 @@ public static AgentScope methodEnter(
DECORATE.onRequest(span, request);

if (request != null) {
final AkkaHttpHeaders headers = new AkkaHttpHeaders(request);
propagate().inject(span, request, headers);
// Request is immutable, so we have to assign new value once we update headers
request = headers.getRequest();
Expand All @@ -96,15 +92,13 @@ public static AgentScope methodEnter(

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(value = 0) final HttpRequest request,
@Advice.This final HttpExt thiz,
@Advice.Return final Future<HttpResponse> responseFuture,
@Advice.Enter final AgentScope scope,
@Advice.Thrown final Throwable throwable) {
if (scope == null) {
return;
}
CallDepthThreadLocalMap.reset(HttpExt.class);

final AgentSpan span = scope.span();

Expand All @@ -118,42 +112,4 @@ public static void methodExit(
scope.close();
}
}

public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final AgentSpan span;

public OnCompleteHandler(final AgentSpan span) {
this.span = span;
}

@Override
public Void apply(final Try<HttpResponse> result) {
if (result.isSuccess()) {
DECORATE.onResponse(span, result.get());
} else {
DECORATE.onError(span, result.failed().get());
}
DECORATE.beforeFinish(span);
span.finish();
return null;
}
}

public static class AkkaHttpHeaders implements AgentPropagation.Setter<HttpRequest> {
private HttpRequest request;

public AkkaHttpHeaders(final HttpRequest request) {
this.request = request;
}

@Override
public void set(final HttpRequest carrier, final String key, final String value) {
// It looks like this cast is only needed in Java, Scala would have figured it out
request = (HttpRequest) request.addHeader(RawHeader.create(key, value));
}

public HttpRequest getRequest() {
return request;
}
}
}
@@ -0,0 +1,87 @@
package datadog.trace.instrumentation.akkahttp;

import static datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator.DECORATE;

import akka.http.javadsl.model.headers.RawHeader;
import akka.http.scaladsl.model.HttpRequest;
import akka.http.scaladsl.model.HttpResponse;
import akka.http.scaladsl.model.headers.CustomHeader;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import scala.runtime.AbstractFunction1;
import scala.util.Try;

public final class AkkaHttpClientHelpers {
public static class OnCompleteHandler extends AbstractFunction1<Try<HttpResponse>, Void> {
private final AgentSpan span;

public OnCompleteHandler(final AgentSpan span) {
this.span = span;
}

@Override
public Void apply(final Try<HttpResponse> result) {
if (result.isSuccess()) {
DECORATE.onResponse(span, result.get());
} else {
DECORATE.onError(span, result.failed().get());
}
DECORATE.beforeFinish(span);
span.finish();
return null;
}
}

public static class AkkaHttpHeaders implements AgentPropagation.Setter<HttpRequest> {
private HttpRequest request;
// Did this request have a span when the AkkaHttpHeaders object was created?
private boolean hadSpan;

public AkkaHttpHeaders(final HttpRequest request) {
hadSpan = request != null && request.getHeader(HasSpanHeader.class).isPresent();
if (hadSpan || request == null) {
this.request = request;
} else {
// Coerce a Scala trait Self type into the correct type
this.request = (HttpRequest) request.addHeader(new HasSpanHeader());
}
}

public boolean hadSpan() {
return hadSpan;
}

@Override
public void set(final HttpRequest carrier, final String key, final String value) {
// Coerce a Scala trait Self type into the correct type
request = (HttpRequest) request.addHeader(RawHeader.create(key, value));
}

public HttpRequest getRequest() {
return request;
}
}

// Custom header to mark that this request has a span associated with it
public static class HasSpanHeader extends CustomHeader {
@Override
public String name() {
return "x-datadog-request-has-span";
}

@Override
public String value() {
return "true";
}

@Override
public boolean renderInRequests() {
return false;
}

@Override
public boolean renderInResponses() {
return false;
}
}
}
Expand Up @@ -2,25 +2,29 @@ import akka.actor.ActorSystem
import akka.http.javadsl.Http
import akka.http.javadsl.model.HttpMethods
import akka.http.javadsl.model.HttpRequest
import akka.http.javadsl.model.HttpResponse
import akka.http.javadsl.model.headers.RawHeader
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.ActorMaterializer
import datadog.trace.agent.test.base.HttpClientTest
import datadog.trace.api.DDSpanTypes
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.instrumentation.akkahttp.AkkaHttpClientDecorator
import org.junit.Ignore
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters
import spock.lang.Shared
import spock.lang.Timeout

@Ignore // Ignore this test until akka actor messaging is fixed
@Timeout(5)
class AkkaHttpClientInstrumentationTest extends HttpClientTest {
abstract class AkkaHttpClientInstrumentationTest extends HttpClientTest {

@Shared
ActorSystem system = ActorSystem.create()
@Shared
ActorMaterializer materializer = ActorMaterializer.create(system)

abstract CompletionStage<HttpResponse> doRequest(HttpRequest request)

@Override
int doRequest(String method, URI uri, Map<String, String> headers, Closure callback) {
def request = HttpRequest.create(uri.toString())
Expand All @@ -29,19 +33,16 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest {

def response
try {
response = Http.get(system)
.singleRequest(request, materializer)
//.whenComplete { result, error ->
// FIXME: Callback should be here instead.
// callback?.call()
//}
response = doRequest(request)
.whenComplete { result, error ->
callback?.call()
}
.toCompletableFuture()
.get()
} finally {
// FIXME: remove this when callback above works.
// Since the spans are completed in an async callback, we need to wait here
blockUntilChildSpansFinished(1)
}
callback?.call()
return response.status().intValue()
}

Expand Down Expand Up @@ -95,3 +96,20 @@ class AkkaHttpClientInstrumentationTest extends HttpClientTest {
renameService << [false, true]
}
}

class AkkaHttpJavaClientInstrumentationTest extends AkkaHttpClientInstrumentationTest {
@Override
CompletionStage<HttpResponse> doRequest(HttpRequest request) {
return Http.get(system).singleRequest(request, materializer)
}
}

class AkkaHttpScalaClientInstrumentationTest extends AkkaHttpClientInstrumentationTest {
@Override
CompletionStage<HttpResponse> doRequest(HttpRequest request) {
def http = akka.http.scaladsl.Http.apply(system)
def sRequest = (akka.http.scaladsl.model.HttpRequest) request
def f = http.singleRequest(sRequest, http.defaultClientHttpsContext(), (ConnectionPoolSettings) ConnectionPoolSettings.apply(system), system.log(), materializer)
return FutureConverters.toJava(f)
}
}

0 comments on commit 7eac56a

Please sign in to comment.