From fb8a65f575d54581f61bc1e6db9dc1eac903b6b5 Mon Sep 17 00:00:00 2001 From: Yury Gribkov Date: Wed, 26 Jun 2024 00:42:07 -0700 Subject: [PATCH] ServiceTalk async context propagation instrumentation --- .../instrumentation/servicetalk/build.gradle | 30 ++++ .../AbstractAsyncContextInstrumentation.java | 19 +++ .../ContextMapInstrumentation.java | 50 ++++++ .../ContextPreservingInstrumentation.java | 72 ++++++++ ...ontextPreservingInstrumentationTest.groovy | 156 ++++++++++++++++++ settings.gradle | 1 + 6 files changed, 328 insertions(+) create mode 100644 dd-java-agent/instrumentation/servicetalk/build.gradle create mode 100644 dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/AbstractAsyncContextInstrumentation.java create mode 100644 dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextMapInstrumentation.java create mode 100644 dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextPreservingInstrumentation.java create mode 100644 dd-java-agent/instrumentation/servicetalk/src/test/groovy/ContextPreservingInstrumentationTest.groovy diff --git a/dd-java-agent/instrumentation/servicetalk/build.gradle b/dd-java-agent/instrumentation/servicetalk/build.gradle new file mode 100644 index 00000000000..504105e93e9 --- /dev/null +++ b/dd-java-agent/instrumentation/servicetalk/build.gradle @@ -0,0 +1,30 @@ +plugins { + id 'java-test-fixtures' +} + +muzzle { + pass { + group = 'io.servicetalk' + module = 'servicetalk-concurrent-api' + // prev versions missing ContextMap + versions = '[0.41.12,)' + assertInverse = true + } + pass { + group = 'io.servicetalk' + module = 'servicetalk-context-api' + versions = '[0.1.0,)' + assertInverse = true + } +} + +apply from: "$rootDir/gradle/java.gradle" + +dependencies { + compileOnly group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.45' + compileOnly group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.45' + + testImplementation group: 'io.servicetalk', name: 'servicetalk-concurrent-api', version: '0.42.0' + testImplementation group: 'io.servicetalk', name: 'servicetalk-context-api', version: '0.42.0' +} + diff --git a/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/AbstractAsyncContextInstrumentation.java b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/AbstractAsyncContextInstrumentation.java new file mode 100644 index 00000000000..d4f96264555 --- /dev/null +++ b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/AbstractAsyncContextInstrumentation.java @@ -0,0 +1,19 @@ +package datadog.trace.instrumentation.servicetalk; + +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.util.Collections; +import java.util.Map; + +public abstract class AbstractAsyncContextInstrumentation extends InstrumenterModule.Tracing { + + public AbstractAsyncContextInstrumentation() { + super("servicetalk", "servicetalk-concurrent"); + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + "io.servicetalk.context.api.ContextMap", AgentSpan.class.getName()); + } +} diff --git a/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextMapInstrumentation.java b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextMapInstrumentation.java new file mode 100644 index 00000000000..216955ca971 --- /dev/null +++ b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextMapInstrumentation.java @@ -0,0 +1,50 @@ +package datadog.trace.instrumentation.servicetalk; + +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.CallDepthThreadLocalMap; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import io.servicetalk.context.api.ContextMap; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class ContextMapInstrumentation extends AbstractAsyncContextInstrumentation + implements Instrumenter.ForSingleType { + + @Override + public String instrumentedType() { + return "io.servicetalk.concurrent.api.CopyOnWriteContextMap"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$Construct"); + } + + private static final class Construct { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static boolean enter(@Advice.Origin Class clazz) { + int level = CallDepthThreadLocalMap.incrementCallDepth(clazz); + return level == 0; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit( + @Advice.Origin Class clazz, + @Advice.Enter final boolean topLevel, + @Advice.This ContextMap contextMap) { + if (!topLevel) { + return; + } + CallDepthThreadLocalMap.reset(clazz); + + InstrumentationContext.get(ContextMap.class, AgentSpan.class) + .put(contextMap, AgentTracer.activeSpan()); + } + } +} diff --git a/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextPreservingInstrumentation.java b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextPreservingInstrumentation.java new file mode 100644 index 00000000000..0ae956c4eba --- /dev/null +++ b/dd-java-agent/instrumentation/servicetalk/src/main/java/datadog/trace/instrumentation/servicetalk/ContextPreservingInstrumentation.java @@ -0,0 +1,72 @@ +package datadog.trace.instrumentation.servicetalk; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.namedOneOf; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import io.servicetalk.context.api.ContextMap; +import net.bytebuddy.asm.Advice; + +@AutoService(InstrumenterModule.class) +public class ContextPreservingInstrumentation extends AbstractAsyncContextInstrumentation + implements Instrumenter.ForKnownTypes { + + @Override + public String[] knownMatchingTypes() { + return new String[] { + "io.servicetalk.concurrent.api.ContextPreservingBiConsumer", + "io.servicetalk.concurrent.api.ContextPreservingBiFunction", + "io.servicetalk.concurrent.api.ContextPreservingCallable", + "io.servicetalk.concurrent.api.ContextPreservingCancellable", + "io.servicetalk.concurrent.api.ContextPreservingCompletableSubscriber", + "io.servicetalk.concurrent.api.ContextPreservingConsumer", + "io.servicetalk.concurrent.api.ContextPreservingFunction", + "io.servicetalk.concurrent.api.ContextPreservingRunnable", + "io.servicetalk.concurrent.api.ContextPreservingSingleSubscriber", + "io.servicetalk.concurrent.api.ContextPreservingSubscriber", + "io.servicetalk.concurrent.api.ContextPreservingSubscription", + }; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + namedOneOf( + "accept", + "apply", + "call", + "cancel", + "onComplete", + "onError", + "onSuccess", + "request", + "onNext", + "onSubscribe", + "run"), + getClass().getName() + "$Wrapper"); + } + + public static final class Wrapper { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope enter(@Advice.FieldValue("saved") final ContextMap contextMap) { + AgentSpan parent = + InstrumentationContext.get(ContextMap.class, AgentSpan.class).get(contextMap); + if (parent != null) { + return AgentTracer.activateSpan(parent); + } + return null; + } + + @Advice.OnMethodExit(suppress = Throwable.class) + public static void exit(@Advice.Enter final AgentScope agentScope) { + if (agentScope != null) { + agentScope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/servicetalk/src/test/groovy/ContextPreservingInstrumentationTest.groovy b/dd-java-agent/instrumentation/servicetalk/src/test/groovy/ContextPreservingInstrumentationTest.groovy new file mode 100644 index 00000000000..0c0f31c65af --- /dev/null +++ b/dd-java-agent/instrumentation/servicetalk/src/test/groovy/ContextPreservingInstrumentationTest.groovy @@ -0,0 +1,156 @@ +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.bootstrap.instrumentation.api.AgentScope +import datadog.trace.bootstrap.instrumentation.api.AgentTracer +import io.servicetalk.concurrent.api.AsyncContext +import io.servicetalk.context.api.ContextMap + +import java.util.concurrent.ExecutorService +import java.util.concurrent.Executors + +import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace + +class ContextPreservingInstrumentationTest extends AgentTestRunner { + + def "wrapBiConsumer"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapBiConsumer({ t, u -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread{ wrapped.accept(null, null) } + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + def "wrapBiFunction"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapBiFunction({ t, u -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread{ wrapped.apply(null, null) } + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + def "wrapCallable"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapCallable({ -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread{ wrapped.call() } + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + def "wrapConsumer"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapConsumer({ t -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread{ wrapped.accept(null) } + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + def "wrapFunction"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapFunction({ t -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread { wrapped.apply(null) } + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + def "wrapRunnable"() { + setup: + def parent = startParentContext() + def wrapped = + asyncContextProvider.wrapRunnable({ -> childSpan() }, parent.contextMap) + + when: + runInSeparateThread(wrapped) + parent.releaseParentSpan() + + then: + assertParentChildTrace() + } + + ExecutorService executor = Executors.newFixedThreadPool(5) + def asyncContextProvider = AsyncContext.provider + + def cleanup() { + if (executor != null) { + executor.shutdown() + } + } + + private runInSeparateThread(Runnable runnable) { + executor.submit(runnable).get() + } + + /** + * Captures async context. Also uses continuation to prevent the span from being reported until it is released. + */ + private class ParentContext { + final ContextMap contextMap = AsyncContext.context().copy() + final AgentScope.Continuation spanContinuation = AgentTracer.capture() + + def releaseParentSpan() { + spanContinuation.cancel() + } + } + + private startParentContext() { + runUnderTrace("parent") { + new ParentContext() + } + } + + /** + * Asserts a parent-child trace meaning that async context propagation works correctly. + */ + private void assertParentChildTrace() { + assertTraces(1) { + trace(2) { + sortSpansByStart() + span { + operationName "parent" + tags { + defaultTags() + } + } + span { + childOf span(0) + operationName "child" + tags { + defaultTags() + } + } + } + } + } + + private childSpan() { + AgentTracer.startSpan("test", "child").finish() + } +} diff --git a/settings.gradle b/settings.gradle index 310ea686bed..694803827de 100644 --- a/settings.gradle +++ b/settings.gradle @@ -399,6 +399,7 @@ include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.10' include ':dd-java-agent:instrumentation:scala-promise:scala-promise-2.13' include ':dd-java-agent:instrumentation:scalatest' include ':dd-java-agent:instrumentation:selenium' +include ':dd-java-agent:instrumentation:servicetalk' include ':dd-java-agent:instrumentation:servlet' include ':dd-java-agent:instrumentation:servlet-common' include ':dd-java-agent:instrumentation:servlet:request-2'