diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java index d06bd32b67f..cce2d90016b 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java @@ -31,7 +31,7 @@ public static ElementMatcher.Junction.AbstractBase classLoaderHasCl } public static ElementMatcher.Junction.AbstractBase classLoaderHasClassWithMethod( - final String className, final String methodName, final Class... methodArgs) { + final String className, final String methodName, final String... methodArgs) { return new ClassLoaderHasClassWithMethodMatcher(className, methodName, methodArgs); } @@ -205,10 +205,10 @@ public static class ClassLoaderHasClassWithMethodMatcher private final String className; private final String methodName; - private final Class[] methodArgs; + private final String[] methodArgs; private ClassLoaderHasClassWithMethodMatcher( - final String className, final String methodName, final Class... methodArgs) { + final String className, final String methodName, final String... methodArgs) { this.className = className; this.methodName = methodName; this.methodArgs = methodArgs; @@ -223,10 +223,14 @@ public boolean matches(final ClassLoader target) { } try { final Class aClass = Class.forName(className, false, target); + final Class[] methodArgsClasses = new Class[methodArgs.length]; + for (int i = 0; i < methodArgs.length; ++i) { + methodArgsClasses[i] = target.loadClass(methodArgs[i]); + } if (aClass.isInterface()) { - aClass.getMethod(methodName, methodArgs); + aClass.getMethod(methodName, methodArgsClasses); } else { - aClass.getDeclaredMethod(methodName, methodArgs); + aClass.getDeclaredMethod(methodName, methodArgsClasses); } cache.put(target, true); return true; diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala index cc77d434874..a4d53267ef8 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala +++ b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala @@ -1,4 +1,5 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import akka.pattern.ask import io.opentracing.util.GlobalTracer @@ -32,18 +33,21 @@ class AkkaActors { @Trace def basicTell() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! Greet } @Trace def basicAsk() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ? Greet } @Trace def basicForward() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) helloGreeter ! WhoToGreet("Akka") helloGreeter ? Greet } diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index cc3bf64f640..7951e9254f3 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -7,4 +7,7 @@ dependencies { compile deps.bytebuddy compile deps.opentracing compile deps.autoservice + + testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle index 6e25720806f..1bc5e57177a 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle @@ -1,5 +1,5 @@ apply from: "${rootDir}/gradle/java.gradle" -apply plugin: 'scala' +apply from: "${rootDir}/gradle/test-with-scala.gradle" dependencies { compile project(':dd-trace-api') diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy new file mode 100644 index 00000000000..527daf4d960 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy @@ -0,0 +1,82 @@ +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner + +class ScalaInstrumentationTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.java_concurrent.enabled", "true") + } + + @Override + void afterTest() { + // Ignore failures to instrument sun proxy classes + } + + def "scala futures and callbacks"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" + findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala propagates across futures with no traces"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" + findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala either promise completion"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithPromises() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" + findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala first completed future"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() + } + + private DDSpan findSpan(List trace, String opName) { + for (DDSpan span : trace) { + if (span.getOperationName() == opName) { + return span + } + } + return null + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala similarity index 83% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala rename to dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index c73c886d0ef..40578a5e2d8 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -1,8 +1,9 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import io.opentracing.util.GlobalTracer -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} class ScalaConcurrentTests { @@ -12,6 +13,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithFutureAndCallbacks() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val goodFuture: Future[Integer] = Future { tracedChild("goodFuture") 1 @@ -32,6 +34,7 @@ class ScalaConcurrentTests { @Trace def tracedAcrossThreadsWithNoTrace() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val goodFuture: Future[Integer] = Future { 1 } @@ -51,6 +54,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithPromises() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val keptPromise = Promise[Boolean]() val brokenPromise = Promise[Boolean]() val afterPromise = keptPromise.future @@ -83,6 +87,7 @@ class ScalaConcurrentTests { */ @Trace def tracedWithFutureFirstCompletions() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val completedVal = Future.firstCompletedOf( List( Future { @@ -106,6 +111,7 @@ class ScalaConcurrentTests { */ @Trace def tracedTimeout(): Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val f: Future[String] = Future { tracedChild("timeoutChild") while(true) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 84a06e5b5e3..7646d8e8b06 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -140,7 +140,10 @@ public static class WrapRunnableAdvice { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncPropagating() + && task != null + && !(task instanceof DatadogWrapper)) { task = new RunnableWrapper(task, (TraceScope) scope); return (RunnableWrapper) task; } @@ -161,7 +164,10 @@ public static class WrapCallableAdvice { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncPropagating() + && task != null + && !(task instanceof DatadogWrapper)) { task = new CallableWrapper(task, (TraceScope) scope); return (CallableWrapper) task; } @@ -182,7 +188,7 @@ public static class WrapCallableCollectionAdvice { public static Collection wrapJob( @Advice.Argument(value = 0, readOnly = false) Collection> tasks) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope) { + if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating()) { Collection> wrappedTasks = new ArrayList<>(tasks.size()); for (Callable task : tasks) { if (task != null) { @@ -217,13 +223,13 @@ public abstract static class DatadogWrapper { protected final TraceScope.Continuation continuation; public DatadogWrapper(TraceScope scope) { - continuation = scope.capture(true); + continuation = scope.capture(); log.debug("created continuation {} from scope {}", continuation, scope); } public void cancel() { if (null != continuation) { - continuation.activate().close(); + continuation.close(); log.debug("canceled continuation {}", continuation); } } @@ -241,6 +247,7 @@ public RunnableWrapper(Runnable toWrap, TraceScope scope) { @Override public void run() { final TraceScope context = continuation.activate(); + context.setAsyncPropagation(true); try { delegatee.run(); } finally { @@ -261,6 +268,7 @@ public CallableWrapper(Callable toWrap, TraceScope scope) { @Override public T call() throws Exception { final TraceScope context = continuation.activate(); + context.setAsyncPropagation(true); try { return delegatee.call(); } finally { diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy similarity index 59% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy rename to dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 19dd7b94ba3..ac7c225748c 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -1,6 +1,8 @@ import datadog.opentracing.DDSpan +import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.Trace +import io.opentracing.util.GlobalTracer import spock.lang.Shared import spock.lang.Unroll @@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span m.invoke(pool, new AsyncChild()) // this child won't @@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) try { for (int i = 0; i < 20; ++ i) { Future f = pool.submit((Callable)child) @@ -118,73 +122,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner { new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | _ new ScheduledThreadPoolExecutor(1) | _ } - - def "scala futures and callbacks"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" - findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala propagates across futures with no traces"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" - findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala either promise completion"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithPromises() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" - findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala first completed future"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() - } - - private DDSpan findSpan(List trace, String opName) { - for (DDSpan span : trace) { - if (span.getOperationName() == opName) { - return span - } - } - return null - } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java new file mode 100644 index 00000000000..8d8746be2da --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java @@ -0,0 +1,51 @@ +import datadog.trace.api.Trace; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncChild implements Runnable, Callable { + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + private final AtomicInteger numberOfWorkers = new AtomicInteger(0); + + public AsyncChild() { + this(true, false); + } + + public AsyncChild(boolean doTraceableWork, boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() throws Exception { + runImpl(); + return null; + } + + private void runImpl() { + if (doTraceableWork) { + asyncChild(); + } + numberOfWorkers.getAndIncrement(); + try { + while (blockThread.get()) { + // busy-wait to block thread + } + } finally { + numberOfWorkers.getAndDecrement(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} +} diff --git a/dd-java-agent/instrumentation/play-2.4/.gitignore b/dd-java-agent/instrumentation/play-2.4/.gitignore new file mode 100644 index 00000000000..5292519a25e --- /dev/null +++ b/dd-java-agent/instrumentation/play-2.4/.gitignore @@ -0,0 +1 @@ +logs/ \ No newline at end of file diff --git a/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy b/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy index bc192c886fb..049f00e637b 100644 --- a/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy +++ b/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy @@ -58,7 +58,7 @@ class Play26Test extends AgentTestRunner { root.traceId == 123 root.parentId == 456 root.serviceName == "unnamed-java-app" - root.operationName == "/helloplay/:from" + root.operationName == "play.request" root.resourceName == "GET /helloplay/:from" !root.context().getErrorFlag() root.context().tags["http.status_code"] == 200 @@ -85,7 +85,7 @@ class Play26Test extends AgentTestRunner { response.code() == 500 root.serviceName == "unnamed-java-app" - root.operationName == "/make-error" + root.operationName == "play.request" root.resourceName == "GET /make-error" root.context().getErrorFlag() root.context().tags["http.status_code"] == 500 @@ -116,7 +116,7 @@ class Play26Test extends AgentTestRunner { root.context().tags["error.type"] == RuntimeException.getName() root.serviceName == "unnamed-java-app" - root.operationName == "/exception" + root.operationName == "play.request" root.resourceName == "GET /exception" root.context().tags["http.status_code"] == 500 root.context().tags["http.url"] == "/exception" diff --git a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java index 7821da279f9..9e7d96c021b 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java +++ b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java @@ -9,6 +9,7 @@ import datadog.trace.agent.tooling.*; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; +import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -77,9 +78,7 @@ public AgentBuilder apply(final AgentBuilder agentBuilder) { public static class PlayAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope startSpan(@Advice.Argument(0) final Request req) { - // TODO - // begin tracking across threads - + final Scope scope; if (GlobalTracer.get().activeSpan() == null) { final SpanContext extractedContext; if (GlobalTracer.get().scopeManager().active() == null) { @@ -88,15 +87,21 @@ public static Scope startSpan(@Advice.Argument(0) final Request req) { } else { extractedContext = null; } - return GlobalTracer.get() - .buildSpan("play.request") - .asChildOf(extractedContext) - .startActive(false); + scope = + GlobalTracer.get() + .buildSpan("play.request") + .asChildOf(extractedContext) + .startActive(false); } else { // An upstream framework (e.g. akka-http, netty) has already started the span. // Do not extract the context. - return GlobalTracer.get().buildSpan("play.request").startActive(false); + scope = GlobalTracer.get().buildSpan("play.request").startActive(false); + } + + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true); } + return scope; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -111,7 +116,6 @@ public static void stopTraceOnResponse( if (!pathOption.isEmpty()) { final String path = (String) pathOption.get(); scope.span().setTag(Tags.HTTP_URL.getKey(), path); - scope.span().setOperationName(path); scope.span().setTag(DDTags.RESOURCE_NAME, req.method() + " " + path); } @@ -169,6 +173,9 @@ public RequestError(Span span) { @Override public Object apply(Throwable t, boolean isCheck) throws Exception { try { + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); + } onError(span, t); } catch (Throwable t2) { LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t); @@ -193,8 +200,9 @@ public RequestCallback(Span span) { } public Result apply(Result result) { - // TODO - // stop tracking across threads + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); + } try { Tags.HTTP_STATUS.set(span, result.header().status()); } catch (Throwable t) { diff --git a/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy b/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy index bd21e4ddab2..d2f6f287b48 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy +++ b/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy @@ -58,7 +58,7 @@ class Play24Test extends AgentTestRunner { root.traceId == 123 root.parentId == 456 root.serviceName == "unnamed-java-app" - root.operationName == "/helloplay/:from" + root.operationName == "play.request" root.resourceName == "GET /helloplay/:from" !root.context().getErrorFlag() root.context().tags["http.status_code"] == 200 @@ -85,7 +85,7 @@ class Play24Test extends AgentTestRunner { response.code() == 500 root.serviceName == "unnamed-java-app" - root.operationName == "/make-error" + root.operationName == "play.request" root.resourceName == "GET /make-error" root.context().getErrorFlag() root.context().tags["http.status_code"] == 500 @@ -116,7 +116,7 @@ class Play24Test extends AgentTestRunner { root.context().tags["error.type"] == RuntimeException.getName() root.serviceName == "unnamed-java-app" - root.operationName == "/exception" + root.operationName == "play.request" root.resourceName == "GET /exception" root.context().tags["http.status_code"] == 500 root.context().tags["http.url"] == "/exception" diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index 1886796edc3..ff989a2844e 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -8,11 +8,21 @@ public interface TraceScope { * *

Should be called on the parent thread. */ - Continuation capture(boolean finishOnClose); + Continuation capture(); /** Close the activated context and allow any underlying spans to finish. */ void close(); + /** If true, this context will propagate across async boundaries. */ + boolean isAsyncPropagating(); + + /** + * Enable or disable async propagation. Async propagation is initially set to false. + * + * @param value The new propagation value. True == propagate. False == don't propagate. + */ + void setAsyncPropagation(boolean value); + /** Used to pass async context between workers. */ interface Continuation { /** @@ -21,5 +31,8 @@ interface Continuation { *

Should be called on the child thread. */ TraceScope activate(); + + /** Cancel the continuation. */ + void close(); } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java index 094e9f869ec..e0283a21c82 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java @@ -1,5 +1,6 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import io.opentracing.Scope; import io.opentracing.ScopeManager; import io.opentracing.Span; @@ -17,7 +18,11 @@ public Scope activate(final Span span, final boolean finishOnClose) { return context.activate(span, finishOnClose); } } - return new ContinuableScope(this, span, finishOnClose); + if (span instanceof DDSpan) { + return new ContinuableScope(this, (DDSpan) span, finishOnClose); + } else { + return new SimpleScope(this, span, finishOnClose); + } } @Override diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 3250894705f..efe8df43437 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -1,11 +1,10 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpanContext; import datadog.opentracing.PendingTrace; import datadog.trace.context.TraceScope; import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopScopeManager; import java.io.Closeable; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,26 +13,40 @@ @Slf4j public class ContinuableScope implements Scope, TraceScope { - final ContextualScopeManager scopeManager; - final AtomicInteger refCount; - private final Span wrapped; + /** ScopeManager holding the thread-local to this scope. */ + private final ContextualScopeManager scopeManager; + /** + * Span contained by this scope. Async scopes will hold a reference to the parent scope's span. + */ + private final DDSpan spanUnderScope; + /** If true, finish the span when openCount hits 0. */ private final boolean finishOnClose; + /** Count of open scope and continuations */ + private final AtomicInteger openCount; + /** Scope to placed in the thread local after close. May be null. */ private final Scope toRestore; + /** Continuation that created this scope. May be null. */ + private final Continuation continuation; + /** Flag to propagate this scope across async boundaries. */ + private final AtomicBoolean isAsyncPropagating = new AtomicBoolean(false); ContinuableScope( - final ContextualScopeManager scopeManager, final Span wrapped, final boolean finishOnClose) { - this(scopeManager, new AtomicInteger(1), wrapped, finishOnClose); + final ContextualScopeManager scopeManager, + final DDSpan spanUnderScope, + final boolean finishOnClose) { + this(scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } private ContinuableScope( final ContextualScopeManager scopeManager, - final AtomicInteger refCount, - final Span wrapped, + final AtomicInteger openCount, + final Continuation continuation, + final DDSpan spanUnderScope, final boolean finishOnClose) { - this.scopeManager = scopeManager; - this.refCount = refCount; - this.wrapped = wrapped; + this.openCount = openCount; + this.continuation = continuation; + this.spanUnderScope = spanUnderScope; this.finishOnClose = finishOnClose; this.toRestore = scopeManager.tlsScope.get(); scopeManager.tlsScope.set(this); @@ -41,30 +54,45 @@ private ContinuableScope( @Override public void close() { - if (scopeManager.tlsScope.get() != this) { - return; + if (null != continuation) { + spanUnderScope.context().getTrace().cancelContinuation(continuation); + } + + if (openCount.decrementAndGet() == 0 && finishOnClose) { + spanUnderScope.finish(); } - if (refCount.decrementAndGet() == 0 && finishOnClose) { - wrapped.finish(); + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); } + } - scopeManager.tlsScope.set(toRestore); + @Override + public DDSpan span() { + return spanUnderScope; } @Override - public Span span() { - return wrapped; + public boolean isAsyncPropagating() { + return isAsyncPropagating.get(); + } + + @Override + public void setAsyncPropagation(boolean value) { + isAsyncPropagating.set(value); } /** - * The continuation returned should be closed after the associa + * The continuation returned must be closed or activated or the trace will not finish. * - * @param finishOnClose - * @return + * @return The new continuation, or null if this scope is not async propagating. */ - public Continuation capture(final boolean finishOnClose) { - return new Continuation(this.finishOnClose && finishOnClose); + public Continuation capture() { + if (isAsyncPropagating()) { + return new Continuation(); + } else { + return null; + } } public class Continuation implements Closeable, TraceScope.Continuation { @@ -72,72 +100,32 @@ public class Continuation implements Closeable, TraceScope.Continuation { private final AtomicBoolean used = new AtomicBoolean(false); private final PendingTrace trace; - private final boolean finishSpanOnClose; - - private Continuation(final boolean finishOnClose) { - this.finishSpanOnClose = finishOnClose; - refCount.incrementAndGet(); - if (wrapped.context() instanceof DDSpanContext) { - final DDSpanContext context = (DDSpanContext) wrapped.context(); - trace = context.getTrace(); - trace.registerContinuation(this); - } else { - trace = null; - } + + private Continuation() { + openCount.incrementAndGet(); + final DDSpanContext context = (DDSpanContext) spanUnderScope.context(); + trace = context.getTrace(); + trace.registerContinuation(this); } - public ClosingScope activate() { + public ContinuableScope activate() { if (used.compareAndSet(false, true)) { - for (final ScopeContext context : scopeManager.scopeContexts) { - if (context.inContext()) { - return new ClosingScope(context.activate(wrapped, finishSpanOnClose)); - } - } - return new ClosingScope( - new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose)); + return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose); } else { - log.debug("Reusing a continuation not allowed. Returning no-op scope."); - return new ClosingScope(NoopScopeManager.NoopScope.INSTANCE); + log.debug( + "Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked."); + return new ContinuableScope( + scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } } @Override public void close() { - used.getAndSet(true); - if (trace != null) { + if (used.compareAndSet(false, true)) { trace.cancelContinuation(this); - } - } - - private class ClosingScope implements Scope, TraceScope { - private final Scope wrappedScope; - - private ClosingScope(final Scope wrappedScope) { - this.wrappedScope = wrappedScope; - } - - @Override - public Continuation capture(boolean finishOnClose) { - if (wrappedScope instanceof TraceScope) { - return ((TraceScope) wrappedScope).capture(finishOnClose); - } else { - log.debug( - "{} Failed to capture. ClosingScope does not wrap a TraceScope: {}.", - this, - wrappedScope); - return null; - } - } - - @Override - public void close() { - wrappedScope.close(); - ContinuableScope.Continuation.this.close(); - } - - @Override - public Span span() { - return wrappedScope.span(); + ContinuableScope.this.close(); + } else { + log.debug("Failed to close continuation {}. Already used.", this); } } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java new file mode 100644 index 00000000000..a8a0511c6b2 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java @@ -0,0 +1,39 @@ +package datadog.opentracing.scopemanager; + +import io.opentracing.Scope; +import io.opentracing.Span; + +/** Simple scope implementation which does not propagate across threads. */ +public class SimpleScope implements Scope { + private final ContextualScopeManager scopeManager; + private final Span spanUnderScope; + private final boolean finishOnClose; + private final Scope toRestore; + + public SimpleScope( + final ContextualScopeManager scopeManager, + final Span spanUnderScope, + final boolean finishOnClose) { + this.scopeManager = scopeManager; + this.spanUnderScope = spanUnderScope; + this.finishOnClose = finishOnClose; + this.toRestore = scopeManager.tlsScope.get(); + scopeManager.tlsScope.set(this); + } + + @Override + public void close() { + if (finishOnClose) { + spanUnderScope.finish(); + } + + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); + } + } + + @Override + public Span span() { + return spanUnderScope; + } +} diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index c428ce09164..1ef15eb2c4e 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -36,7 +36,7 @@ public class DDAgentWriter implements Writer { public static final int DEFAULT_PORT = 8126; /** Maximum number of traces kept in memory */ - static final int DEFAULT_MAX_TRACES = 1000; + static final int DEFAULT_MAX_TRACES = 7000; /** Timeout for the API in seconds */ static final long API_TIMEOUT_SECONDS = 1; diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index d99a370eace..25113764952 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -92,11 +92,31 @@ class ScopeManagerTest extends Specification { finishSpan << [true, false] } - def "ref counting scope doesn't close if non-zero"() { + def "ContinuableScope only creates continuations when propagation is set"() { setup: def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(true) - def continuation = scope.capture(true) + def continuation = scope.capture() + + expect: + continuation == null + + when: + scope.setAsyncPropagation(true) + continuation = scope.capture() + then: + continuation != null + + cleanup: + continuation.close() + } + + def "ContinuableScope doesn't close if non-zero"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() expect: !spanFinished(scope.span()) @@ -149,7 +169,8 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(true) + scope.setAsyncPropagation(true) + def continuation = scope.capture() scope.close() span.finish() @@ -186,9 +207,10 @@ class ScopeManagerTest extends Specification { def parentScope = tracer.buildSpan("parent").startActive(true) def parentSpan = parentScope.span() ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) + childScope.setAsyncPropagation(true) def childSpan = childScope.span() - def continuation = childScope.capture(true) + def continuation = childScope.capture() childScope.close() expect: @@ -209,11 +231,12 @@ class ScopeManagerTest extends Specification { when: def newScope = continuation.activate() - def newContinuation = newScope.capture(true) + newScope.setAsyncPropagation(true) + def newContinuation = newScope.capture() then: - newScope instanceof ContinuableScope.Continuation.ClosingScope - scopeManager.active() == newScope.wrappedScope + newScope instanceof ContinuableScope + scopeManager.active() == newScope newScope != childScope && newScope != parentScope newScope.span() == childSpan !spanFinished(childSpan) @@ -236,16 +259,17 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(false) + scope.setAsyncPropagation(true) + def continuation = scope.capture() scope.close() span.finish() def newScope = continuation.activate() expect: - newScope instanceof ContinuableScope.Continuation.ClosingScope + newScope instanceof ContinuableScope newScope != scope - scopeManager.active() == newScope.wrappedScope + scopeManager.active() == newScope spanFinished(span) writer == [] @@ -259,24 +283,7 @@ class ScopeManagerTest extends Specification { scopeManager.active() == null spanFinished(childSpan) childSpan.context().parentId == span.context().spanId - writer == [] - - when: - if (closeScope) { - newScope.close() - } - if (closeContinuation) { - continuation.close() - } - - then: writer == [[childSpan, span]] - - where: - closeScope | closeContinuation - true | false - false | true - true | true } @Unroll @@ -327,40 +334,28 @@ class ScopeManagerTest extends Specification { [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _ } - @Unroll - def "threadlocal to context with capture (#active)"() { + def "ContinuableScope put in threadLocal after continuation activation"() { setup: - contexts.each { - scopeManager.addScopeContext(it) - } ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) + scope.setAsyncPropagation(true) expect: scopeManager.tlsScope.get() == scope when: - def cont = scope.capture(true) + def cont = scope.capture() scope.close() then: scopeManager.tlsScope.get() == null when: - active.each { - ((AtomicBoolean) contexts[it].enabled).set(true) - } - cont.activate() + scopeManager.addScopeContext(new AtomicReferenceScope(true)) + def newScope = cont.activate() then: - scopeManager.tlsScope.get() == null - - where: - active | contexts - [0] | [new AtomicReferenceScope(false)] - [0] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [1] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [0, 2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] + newScope != scope + scopeManager.tlsScope.get() == newScope } @Unroll