From 6e62b79b8f9b93fe0ed4c326868be06b87114106 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 28 Mar 2018 10:55:18 -0700 Subject: [PATCH 1/7] Separate executor and scala future tests --- .../java-concurrent/java-concurrent.gradle | 3 + .../scala-testing/scala-testing.gradle | 2 +- .../groovy/ScalaInstrumentationTest.groovy | 82 +++++++++++++++++++ .../scala/ScalaConcurrentTests.scala | 2 +- .../groovy/ExecutorInstrumentationTest.groovy | 69 ---------------- .../src/test/java/AsyncChild.java | 51 ++++++++++++ 6 files changed, 138 insertions(+), 71 deletions(-) create mode 100644 dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy rename dd-java-agent/instrumentation/java-concurrent/scala-testing/src/{main => test}/scala/ScalaConcurrentTests.scala (100%) rename dd-java-agent/instrumentation/java-concurrent/{scala-testing => }/src/test/groovy/ExecutorInstrumentationTest.groovy (59%) create mode 100644 dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java diff --git a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle index cc3bf64f640..7951e9254f3 100644 --- a/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/java-concurrent.gradle @@ -7,4 +7,7 @@ dependencies { compile deps.bytebuddy compile deps.opentracing compile deps.autoservice + + testCompile project(':dd-java-agent:testing') + testCompile project(':dd-java-agent:instrumentation:trace-annotation') } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle index 6e25720806f..1bc5e57177a 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/scala-testing.gradle @@ -1,5 +1,5 @@ apply from: "${rootDir}/gradle/java.gradle" -apply plugin: 'scala' +apply from: "${rootDir}/gradle/test-with-scala.gradle" dependencies { compile project(':dd-trace-api') diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy new file mode 100644 index 00000000000..527daf4d960 --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ScalaInstrumentationTest.groovy @@ -0,0 +1,82 @@ +import datadog.opentracing.DDSpan +import datadog.trace.agent.test.AgentTestRunner + +class ScalaInstrumentationTest extends AgentTestRunner { + static { + System.setProperty("dd.integration.java_concurrent.enabled", "true") + } + + @Override + void afterTest() { + // Ignore failures to instrument sun proxy classes + } + + def "scala futures and callbacks"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" + findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala propagates across futures with no traces"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" + findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala either promise completion"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.traceWithPromises() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" + findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() + } + + def "scala first completed future"() { + setup: + ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() + int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() + TEST_WRITER.waitForTraces(1) + List trace = TEST_WRITER.get(0) + + expect: + TEST_WRITER.size() == 1 + trace.size() == expectedNumberOfSpans + findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() + findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() + } + + private DDSpan findSpan(List trace, String opName) { + for (DDSpan span : trace) { + if (span.getOperationName() == opName) { + return span + } + } + return null + } +} diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala similarity index 100% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala rename to dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index c73c886d0ef..fd0a71851a3 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/main/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -1,8 +1,8 @@ import datadog.trace.api.Trace import io.opentracing.util.GlobalTracer -import scala.concurrent.duration._ import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ import scala.concurrent.{Await, Future, Promise} class ScalaConcurrentTests { diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy similarity index 59% rename from dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy rename to dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 19dd7b94ba3..5884d02b24e 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -118,73 +118,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner { new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue(1)) | _ new ScheduledThreadPoolExecutor(1) | _ } - - def "scala futures and callbacks"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks" - findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala propagates across futures with no traces"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace" - findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala either promise completion"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.traceWithPromises() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - trace[0].operationName == "ScalaConcurrentTests.traceWithPromises" - findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId() - } - - def "scala first completed future"() { - setup: - ScalaConcurrentTests scalaTest = new ScalaConcurrentTests() - int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions() - TEST_WRITER.waitForTraces(1) - List trace = TEST_WRITER.get(0) - - expect: - TEST_WRITER.size() == 1 - trace.size() == expectedNumberOfSpans - findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId() - findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId() - } - - private DDSpan findSpan(List trace, String opName) { - for (DDSpan span : trace) { - if (span.getOperationName() == opName) { - return span - } - } - return null - } } diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java new file mode 100644 index 00000000000..8d8746be2da --- /dev/null +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/java/AsyncChild.java @@ -0,0 +1,51 @@ +import datadog.trace.api.Trace; +import java.util.concurrent.Callable; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +public class AsyncChild implements Runnable, Callable { + private final AtomicBoolean blockThread; + private final boolean doTraceableWork; + private final AtomicInteger numberOfWorkers = new AtomicInteger(0); + + public AsyncChild() { + this(true, false); + } + + public AsyncChild(boolean doTraceableWork, boolean blockThread) { + this.doTraceableWork = doTraceableWork; + this.blockThread = new AtomicBoolean(blockThread); + } + + public void unblock() { + blockThread.set(false); + } + + @Override + public void run() { + runImpl(); + } + + @Override + public Object call() throws Exception { + runImpl(); + return null; + } + + private void runImpl() { + if (doTraceableWork) { + asyncChild(); + } + numberOfWorkers.getAndIncrement(); + try { + while (blockThread.get()) { + // busy-wait to block thread + } + } finally { + numberOfWorkers.getAndDecrement(); + } + } + + @Trace(operationName = "asyncChild") + private void asyncChild() {} +} From 0d7aa022db8810a7ed881de965204da3bc805744 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Mon, 2 Apr 2018 11:31:50 -0700 Subject: [PATCH 2/7] Refactor ContinuableScope --- .../concurrent/ExecutorInstrumentation.java | 4 +- .../datadog/trace/context/TraceScope.java | 5 +- .../scopemanager/ContextualScopeManager.java | 7 +- .../scopemanager/ContinuableScope.java | 125 +++++++----------- .../opentracing/scopemanager/SimpleScope.java | 39 ++++++ .../scopemanager/ScopeManagerTest.groovy | 62 +++------ 6 files changed, 116 insertions(+), 126 deletions(-) create mode 100644 dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 84a06e5b5e3..5ccea111f43 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -217,13 +217,13 @@ public abstract static class DatadogWrapper { protected final TraceScope.Continuation continuation; public DatadogWrapper(TraceScope scope) { - continuation = scope.capture(true); + continuation = scope.capture(); log.debug("created continuation {} from scope {}", continuation, scope); } public void cancel() { if (null != continuation) { - continuation.activate().close(); + continuation.close(); log.debug("canceled continuation {}", continuation); } } diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index 1886796edc3..c859d14d2d7 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -8,7 +8,7 @@ public interface TraceScope { * *

Should be called on the parent thread. */ - Continuation capture(boolean finishOnClose); + Continuation capture(); /** Close the activated context and allow any underlying spans to finish. */ void close(); @@ -21,5 +21,8 @@ interface Continuation { *

Should be called on the child thread. */ TraceScope activate(); + + /** Cancel the continuation. */ + void close(); } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java index 094e9f869ec..e0283a21c82 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContextualScopeManager.java @@ -1,5 +1,6 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import io.opentracing.Scope; import io.opentracing.ScopeManager; import io.opentracing.Span; @@ -17,7 +18,11 @@ public Scope activate(final Span span, final boolean finishOnClose) { return context.activate(span, finishOnClose); } } - return new ContinuableScope(this, span, finishOnClose); + if (span instanceof DDSpan) { + return new ContinuableScope(this, (DDSpan) span, finishOnClose); + } else { + return new SimpleScope(this, span, finishOnClose); + } } @Override diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 3250894705f..943f2b1779f 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -1,11 +1,10 @@ package datadog.opentracing.scopemanager; +import datadog.opentracing.DDSpan; import datadog.opentracing.DDSpanContext; import datadog.opentracing.PendingTrace; import datadog.trace.context.TraceScope; import io.opentracing.Scope; -import io.opentracing.Span; -import io.opentracing.noop.NoopScopeManager; import java.io.Closeable; import java.lang.ref.WeakReference; import java.util.concurrent.atomic.AtomicBoolean; @@ -14,26 +13,38 @@ @Slf4j public class ContinuableScope implements Scope, TraceScope { - final ContextualScopeManager scopeManager; - final AtomicInteger refCount; - private final Span wrapped; + /** ScopeManager holding the thread-local to this scope. */ + private final ContextualScopeManager scopeManager; + /** + * Span contained by this scope. Async scopes will hold a reference to the parent scope's span. + */ + private final DDSpan spanUnderScope; + /** If true, finish the span when openCount hits 0. */ private final boolean finishOnClose; + /** Count of open scope and continuations */ + private final AtomicInteger openCount; + /** Scope to placed in the thread local after close. May be null. */ private final Scope toRestore; + /** Continuation that created this scope. May be null. */ + private final Continuation continuation; ContinuableScope( - final ContextualScopeManager scopeManager, final Span wrapped, final boolean finishOnClose) { - this(scopeManager, new AtomicInteger(1), wrapped, finishOnClose); + final ContextualScopeManager scopeManager, + final DDSpan spanUnderScope, + final boolean finishOnClose) { + this(scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } private ContinuableScope( final ContextualScopeManager scopeManager, - final AtomicInteger refCount, - final Span wrapped, + final AtomicInteger openCount, + final Continuation continuation, + final DDSpan spanUnderScope, final boolean finishOnClose) { - this.scopeManager = scopeManager; - this.refCount = refCount; - this.wrapped = wrapped; + this.openCount = openCount; + this.continuation = continuation; + this.spanUnderScope = spanUnderScope; this.finishOnClose = finishOnClose; this.toRestore = scopeManager.tlsScope.get(); scopeManager.tlsScope.set(this); @@ -41,20 +52,22 @@ private ContinuableScope( @Override public void close() { - if (scopeManager.tlsScope.get() != this) { - return; + if (null != continuation) { + spanUnderScope.context().getTrace().cancelContinuation(continuation); } - if (refCount.decrementAndGet() == 0 && finishOnClose) { - wrapped.finish(); + if (openCount.decrementAndGet() == 0 && finishOnClose) { + spanUnderScope.finish(); } - scopeManager.tlsScope.set(toRestore); + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); + } } @Override - public Span span() { - return wrapped; + public DDSpan span() { + return spanUnderScope; } /** @@ -63,8 +76,8 @@ public Span span() { * @param finishOnClose * @return */ - public Continuation capture(final boolean finishOnClose) { - return new Continuation(this.finishOnClose && finishOnClose); + public Continuation capture() { + return new Continuation(); } public class Continuation implements Closeable, TraceScope.Continuation { @@ -72,72 +85,32 @@ public class Continuation implements Closeable, TraceScope.Continuation { private final AtomicBoolean used = new AtomicBoolean(false); private final PendingTrace trace; - private final boolean finishSpanOnClose; - private Continuation(final boolean finishOnClose) { - this.finishSpanOnClose = finishOnClose; - refCount.incrementAndGet(); - if (wrapped.context() instanceof DDSpanContext) { - final DDSpanContext context = (DDSpanContext) wrapped.context(); - trace = context.getTrace(); - trace.registerContinuation(this); - } else { - trace = null; - } + private Continuation() { + openCount.incrementAndGet(); + final DDSpanContext context = (DDSpanContext) spanUnderScope.context(); + trace = context.getTrace(); + trace.registerContinuation(this); } - public ClosingScope activate() { + public ContinuableScope activate() { if (used.compareAndSet(false, true)) { - for (final ScopeContext context : scopeManager.scopeContexts) { - if (context.inContext()) { - return new ClosingScope(context.activate(wrapped, finishSpanOnClose)); - } - } - return new ClosingScope( - new ContinuableScope(scopeManager, refCount, wrapped, finishSpanOnClose)); + return new ContinuableScope(scopeManager, openCount, this, spanUnderScope, finishOnClose); } else { - log.debug("Reusing a continuation not allowed. Returning no-op scope."); - return new ClosingScope(NoopScopeManager.NoopScope.INSTANCE); + log.debug( + "Failed to activate continuation. Reusing a continuation not allowed. Returning a new scope. Spans will not be linked."); + return new ContinuableScope( + scopeManager, new AtomicInteger(1), null, spanUnderScope, finishOnClose); } } @Override public void close() { - used.getAndSet(true); - if (trace != null) { + if (used.compareAndSet(false, true)) { trace.cancelContinuation(this); - } - } - - private class ClosingScope implements Scope, TraceScope { - private final Scope wrappedScope; - - private ClosingScope(final Scope wrappedScope) { - this.wrappedScope = wrappedScope; - } - - @Override - public Continuation capture(boolean finishOnClose) { - if (wrappedScope instanceof TraceScope) { - return ((TraceScope) wrappedScope).capture(finishOnClose); - } else { - log.debug( - "{} Failed to capture. ClosingScope does not wrap a TraceScope: {}.", - this, - wrappedScope); - return null; - } - } - - @Override - public void close() { - wrappedScope.close(); - ContinuableScope.Continuation.this.close(); - } - - @Override - public Span span() { - return wrappedScope.span(); + ContinuableScope.this.close(); + } else { + log.debug("Failed to close continuation {}. Already used.", this); } } } diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java new file mode 100644 index 00000000000..a8a0511c6b2 --- /dev/null +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/SimpleScope.java @@ -0,0 +1,39 @@ +package datadog.opentracing.scopemanager; + +import io.opentracing.Scope; +import io.opentracing.Span; + +/** Simple scope implementation which does not propagate across threads. */ +public class SimpleScope implements Scope { + private final ContextualScopeManager scopeManager; + private final Span spanUnderScope; + private final boolean finishOnClose; + private final Scope toRestore; + + public SimpleScope( + final ContextualScopeManager scopeManager, + final Span spanUnderScope, + final boolean finishOnClose) { + this.scopeManager = scopeManager; + this.spanUnderScope = spanUnderScope; + this.finishOnClose = finishOnClose; + this.toRestore = scopeManager.tlsScope.get(); + scopeManager.tlsScope.set(this); + } + + @Override + public void close() { + if (finishOnClose) { + spanUnderScope.finish(); + } + + if (scopeManager.tlsScope.get() == this) { + scopeManager.tlsScope.set(toRestore); + } + } + + @Override + public Span span() { + return spanUnderScope; + } +} diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index d99a370eace..f03e6e0e881 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -92,11 +92,11 @@ class ScopeManagerTest extends Specification { finishSpan << [true, false] } - def "ref counting scope doesn't close if non-zero"() { + def "ContinuableScope doesn't close if non-zero"() { setup: def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(true) - def continuation = scope.capture(true) + def continuation = scope.capture() expect: !spanFinished(scope.span()) @@ -149,7 +149,7 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(true) + def continuation = scope.capture() scope.close() span.finish() @@ -188,7 +188,7 @@ class ScopeManagerTest extends Specification { ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) def childSpan = childScope.span() - def continuation = childScope.capture(true) + def continuation = childScope.capture() childScope.close() expect: @@ -209,11 +209,11 @@ class ScopeManagerTest extends Specification { when: def newScope = continuation.activate() - def newContinuation = newScope.capture(true) + def newContinuation = newScope.capture() then: - newScope instanceof ContinuableScope.Continuation.ClosingScope - scopeManager.active() == newScope.wrappedScope + newScope instanceof ContinuableScope + scopeManager.active() == newScope newScope != childScope && newScope != parentScope newScope.span() == childSpan !spanFinished(childSpan) @@ -236,16 +236,16 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() - def continuation = scope.capture(false) + def continuation = scope.capture() scope.close() span.finish() def newScope = continuation.activate() expect: - newScope instanceof ContinuableScope.Continuation.ClosingScope + newScope instanceof ContinuableScope newScope != scope - scopeManager.active() == newScope.wrappedScope + scopeManager.active() == newScope spanFinished(span) writer == [] @@ -259,24 +259,7 @@ class ScopeManagerTest extends Specification { scopeManager.active() == null spanFinished(childSpan) childSpan.context().parentId == span.context().spanId - writer == [] - - when: - if (closeScope) { - newScope.close() - } - if (closeContinuation) { - continuation.close() - } - - then: writer == [[childSpan, span]] - - where: - closeScope | closeContinuation - true | false - false | true - true | true } @Unroll @@ -327,40 +310,27 @@ class ScopeManagerTest extends Specification { [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] | _ } - @Unroll - def "threadlocal to context with capture (#active)"() { + def "ContinuableScope put in threadLocal after continuation activation"() { setup: - contexts.each { - scopeManager.addScopeContext(it) - } ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) expect: scopeManager.tlsScope.get() == scope when: - def cont = scope.capture(true) + def cont = scope.capture() scope.close() then: scopeManager.tlsScope.get() == null when: - active.each { - ((AtomicBoolean) contexts[it].enabled).set(true) - } - cont.activate() + scopeManager.addScopeContext(new AtomicReferenceScope(true)) + def newScope = cont.activate() then: - scopeManager.tlsScope.get() == null - - where: - active | contexts - [0] | [new AtomicReferenceScope(false)] - [0] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [1] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] - [0, 2] | [new AtomicReferenceScope(false), new AtomicReferenceScope(false), new AtomicReferenceScope(false)] + newScope != scope + scopeManager.tlsScope.get() == newScope } @Unroll From a0a11a51d0fcd55c7d2365bb955faf277bd4d386 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Mon, 2 Apr 2018 16:59:57 -0700 Subject: [PATCH 3/7] Use ScopeManager to enable/disable low level async instrumentation. --- .../src/main/scala/AkkaActors.scala | 4 +++ .../src/test/scala/ScalaConcurrentTests.scala | 6 ++++ .../concurrent/ExecutorInstrumentation.java | 14 +++++++-- .../groovy/ExecutorInstrumentationTest.groovy | 4 +++ .../play/PlayInstrumentation.java | 29 ++++++++++++------- .../datadog/trace/context/TraceScope.java | 6 ++++ .../scopemanager/ContinuableScope.java | 12 ++++++++ 7 files changed, 62 insertions(+), 13 deletions(-) diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala index cc77d434874..09ec95f0219 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala +++ b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala @@ -1,4 +1,5 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import akka.pattern.ask import io.opentracing.util.GlobalTracer @@ -32,18 +33,21 @@ class AkkaActors { @Trace def basicTell() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! Greet } @Trace def basicAsk() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ? Greet } @Trace def basicForward() : Unit = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) helloGreeter ! WhoToGreet("Akka") helloGreeter ? Greet } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index fd0a71851a3..1b56a37d446 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -1,4 +1,5 @@ import datadog.trace.api.Trace +import datadog.trace.context.TraceScope import io.opentracing.util.GlobalTracer import scala.concurrent.ExecutionContext.Implicits.global @@ -12,6 +13,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithFutureAndCallbacks() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val goodFuture: Future[Integer] = Future { tracedChild("goodFuture") 1 @@ -32,6 +34,7 @@ class ScalaConcurrentTests { @Trace def tracedAcrossThreadsWithNoTrace() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val goodFuture: Future[Integer] = Future { 1 } @@ -51,6 +54,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithPromises() : Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val keptPromise = Promise[Boolean]() val brokenPromise = Promise[Boolean]() val afterPromise = keptPromise.future @@ -83,6 +87,7 @@ class ScalaConcurrentTests { */ @Trace def tracedWithFutureFirstCompletions() :Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val completedVal = Future.firstCompletedOf( List( Future { @@ -106,6 +111,7 @@ class ScalaConcurrentTests { */ @Trace def tracedTimeout(): Integer = { + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) val f: Future[String] = Future { tracedChild("timeoutChild") while(true) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index 5ccea111f43..d8ab6114c29 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -140,7 +140,10 @@ public static class WrapRunnableAdvice { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncLinking() + && task != null + && !(task instanceof DatadogWrapper)) { task = new RunnableWrapper(task, (TraceScope) scope); return (RunnableWrapper) task; } @@ -161,7 +164,10 @@ public static class WrapCallableAdvice { public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) { + if (scope instanceof TraceScope + && ((TraceScope) scope).isAsyncLinking() + && task != null + && !(task instanceof DatadogWrapper)) { task = new CallableWrapper(task, (TraceScope) scope); return (CallableWrapper) task; } @@ -182,7 +188,7 @@ public static class WrapCallableCollectionAdvice { public static Collection wrapJob( @Advice.Argument(value = 0, readOnly = false) Collection> tasks) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope) { + if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncLinking()) { Collection> wrappedTasks = new ArrayList<>(tasks.size()); for (Callable task : tasks) { if (task != null) { @@ -241,6 +247,7 @@ public RunnableWrapper(Runnable toWrap, TraceScope scope) { @Override public void run() { final TraceScope context = continuation.activate(); + context.setAsyncLinking(true); try { delegatee.run(); } finally { @@ -261,6 +268,7 @@ public CallableWrapper(Callable toWrap, TraceScope scope) { @Override public T call() throws Exception { final TraceScope context = continuation.activate(); + context.setAsyncLinking(true); try { return delegatee.call(); } finally { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 5884d02b24e..6466b91e545 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -1,6 +1,8 @@ import datadog.opentracing.DDSpan +import datadog.opentracing.scopemanager.ContinuableScope import datadog.trace.agent.test.AgentTestRunner import datadog.trace.api.Trace +import io.opentracing.util.GlobalTracer import spock.lang.Shared import spock.lang.Unroll @@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) // this child will have a span m.invoke(pool, new AsyncChild()) // this child won't @@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) try { for (int i = 0; i < 20; ++ i) { Future f = pool.submit((Callable)child) diff --git a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java index 7821da279f9..aec5d947e1f 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java +++ b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java @@ -9,6 +9,7 @@ import datadog.trace.agent.tooling.*; import datadog.trace.api.DDSpanTypes; import datadog.trace.api.DDTags; +import datadog.trace.context.TraceScope; import io.opentracing.Scope; import io.opentracing.Span; import io.opentracing.SpanContext; @@ -77,9 +78,7 @@ public AgentBuilder apply(final AgentBuilder agentBuilder) { public static class PlayAdvice { @Advice.OnMethodEnter(suppress = Throwable.class) public static Scope startSpan(@Advice.Argument(0) final Request req) { - // TODO - // begin tracking across threads - + final Scope scope; if (GlobalTracer.get().activeSpan() == null) { final SpanContext extractedContext; if (GlobalTracer.get().scopeManager().active() == null) { @@ -88,15 +87,21 @@ public static Scope startSpan(@Advice.Argument(0) final Request req) { } else { extractedContext = null; } - return GlobalTracer.get() - .buildSpan("play.request") - .asChildOf(extractedContext) - .startActive(false); + scope = + GlobalTracer.get() + .buildSpan("play.request") + .asChildOf(extractedContext) + .startActive(false); } else { // An upstream framework (e.g. akka-http, netty) has already started the span. // Do not extract the context. - return GlobalTracer.get().buildSpan("play.request").startActive(false); + scope = GlobalTracer.get().buildSpan("play.request").startActive(false); + } + + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true); } + return scope; } @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) @@ -169,6 +174,9 @@ public RequestError(Span span) { @Override public Object apply(Throwable t, boolean isCheck) throws Exception { try { + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + } onError(span, t); } catch (Throwable t2) { LoggerFactory.getLogger(RequestCallback.class).debug("error in play instrumentation", t); @@ -193,8 +201,9 @@ public RequestCallback(Span span) { } public Result apply(Result result) { - // TODO - // stop tracking across threads + if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + } try { Tags.HTTP_STATUS.set(span, result.header().status()); } catch (Throwable t) { diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index c859d14d2d7..5b047951182 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -13,6 +13,12 @@ public interface TraceScope { /** Close the activated context and allow any underlying spans to finish. */ void close(); + /** If true, this context will propagate across async boundaries. */ + boolean isAsyncLinking(); + + /** Set context's async propagation value. */ + void setAsyncLinking(boolean value); + /** Used to pass async context between workers. */ interface Continuation { /** diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 943f2b1779f..26a60d324ce 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -27,6 +27,8 @@ public class ContinuableScope implements Scope, TraceScope { private final Scope toRestore; /** Continuation that created this scope. May be null. */ private final Continuation continuation; + /** Flag to propagate this scope across async boundaries. */ + private final AtomicBoolean isAsyncLinking = new AtomicBoolean(false); ContinuableScope( final ContextualScopeManager scopeManager, @@ -70,6 +72,16 @@ public DDSpan span() { return spanUnderScope; } + @Override + public void setAsyncLinking(boolean value) { + isAsyncLinking.set(value); + } + + @Override + public boolean isAsyncLinking() { + return isAsyncLinking.get(); + } + /** * The continuation returned should be closed after the associa * From d594d6e8db88b376201c5c8d572a1667cd82d217 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Thu, 5 Apr 2018 18:36:28 -0400 Subject: [PATCH 4/7] Don't create continuations when async propagation is off --- .../src/main/scala/AkkaActors.scala | 6 ++--- .../src/test/scala/ScalaConcurrentTests.scala | 10 ++++---- .../concurrent/ExecutorInstrumentation.java | 10 ++++---- .../groovy/ExecutorInstrumentationTest.groovy | 4 +-- .../play/PlayInstrumentation.java | 6 ++--- .../datadog/trace/context/TraceScope.java | 10 +++++--- .../scopemanager/ContinuableScope.java | 21 +++++++++------- .../scopemanager/ScopeManagerTest.groovy | 25 +++++++++++++++++++ 8 files changed, 62 insertions(+), 30 deletions(-) diff --git a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala index 09ec95f0219..a4d53267ef8 100644 --- a/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala +++ b/dd-java-agent/instrumentation/java-concurrent/akka-testing/src/main/scala/AkkaActors.scala @@ -33,21 +33,21 @@ class AkkaActors { @Trace def basicTell() : Unit = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ! Greet } @Trace def basicAsk() : Unit = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) howdyGreeter ! WhoToGreet("Akka") howdyGreeter ? Greet } @Trace def basicForward() : Unit = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) helloGreeter ! WhoToGreet("Akka") helloGreeter ? Greet } diff --git a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala index 1b56a37d446..40578a5e2d8 100644 --- a/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala +++ b/dd-java-agent/instrumentation/java-concurrent/scala-testing/src/test/scala/ScalaConcurrentTests.scala @@ -13,7 +13,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithFutureAndCallbacks() : Integer = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val goodFuture: Future[Integer] = Future { tracedChild("goodFuture") 1 @@ -34,7 +34,7 @@ class ScalaConcurrentTests { @Trace def tracedAcrossThreadsWithNoTrace() :Integer = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val goodFuture: Future[Integer] = Future { 1 } @@ -54,7 +54,7 @@ class ScalaConcurrentTests { */ @Trace def traceWithPromises() : Integer = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val keptPromise = Promise[Boolean]() val brokenPromise = Promise[Boolean]() val afterPromise = keptPromise.future @@ -87,7 +87,7 @@ class ScalaConcurrentTests { */ @Trace def tracedWithFutureFirstCompletions() :Integer = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val completedVal = Future.firstCompletedOf( List( Future { @@ -111,7 +111,7 @@ class ScalaConcurrentTests { */ @Trace def tracedTimeout(): Integer = { - GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncLinking(true) + GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true) val f: Future[String] = Future { tracedChild("timeoutChild") while(true) { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java index d8ab6114c29..7646d8e8b06 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java +++ b/dd-java-agent/instrumentation/java-concurrent/src/main/java/datadog/trace/instrumentation/java/concurrent/ExecutorInstrumentation.java @@ -141,7 +141,7 @@ public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Runnable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); if (scope instanceof TraceScope - && ((TraceScope) scope).isAsyncLinking() + && ((TraceScope) scope).isAsyncPropagating() && task != null && !(task instanceof DatadogWrapper)) { task = new RunnableWrapper(task, (TraceScope) scope); @@ -165,7 +165,7 @@ public static DatadogWrapper wrapJob( @Advice.Argument(value = 0, readOnly = false) Callable task) { final Scope scope = GlobalTracer.get().scopeManager().active(); if (scope instanceof TraceScope - && ((TraceScope) scope).isAsyncLinking() + && ((TraceScope) scope).isAsyncPropagating() && task != null && !(task instanceof DatadogWrapper)) { task = new CallableWrapper(task, (TraceScope) scope); @@ -188,7 +188,7 @@ public static class WrapCallableCollectionAdvice { public static Collection wrapJob( @Advice.Argument(value = 0, readOnly = false) Collection> tasks) { final Scope scope = GlobalTracer.get().scopeManager().active(); - if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncLinking()) { + if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating()) { Collection> wrappedTasks = new ArrayList<>(tasks.size()); for (Callable task : tasks) { if (task != null) { @@ -247,7 +247,7 @@ public RunnableWrapper(Runnable toWrap, TraceScope scope) { @Override public void run() { final TraceScope context = continuation.activate(); - context.setAsyncLinking(true); + context.setAsyncPropagation(true); try { delegatee.run(); } finally { @@ -268,7 +268,7 @@ public CallableWrapper(Callable toWrap, TraceScope scope) { @Override public T call() throws Exception { final TraceScope context = continuation.activate(); - context.setAsyncLinking(true); + context.setAsyncPropagation(true); try { return delegatee.call(); } finally { diff --git a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy index 6466b91e545..ac7c225748c 100644 --- a/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy +++ b/dd-java-agent/instrumentation/java-concurrent/src/test/groovy/ExecutorInstrumentationTest.groovy @@ -50,7 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { - ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) // this child will have a span m.invoke(pool, new AsyncChild()) // this child won't @@ -95,7 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner { @Override @Trace(operationName = "parent") void run() { - ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true) + ((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true) try { for (int i = 0; i < 20; ++ i) { Future f = pool.submit((Callable)child) diff --git a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java index aec5d947e1f..999e297b7e6 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java +++ b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java @@ -99,7 +99,7 @@ public static Scope startSpan(@Advice.Argument(0) final Request req) { } if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { - ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(true); + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true); } return scope; } @@ -175,7 +175,7 @@ public RequestError(Span span) { public Object apply(Throwable t, boolean isCheck) throws Exception { try { if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { - ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); } onError(span, t); } catch (Throwable t2) { @@ -202,7 +202,7 @@ public RequestCallback(Span span) { public Result apply(Result result) { if (GlobalTracer.get().scopeManager().active() instanceof TraceScope) { - ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncLinking(false); + ((TraceScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(false); } try { Tags.HTTP_STATUS.set(span, result.header().status()); diff --git a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java index 5b047951182..ff989a2844e 100644 --- a/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java +++ b/dd-trace-api/src/main/java/datadog/trace/context/TraceScope.java @@ -14,10 +14,14 @@ public interface TraceScope { void close(); /** If true, this context will propagate across async boundaries. */ - boolean isAsyncLinking(); + boolean isAsyncPropagating(); - /** Set context's async propagation value. */ - void setAsyncLinking(boolean value); + /** + * Enable or disable async propagation. Async propagation is initially set to false. + * + * @param value The new propagation value. True == propagate. False == don't propagate. + */ + void setAsyncPropagation(boolean value); /** Used to pass async context between workers. */ interface Continuation { diff --git a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java index 26a60d324ce..efe8df43437 100644 --- a/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java +++ b/dd-trace-ot/src/main/java/datadog/opentracing/scopemanager/ContinuableScope.java @@ -28,7 +28,7 @@ public class ContinuableScope implements Scope, TraceScope { /** Continuation that created this scope. May be null. */ private final Continuation continuation; /** Flag to propagate this scope across async boundaries. */ - private final AtomicBoolean isAsyncLinking = new AtomicBoolean(false); + private final AtomicBoolean isAsyncPropagating = new AtomicBoolean(false); ContinuableScope( final ContextualScopeManager scopeManager, @@ -73,23 +73,26 @@ public DDSpan span() { } @Override - public void setAsyncLinking(boolean value) { - isAsyncLinking.set(value); + public boolean isAsyncPropagating() { + return isAsyncPropagating.get(); } @Override - public boolean isAsyncLinking() { - return isAsyncLinking.get(); + public void setAsyncPropagation(boolean value) { + isAsyncPropagating.set(value); } /** - * The continuation returned should be closed after the associa + * The continuation returned must be closed or activated or the trace will not finish. * - * @param finishOnClose - * @return + * @return The new continuation, or null if this scope is not async propagating. */ public Continuation capture() { - return new Continuation(); + if (isAsyncPropagating()) { + return new Continuation(); + } else { + return null; + } } public class Continuation implements Closeable, TraceScope.Continuation { diff --git a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy index f03e6e0e881..25113764952 100644 --- a/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy +++ b/dd-trace-ot/src/test/groovy/datadog/opentracing/scopemanager/ScopeManagerTest.groovy @@ -92,10 +92,30 @@ class ScopeManagerTest extends Specification { finishSpan << [true, false] } + def "ContinuableScope only creates continuations when propagation is set"() { + setup: + def builder = tracer.buildSpan("test") + def scope = (ContinuableScope) builder.startActive(true) + def continuation = scope.capture() + + expect: + continuation == null + + when: + scope.setAsyncPropagation(true) + continuation = scope.capture() + then: + continuation != null + + cleanup: + continuation.close() + } + def "ContinuableScope doesn't close if non-zero"() { setup: def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(true) + scope.setAsyncPropagation(true) def continuation = scope.capture() expect: @@ -149,6 +169,7 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() + scope.setAsyncPropagation(true) def continuation = scope.capture() scope.close() span.finish() @@ -186,6 +207,7 @@ class ScopeManagerTest extends Specification { def parentScope = tracer.buildSpan("parent").startActive(true) def parentSpan = parentScope.span() ContinuableScope childScope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) + childScope.setAsyncPropagation(true) def childSpan = childScope.span() def continuation = childScope.capture() @@ -209,6 +231,7 @@ class ScopeManagerTest extends Specification { when: def newScope = continuation.activate() + newScope.setAsyncPropagation(true) def newContinuation = newScope.capture() then: @@ -236,6 +259,7 @@ class ScopeManagerTest extends Specification { def builder = tracer.buildSpan("test") def scope = (ContinuableScope) builder.startActive(false) def span = scope.span() + scope.setAsyncPropagation(true) def continuation = scope.capture() scope.close() span.finish() @@ -313,6 +337,7 @@ class ScopeManagerTest extends Specification { def "ContinuableScope put in threadLocal after continuation activation"() { setup: ContinuableScope scope = (ContinuableScope) tracer.buildSpan("parent").startActive(true) + scope.setAsyncPropagation(true) expect: scopeManager.tlsScope.get() == scope From 2d92ed2bed18eec79f17ac2f8a312c93b82b4c34 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 11 Apr 2018 13:14:49 -0700 Subject: [PATCH 5/7] Use the same operationName for all play requests --- dd-java-agent/instrumentation/play-2.4/.gitignore | 1 + .../play-2.6-testing/src/test/groovy/Play26Test.groovy | 6 +++--- .../trace/instrumentation/play/PlayInstrumentation.java | 1 - .../play-2.4/src/test/groovy/Play24Test.groovy | 6 +++--- 4 files changed, 7 insertions(+), 7 deletions(-) create mode 100644 dd-java-agent/instrumentation/play-2.4/.gitignore diff --git a/dd-java-agent/instrumentation/play-2.4/.gitignore b/dd-java-agent/instrumentation/play-2.4/.gitignore new file mode 100644 index 00000000000..5292519a25e --- /dev/null +++ b/dd-java-agent/instrumentation/play-2.4/.gitignore @@ -0,0 +1 @@ +logs/ \ No newline at end of file diff --git a/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy b/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy index bc192c886fb..049f00e637b 100644 --- a/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy +++ b/dd-java-agent/instrumentation/play-2.4/play-2.6-testing/src/test/groovy/Play26Test.groovy @@ -58,7 +58,7 @@ class Play26Test extends AgentTestRunner { root.traceId == 123 root.parentId == 456 root.serviceName == "unnamed-java-app" - root.operationName == "/helloplay/:from" + root.operationName == "play.request" root.resourceName == "GET /helloplay/:from" !root.context().getErrorFlag() root.context().tags["http.status_code"] == 200 @@ -85,7 +85,7 @@ class Play26Test extends AgentTestRunner { response.code() == 500 root.serviceName == "unnamed-java-app" - root.operationName == "/make-error" + root.operationName == "play.request" root.resourceName == "GET /make-error" root.context().getErrorFlag() root.context().tags["http.status_code"] == 500 @@ -116,7 +116,7 @@ class Play26Test extends AgentTestRunner { root.context().tags["error.type"] == RuntimeException.getName() root.serviceName == "unnamed-java-app" - root.operationName == "/exception" + root.operationName == "play.request" root.resourceName == "GET /exception" root.context().tags["http.status_code"] == 500 root.context().tags["http.url"] == "/exception" diff --git a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java index 999e297b7e6..9e7d96c021b 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java +++ b/dd-java-agent/instrumentation/play-2.4/src/main/java/datadog/trace/instrumentation/play/PlayInstrumentation.java @@ -116,7 +116,6 @@ public static void stopTraceOnResponse( if (!pathOption.isEmpty()) { final String path = (String) pathOption.get(); scope.span().setTag(Tags.HTTP_URL.getKey(), path); - scope.span().setOperationName(path); scope.span().setTag(DDTags.RESOURCE_NAME, req.method() + " " + path); } diff --git a/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy b/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy index bd21e4ddab2..d2f6f287b48 100644 --- a/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy +++ b/dd-java-agent/instrumentation/play-2.4/src/test/groovy/Play24Test.groovy @@ -58,7 +58,7 @@ class Play24Test extends AgentTestRunner { root.traceId == 123 root.parentId == 456 root.serviceName == "unnamed-java-app" - root.operationName == "/helloplay/:from" + root.operationName == "play.request" root.resourceName == "GET /helloplay/:from" !root.context().getErrorFlag() root.context().tags["http.status_code"] == 200 @@ -85,7 +85,7 @@ class Play24Test extends AgentTestRunner { response.code() == 500 root.serviceName == "unnamed-java-app" - root.operationName == "/make-error" + root.operationName == "play.request" root.resourceName == "GET /make-error" root.context().getErrorFlag() root.context().tags["http.status_code"] == 500 @@ -116,7 +116,7 @@ class Play24Test extends AgentTestRunner { root.context().tags["error.type"] == RuntimeException.getName() root.serviceName == "unnamed-java-app" - root.operationName == "/exception" + root.operationName == "play.request" root.resourceName == "GET /exception" root.context().tags["http.status_code"] == 500 root.context().tags["http.url"] == "/exception" From 35c40846adb017e7fc471531f438627f4ffdac32 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 11 Apr 2018 13:31:55 -0700 Subject: [PATCH 6/7] Accept string for method params on classloader method matcher --- .../trace/agent/tooling/ClassLoaderMatcher.java | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java index d06bd32b67f..cce2d90016b 100644 --- a/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java +++ b/dd-java-agent/agent-tooling/src/main/java/datadog/trace/agent/tooling/ClassLoaderMatcher.java @@ -31,7 +31,7 @@ public static ElementMatcher.Junction.AbstractBase classLoaderHasCl } public static ElementMatcher.Junction.AbstractBase classLoaderHasClassWithMethod( - final String className, final String methodName, final Class... methodArgs) { + final String className, final String methodName, final String... methodArgs) { return new ClassLoaderHasClassWithMethodMatcher(className, methodName, methodArgs); } @@ -205,10 +205,10 @@ public static class ClassLoaderHasClassWithMethodMatcher private final String className; private final String methodName; - private final Class[] methodArgs; + private final String[] methodArgs; private ClassLoaderHasClassWithMethodMatcher( - final String className, final String methodName, final Class... methodArgs) { + final String className, final String methodName, final String... methodArgs) { this.className = className; this.methodName = methodName; this.methodArgs = methodArgs; @@ -223,10 +223,14 @@ public boolean matches(final ClassLoader target) { } try { final Class aClass = Class.forName(className, false, target); + final Class[] methodArgsClasses = new Class[methodArgs.length]; + for (int i = 0; i < methodArgs.length; ++i) { + methodArgsClasses[i] = target.loadClass(methodArgs[i]); + } if (aClass.isInterface()) { - aClass.getMethod(methodName, methodArgs); + aClass.getMethod(methodName, methodArgsClasses); } else { - aClass.getDeclaredMethod(methodName, methodArgs); + aClass.getDeclaredMethod(methodName, methodArgsClasses); } cache.put(target, true); return true; From 03b96ca567eeb918d1875057fbdbbfa49d2f6a58 Mon Sep 17 00:00:00 2001 From: Andrew Kent Date: Wed, 11 Apr 2018 13:32:21 -0700 Subject: [PATCH 7/7] Increase agent writer max trace limit --- .../main/java/datadog/trace/common/writer/DDAgentWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java index c428ce09164..1ef15eb2c4e 100644 --- a/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java +++ b/dd-trace-ot/src/main/java/datadog/trace/common/writer/DDAgentWriter.java @@ -36,7 +36,7 @@ public class DDAgentWriter implements Writer { public static final int DEFAULT_PORT = 8126; /** Maximum number of traces kept in memory */ - static final int DEFAULT_MAX_TRACES = 1000; + static final int DEFAULT_MAX_TRACES = 7000; /** Timeout for the API in seconds */ static final long API_TIMEOUT_SECONDS = 1;