Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ public static ElementMatcher.Junction.AbstractBase<ClassLoader> classLoaderHasCl
}

public static ElementMatcher.Junction.AbstractBase<ClassLoader> classLoaderHasClassWithMethod(
final String className, final String methodName, final Class... methodArgs) {
final String className, final String methodName, final String... methodArgs) {
return new ClassLoaderHasClassWithMethodMatcher(className, methodName, methodArgs);
}

Expand Down Expand Up @@ -205,10 +205,10 @@ public static class ClassLoaderHasClassWithMethodMatcher

private final String className;
private final String methodName;
private final Class[] methodArgs;
private final String[] methodArgs;

private ClassLoaderHasClassWithMethodMatcher(
final String className, final String methodName, final Class... methodArgs) {
final String className, final String methodName, final String... methodArgs) {
this.className = className;
this.methodName = methodName;
this.methodArgs = methodArgs;
Expand All @@ -223,10 +223,14 @@ public boolean matches(final ClassLoader target) {
}
try {
final Class<?> aClass = Class.forName(className, false, target);
final Class[] methodArgsClasses = new Class[methodArgs.length];
for (int i = 0; i < methodArgs.length; ++i) {
methodArgsClasses[i] = target.loadClass(methodArgs[i]);
}
if (aClass.isInterface()) {
aClass.getMethod(methodName, methodArgs);
aClass.getMethod(methodName, methodArgsClasses);
} else {
aClass.getDeclaredMethod(methodName, methodArgs);
aClass.getDeclaredMethod(methodName, methodArgsClasses);
}
cache.put(target, true);
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import akka.pattern.ask
import io.opentracing.util.GlobalTracer

Expand Down Expand Up @@ -32,18 +33,21 @@ class AkkaActors {

@Trace
def basicTell() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ! Greet
}

@Trace
def basicAsk() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ? Greet
}

@Trace
def basicForward() : Unit = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
helloGreeter ! WhoToGreet("Akka")
helloGreeter ? Greet
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ dependencies {
compile deps.bytebuddy
compile deps.opentracing
compile deps.autoservice

testCompile project(':dd-java-agent:testing')
testCompile project(':dd-java-agent:instrumentation:trace-annotation')
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
apply from: "${rootDir}/gradle/java.gradle"
apply plugin: 'scala'
apply from: "${rootDir}/gradle/test-with-scala.gradle"

dependencies {
compile project(':dd-trace-api')
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import datadog.opentracing.DDSpan
import datadog.trace.agent.test.AgentTestRunner

class ScalaInstrumentationTest extends AgentTestRunner {
static {
System.setProperty("dd.integration.java_concurrent.enabled", "true")
}

@Override
void afterTest() {
// Ignore failures to instrument sun proxy classes
}

def "scala futures and callbacks"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
}

def "scala propagates across futures with no traces"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
}

def "scala either promise completion"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.traceWithPromises()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
}

def "scala first completed future"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
}

private DDSpan findSpan(List<DDSpan> trace, String opName) {
for (DDSpan span : trace) {
if (span.getOperationName() == opName) {
return span
}
}
return null
}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import datadog.trace.api.Trace
import datadog.trace.context.TraceScope
import io.opentracing.util.GlobalTracer

import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.concurrent.{Await, Future, Promise}

class ScalaConcurrentTests {
Expand All @@ -12,6 +13,7 @@ class ScalaConcurrentTests {
*/
@Trace
def traceWithFutureAndCallbacks() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val goodFuture: Future[Integer] = Future {
tracedChild("goodFuture")
1
Expand All @@ -32,6 +34,7 @@ class ScalaConcurrentTests {

@Trace
def tracedAcrossThreadsWithNoTrace() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val goodFuture: Future[Integer] = Future {
1
}
Expand All @@ -51,6 +54,7 @@ class ScalaConcurrentTests {
*/
@Trace
def traceWithPromises() : Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val keptPromise = Promise[Boolean]()
val brokenPromise = Promise[Boolean]()
val afterPromise = keptPromise.future
Expand Down Expand Up @@ -83,6 +87,7 @@ class ScalaConcurrentTests {
*/
@Trace
def tracedWithFutureFirstCompletions() :Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val completedVal = Future.firstCompletedOf(
List(
Future {
Expand All @@ -106,6 +111,7 @@ class ScalaConcurrentTests {
*/
@Trace
def tracedTimeout(): Integer = {
GlobalTracer.get().scopeManager().active().asInstanceOf[TraceScope].setAsyncPropagation(true)
val f: Future[String] = Future {
tracedChild("timeoutChild")
while(true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,10 @@ public static class WrapRunnableAdvice {
public static DatadogWrapper wrapJob(
@Advice.Argument(value = 0, readOnly = false) Runnable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) {
if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new RunnableWrapper(task, (TraceScope) scope);
return (RunnableWrapper) task;
}
Expand All @@ -161,7 +164,10 @@ public static class WrapCallableAdvice {
public static DatadogWrapper wrapJob(
@Advice.Argument(value = 0, readOnly = false) Callable task) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope && task != null && !(task instanceof DatadogWrapper)) {
if (scope instanceof TraceScope
&& ((TraceScope) scope).isAsyncPropagating()
&& task != null
&& !(task instanceof DatadogWrapper)) {
task = new CallableWrapper(task, (TraceScope) scope);
return (CallableWrapper) task;
}
Expand All @@ -182,7 +188,7 @@ public static class WrapCallableCollectionAdvice {
public static Collection<?> wrapJob(
@Advice.Argument(value = 0, readOnly = false) Collection<? extends Callable<?>> tasks) {
final Scope scope = GlobalTracer.get().scopeManager().active();
if (scope instanceof TraceScope) {
if (scope instanceof TraceScope && ((TraceScope) scope).isAsyncPropagating()) {
Collection<Callable<?>> wrappedTasks = new ArrayList<>(tasks.size());
for (Callable task : tasks) {
if (task != null) {
Expand Down Expand Up @@ -217,13 +223,13 @@ public abstract static class DatadogWrapper {
protected final TraceScope.Continuation continuation;

public DatadogWrapper(TraceScope scope) {
continuation = scope.capture(true);
continuation = scope.capture();
log.debug("created continuation {} from scope {}", continuation, scope);
}

public void cancel() {
if (null != continuation) {
continuation.activate().close();
continuation.close();
log.debug("canceled continuation {}", continuation);
}
}
Expand All @@ -241,6 +247,7 @@ public RunnableWrapper(Runnable toWrap, TraceScope scope) {
@Override
public void run() {
final TraceScope context = continuation.activate();
context.setAsyncPropagation(true);
try {
delegatee.run();
} finally {
Expand All @@ -261,6 +268,7 @@ public CallableWrapper(Callable<T> toWrap, TraceScope scope) {
@Override
public T call() throws Exception {
final TraceScope context = continuation.activate();
context.setAsyncPropagation(true);
try {
return delegatee.call();
} finally {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import datadog.opentracing.DDSpan
import datadog.opentracing.scopemanager.ContinuableScope
import datadog.trace.agent.test.AgentTestRunner
import datadog.trace.api.Trace
import io.opentracing.util.GlobalTracer
import spock.lang.Shared
import spock.lang.Unroll

Expand Down Expand Up @@ -48,6 +50,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
// this child will have a span
m.invoke(pool, new AsyncChild())
// this child won't
Expand Down Expand Up @@ -92,6 +95,7 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
@Override
@Trace(operationName = "parent")
void run() {
((ContinuableScope) GlobalTracer.get().scopeManager().active()).setAsyncPropagation(true)
try {
for (int i = 0; i < 20; ++ i) {
Future f = pool.submit((Callable)child)
Expand All @@ -118,73 +122,4 @@ class ExecutorInstrumentationTest extends AgentTestRunner {
new ThreadPoolExecutor(1, 1, 1000, TimeUnit.NANOSECONDS, new ArrayBlockingQueue<Runnable>(1)) | _
new ScheduledThreadPoolExecutor(1) | _
}

def "scala futures and callbacks"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.traceWithFutureAndCallbacks()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.traceWithFutureAndCallbacks"
findSpan(trace, "goodFuture").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "badFuture").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "successCallback").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "failureCallback").context().getParentId() == trace[0].context().getSpanId()
}

def "scala propagates across futures with no traces"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.tracedAcrossThreadsWithNoTrace()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.tracedAcrossThreadsWithNoTrace"
findSpan(trace, "callback").context().getParentId() == trace[0].context().getSpanId()
}

def "scala either promise completion"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.traceWithPromises()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
trace[0].operationName == "ScalaConcurrentTests.traceWithPromises"
findSpan(trace, "keptPromise").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "keptPromise2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "brokenPromise").context().getParentId() == trace[0].context().getSpanId()
}

def "scala first completed future"() {
setup:
ScalaConcurrentTests scalaTest = new ScalaConcurrentTests()
int expectedNumberOfSpans = scalaTest.tracedWithFutureFirstCompletions()
TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)

expect:
TEST_WRITER.size() == 1
trace.size() == expectedNumberOfSpans
findSpan(trace, "timeout1").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout2").context().getParentId() == trace[0].context().getSpanId()
findSpan(trace, "timeout3").context().getParentId() == trace[0].context().getSpanId()
}

private DDSpan findSpan(List<DDSpan> trace, String opName) {
for (DDSpan span : trace) {
if (span.getOperationName() == opName) {
return span
}
}
return null
}
}
Loading