From 9bdb3aa346fc97181c9600a9b3b6110963c0b39e Mon Sep 17 00:00:00 2001 From: Jonas Kunz Date: Wed, 26 Jul 2023 12:18:44 +0200 Subject: [PATCH] Implemented in-process baggage propagation (#3248) --- CHANGELOG.asciidoc | 2 +- .../apm/agent/concurrent/JavaConcurrent.java | 32 ++++--- ...leCallableForkJoinTaskInstrumentation.java | 5 +- .../ExecutorInstrumentationTest.java | 89 +++++++++++++----- .../agent/concurrent/ForkJoinPoolTest.java | 26 +++--- .../apm/agent/reactor/TracedSubscriber.java | 38 ++++---- .../agent/reactor/TracedSubscriberTest.java | 48 ++++++++-- .../FutureInstrumentation.java | 16 ++-- .../FutureInstrumentationSpec.scala | 39 ++++++-- .../TransactionAwareSubscriber.java | 8 +- .../vertx/AbstractVertxWebClientHelper.java | 93 +++++++++---------- .../agent/vertx/GenericHandlerWrapper.java | 30 +++--- .../apm/agent/vertx/SetTimerWrapper.java | 14 +-- .../webclient/AbstractVertxWebClientTest.java | 5 - .../vertx/v3/EventLoopInstrumentation.java | 4 +- .../webclient/WebClientInstrumentation.java | 9 +- .../vertx/v4/EventLoopInstrumentation.java | 4 +- .../webclient/HttpContextInstrumentation.java | 58 ++++++------ 18 files changed, 310 insertions(+), 210 deletions(-) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index e9c3f4dd55..dcc1d67e27 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -33,7 +33,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes: [float] ===== Features -* Added W3C baggage propagation - {pull}3236[#3236] +* Added W3C baggage propagation - {pull}3236[#3236], {pull}3248[#3248] * Added support for baggage in OpenTelemetry bridge - {pull}3249[#3249] [[release-notes-1.x]] diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java index 9507eec38d..3f06f75079 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/JavaConcurrent.java @@ -19,10 +19,11 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.common.ThreadUtils; -import co.elastic.apm.agent.tracer.AbstractSpan; import co.elastic.apm.agent.sdk.DynamicTransformer; import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; import co.elastic.apm.agent.sdk.state.GlobalState; +import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.Tracer; import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap; @@ -42,7 +43,7 @@ @GlobalState public class JavaConcurrent { - private static final ReferenceCountedMap> contextMap = GlobalTracer.get().newReferenceCountedMap(); + private static final ReferenceCountedMap> contextMap = GlobalTracer.get().newReferenceCountedMap(); private static final List> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections. >singletonList(RunnableCallableForkJoinTaskInstrumentation.class); @@ -75,24 +76,25 @@ private static boolean shouldAvoidContextPropagation(@Nullable Object executable * Retrieves the context mapped to the provided task and activates it on the current thread. * It is the responsibility of the caller to deactivate the returned context at the right time. * If the mapped context is already the active span of this thread, this method returns {@code null}. - * @param o a task for which running there may be a context to activate + * + * @param o a task for which running there may be a context to activate * @param tracer the tracer * @return the context mapped to the provided task or {@code null} if such does not exist or if the mapped context * is already the active one on the current thread. */ @Nullable - public static AbstractSpan restoreContext(Object o, Tracer tracer) { + public static ElasticContext restoreContext(Object o, Tracer tracer) { // When an Executor executes directly on the current thread we need to enable this thread for context propagation again needsContext.set(Boolean.TRUE); // we cannot remove yet, as this decrements the reference count, which may cause already ended spans to be recycled ahead of time - AbstractSpan context = contextMap.get(o); + ElasticContext context = contextMap.get(o); if (context == null) { return null; } try { - if (tracer.getActive() != context) { + if (tracer.currentContext() != context) { return context.activate(); } else { return null; @@ -111,8 +113,8 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) { return runnable; } needsContext.set(Boolean.FALSE); - AbstractSpan active = tracer.getActive(); - if (active == null) { + ElasticContext active = tracer.currentContext(); + if (active.isEmpty()) { return runnable; } if (isLambda(runnable)) { @@ -122,11 +124,13 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) { return runnable; } - private static void captureContext(Object task, AbstractSpan active) { + private static void captureContext(Object task, ElasticContext active) { DynamicTransformer.ensureInstrumented(task.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION); contextMap.put(task, active); // Do no discard branches leading to async operations so not to break span references - active.setNonDiscardable(); + if (active.getSpan() != null) { + active.getSpan().setNonDiscardable(); + } } /** @@ -138,8 +142,8 @@ public static Callable withContext(@Nullable Callable callable, Tracer return callable; } needsContext.set(Boolean.FALSE); - AbstractSpan active = tracer.getActive(); - if (active == null) { + ElasticContext active = tracer.currentContext(); + if (active.isEmpty()) { return callable; } if (isLambda(callable)) { @@ -155,8 +159,8 @@ public static ForkJoinTask withContext(@Nullable ForkJoinTask task, Tr return task; } needsContext.set(Boolean.FALSE); - AbstractSpan active = tracer.getActive(); - if (active == null) { + ElasticContext active = tracer.currentContext(); + if (active.isEmpty()) { return task; } captureContext(task, active); diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java index 3210789442..12b8b96723 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java @@ -23,6 +23,7 @@ import co.elastic.apm.agent.tracer.AbstractSpan; import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.Tracer; +import co.elastic.apm.agent.tracer.ElasticContext; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; @@ -80,8 +81,8 @@ public static Object onEnter(@Advice.This Object thiz) { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) public static void onExit(@Advice.Thrown Throwable thrown, @Nullable @Advice.Enter Object context) { - if (context instanceof AbstractSpan) { - ((AbstractSpan) context).deactivate(); + if (context != null) { + ((ElasticContext) context).deactivate(); } } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentationTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentationTest.java index d476474ec8..cbc36d65d7 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentationTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ExecutorInstrumentationTest.java @@ -19,10 +19,11 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.baggage.BaggageContext; +import co.elastic.apm.agent.impl.transaction.ElasticContext; import co.elastic.apm.agent.impl.transaction.Span; import co.elastic.apm.agent.impl.transaction.Transaction; import org.junit.After; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -30,17 +31,18 @@ import org.springframework.core.task.SyncTaskExecutor; import java.util.Arrays; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; -import static org.assertj.core.api.Assertions.assertThat; +import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat; @RunWith(Parameterized.class) public class ExecutorInstrumentationTest extends AbstractInstrumentationTest { private final Executor executor; - private Transaction transaction; public ExecutorInstrumentationTest(Supplier supplier) { executor = supplier.get(); @@ -51,10 +53,6 @@ public static Iterable> data() { return Arrays.asList(SimpleAsyncTaskExecutor::new, SyncTaskExecutor::new); } - @Before - public void setUp() { - transaction = tracer.startRootTransaction(null).withName("Transaction").activate(); - } @After public void tearDown() { @@ -63,14 +61,70 @@ public void tearDown() { @Test public void testExecutorExecute_Transaction() { - executor.execute(this::createAsyncSpan); - assertOnlySpanIsChildOfOnlyTransaction(); + Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate(); + executor.execute(() -> createAsyncSpan(transaction)); + try { + // wait for the async operation to end + assertThat(reporter.getFirstSpan(1000)).isNotNull(); + } finally { + transaction.deactivate().end(); + } + assertThat(reporter.getTransactions()).hasSize(1); + assertThat(reporter.getSpans()).hasSize(1); + assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue(); + } + + @Test + public void testBaggagePropagationWithTransaction() throws InterruptedException { + Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate(); + BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage() + .put("foo", "bar") + .buildContext() + .activate(); + + AtomicReference> propagatedContext = new AtomicReference<>(); + CountDownLatch doneLatch = new CountDownLatch(1); + executor.execute(() -> { + propagatedContext.set(tracer.currentContext()); + doneLatch.countDown(); + }); + transactionWithBaggage.deactivate(); + transaction.deactivate().end(); + doneLatch.await(); + + assertThat(propagatedContext.get().getBaggage()) + .hasSize(1) + .containsEntry("foo", "bar"); + assertThat(propagatedContext.get().getTransaction()).isSameAs(transaction); + } + + @Test + public void testBaggagePropagationWithoutTransaction() throws InterruptedException { + BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage() + .put("foo", "bar") + .buildContext() + .activate(); + + AtomicReference> propagatedContext = new AtomicReference<>(); + CountDownLatch doneLatch = new CountDownLatch(1); + executor.execute(() -> { + propagatedContext.set(tracer.currentContext()); + doneLatch.countDown(); + }); + transactionWithBaggage.deactivate(); + doneLatch.await(); + + assertThat(propagatedContext.get().getBaggage()) + .hasSize(1) + .containsEntry("foo", "bar"); + assertThat(propagatedContext.get().getTransaction()).isNull(); } @Test public void testExecutorExecute_Span() { + Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate(); Span nonAsyncSpan = transaction.createSpan().withName("NonAsync").activate(); - executor.execute(this::createAsyncSpan); + executor.execute(() -> createAsyncSpan(transaction)); try { // wait for the async operation to end assertThat(reporter.getFirstSpan(1000)).isNotNull(); @@ -84,20 +138,9 @@ public void testExecutorExecute_Span() { assertThat(reporter.getFirstSpan().isChildOf(nonAsyncSpan)).isTrue(); } - private void assertOnlySpanIsChildOfOnlyTransaction() { - try { - // wait for the async operation to end - assertThat(reporter.getFirstSpan(1000)).isNotNull(); - } finally { - transaction.deactivate().end(); - } - assertThat(reporter.getTransactions()).hasSize(1); - assertThat(reporter.getSpans()).hasSize(1); - assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue(); - } - private void createAsyncSpan() { - assertThat(tracer.currentTransaction()).isEqualTo(transaction); + private void createAsyncSpan(Transaction expectedCurrent) { + assertThat(tracer.currentTransaction()).isEqualTo(expectedCurrent); tracer.getActive().createSpan().withName("Async").end(); } } diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java index 4564b68855..cc644b6603 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/test/java/co/elastic/apm/agent/concurrent/ForkJoinPoolTest.java @@ -19,13 +19,12 @@ package co.elastic.apm.agent.concurrent; import co.elastic.apm.agent.AbstractInstrumentationTest; -import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Transaction; +import co.elastic.apm.agent.tracer.ElasticContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ForkJoinPool; import java.util.concurrent.ForkJoinTask; @@ -39,34 +38,39 @@ public class ForkJoinPoolTest extends AbstractInstrumentationTest { private ForkJoinPool pool; private Transaction transaction; + private ElasticContext txWithBaggage; @BeforeEach void setUp() { pool = new ForkJoinPool(); transaction = tracer.startRootTransaction(null).withName("transaction").activate(); + txWithBaggage = tracer.currentContext().withUpdatedBaggage().put("foo", "bar").buildContext().activate(); } @AfterEach void tearDown() { assertThat(tracer.getActive()).isEqualTo(transaction); + assertThat(tracer.currentContext()).isSameAs(txWithBaggage); + txWithBaggage.deactivate(); transaction.deactivate().end(); + } @Test void testExecute() throws Exception { - final ForkJoinTask> task = newTask(() -> tracer.getActive()); + final ForkJoinTask> task = newTask(() -> tracer.currentContext()); pool.execute(task); - assertThat(task.get()).isEqualTo(transaction); + assertThat(task.get()).isSameAs(txWithBaggage); } @Test void testSubmit() throws Exception { - assertThat(pool.submit(newTask(() -> tracer.getActive())).get()).isEqualTo(transaction); + assertThat(pool.submit(newTask(() -> tracer.currentContext())).get()).isSameAs(txWithBaggage); } @Test void testInvoke() throws Exception { - assertThat(pool.invoke(newTask(() -> tracer.getActive()))).isEqualTo(transaction); + assertThat(pool.invoke(newTask(() -> tracer.currentContext()))).isSameAs(txWithBaggage); } @Test @@ -76,19 +80,19 @@ void testCompletableFuture() throws Exception { // Preferences | Build, Execution, Deployment | Debugger | Async Stack Traces // and uncheck the Instrumenting Agent checkbox assertThat(CompletableFuture - .supplyAsync(() -> Objects.requireNonNull(tracer.getActive())) - .thenApplyAsync(active -> tracer.getActive()) + .supplyAsync(() -> tracer.currentContext()) + .thenApplyAsync(active -> tracer.currentContext()) .get()) - .isEqualTo(transaction); + .isSameAs(txWithBaggage); } @Test void testParallelStream() { assertThat(Stream.of("foo", "bar", "baz") .parallel() - .>map(s -> tracer.getActive()) + .>map(s -> tracer.currentContext()) .distinct()) - .containsExactly(transaction); + .containsExactly(txWithBaggage); } public static class AdaptedSupplier extends ForkJoinTask implements Runnable { diff --git a/apm-agent-plugins/apm-reactor-plugin/src/main/java/co/elastic/apm/agent/reactor/TracedSubscriber.java b/apm-agent-plugins/apm-reactor-plugin/src/main/java/co/elastic/apm/agent/reactor/TracedSubscriber.java index 7c6b6f3206..a1438d936b 100644 --- a/apm-agent-plugins/apm-reactor-plugin/src/main/java/co/elastic/apm/agent/reactor/TracedSubscriber.java +++ b/apm-agent-plugins/apm-reactor-plugin/src/main/java/co/elastic/apm/agent/reactor/TracedSubscriber.java @@ -18,12 +18,12 @@ */ package co.elastic.apm.agent.reactor; -import co.elastic.apm.agent.tracer.AbstractSpan; import co.elastic.apm.agent.sdk.logging.Logger; import co.elastic.apm.agent.sdk.logging.LoggerFactory; import co.elastic.apm.agent.sdk.state.GlobalVariables; import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent; import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap; +import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.Tracer; import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap; @@ -47,7 +47,7 @@ public class TracedSubscriber implements CoreSubscriber { private static final AtomicBoolean isRegistered = GlobalVariables.get(ReactorInstrumentation.class, "reactor-hook-enabled", new AtomicBoolean(false)); - private static final ReferenceCountedMap, AbstractSpan> contextMap = GlobalTracer.get().newReferenceCountedMap(); + private static final ReferenceCountedMap, ElasticContext> contextMap = GlobalTracer.get().newReferenceCountedMap(); private static final String HOOK_KEY = "elastic-apm"; @@ -57,13 +57,13 @@ public class TracedSubscriber implements CoreSubscriber { private final Context context; - TracedSubscriber(CoreSubscriber subscriber, Tracer tracer, AbstractSpan context) { + TracedSubscriber(CoreSubscriber subscriber, Tracer tracer, ElasticContext context) { this.subscriber = subscriber; this.tracer = tracer; contextMap.put(this, context); // store our span/transaction into reactor context for later lookup without relying on active tracer state - this.context = subscriber.currentContext().put(AbstractSpan.class, context); + this.context = subscriber.currentContext().put(ElasticContext.class, context); } @Override @@ -78,7 +78,7 @@ public Context currentContext() { */ @Override public void onSubscribe(Subscription s) { - AbstractSpan context = getContext(); + ElasticContext context = getContext(); boolean hasActivated = doEnter("onSubscribe", context); Throwable thrown = null; try { @@ -99,7 +99,7 @@ public void onSubscribe(Subscription s) { */ @Override public void onNext(T next) { - AbstractSpan context = getContext(); + ElasticContext context = getContext(); boolean hasActivated = doEnter("onNext", context); Throwable thrown = null; try { @@ -120,7 +120,7 @@ public void onNext(T next) { */ @Override public void onError(Throwable t) { - AbstractSpan context = getContext(); + ElasticContext context = getContext(); boolean hasActivated = doEnter("onError", context); try { subscriber.onError(t); @@ -135,7 +135,7 @@ public void onError(Throwable t) { */ @Override public void onComplete() { - AbstractSpan context = getContext(); + ElasticContext context = getContext(); boolean hasActivated = doEnter("onComplete", context); try { subscriber.onComplete(); @@ -152,10 +152,10 @@ public void onComplete() { * @param context context * @return {@literal true} if context has been activated */ - private boolean doEnter(String method, @Nullable AbstractSpan context) { + private boolean doEnter(String method, @Nullable ElasticContext context) { debugTrace(true, method, context); - if (context == null || tracer.getActive() == context) { + if (context == null || tracer.currentContext() == context) { // already activated or discarded return false; } @@ -171,14 +171,14 @@ private boolean doEnter(String method, @Nullable AbstractSpan context) { * @param method method name (only for debugging) * @param context context */ - private void doExit(boolean deactivate, String method, @Nullable AbstractSpan context) { + private void doExit(boolean deactivate, String method, @Nullable ElasticContext context) { debugTrace(false, method, context); if (context == null || !deactivate) { return; } - if (context != tracer.getActive()) { + if (context != tracer.currentContext()) { // don't attempt to deactivate if not the active one return; } @@ -194,7 +194,7 @@ private void discardIf(boolean condition) { contextMap.remove(this); } - private void debugTrace(boolean isEnter, String method, @Nullable AbstractSpan context) { + private void debugTrace(boolean isEnter, String method, @Nullable ElasticContext context) { if (!log.isTraceEnabled()) { return; } @@ -205,7 +205,7 @@ private void debugTrace(boolean isEnter, String method, @Nullable AbstractSpan getContext() { + private ElasticContext getContext() { return contextMap.get(this); } @@ -252,19 +252,19 @@ public CoreSubscriber apply(Publisher publisher, CoreSubscriber active = tracer.getActive(); + ElasticContext active = tracer.currentContext(); // fallback to using context-stored span/transaction if not already active - if (active == null) { - active = subscriber.currentContext().getOrDefault(AbstractSpan.class, null); + if (active.isEmpty()) { + active = subscriber.currentContext().getOrDefault(ElasticContext.class, null); } - if (active == null) { + if (active == null || active.isEmpty()) { // no active context, we have nothing to wrap return subscriber; } - if(log.isTraceEnabled()) { + if (log.isTraceEnabled()) { log.trace("wrapping subscriber {} publisher {} with active span/transaction {}", safeToString(subscriber), publisher, active); } diff --git a/apm-agent-plugins/apm-reactor-plugin/src/test/java/co/elastic/apm/agent/reactor/TracedSubscriberTest.java b/apm-agent-plugins/apm-reactor-plugin/src/test/java/co/elastic/apm/agent/reactor/TracedSubscriberTest.java index d5d89e9646..aa99e7c493 100644 --- a/apm-agent-plugins/apm-reactor-plugin/src/test/java/co/elastic/apm/agent/reactor/TracedSubscriberTest.java +++ b/apm-agent-plugins/apm-reactor-plugin/src/test/java/co/elastic/apm/agent/reactor/TracedSubscriberTest.java @@ -19,8 +19,10 @@ package co.elastic.apm.agent.reactor; import co.elastic.apm.agent.AbstractInstrumentationTest; +import co.elastic.apm.agent.impl.baggage.BaggageContext; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.impl.transaction.Transaction; +import co.elastic.apm.agent.tracer.ElasticContext; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -146,6 +148,32 @@ void contextPropagation_DifferentThreads() { .verifyComplete(); } + @Test + void baggagePropagation_DifferentThreads() { + + // we have a transaction active in current thread + BaggageContext baggageContext = tracer.currentContext().withUpdatedBaggage() + .put("foo", "bar") + .buildContext() + .activate(); + + Flux flux = Flux.just(1, 2, 3).log("input") + // subscribe & publish on separate threads + .subscribeOn(SUBSCRIBE_SCHEDULER) + .publishOn(PUBLISH_SCHEDULER) + // + .map(TestObservation::capture); + + StepVerifier.create(flux.log("output")) + .expectNextMatches(inOtherThread(baggageContext, 1)) + .expectNextMatches(inOtherThread(baggageContext, 2)) + .expectNextMatches(inOtherThread(baggageContext, 3)) + .verifyComplete(); + + baggageContext.deactivate(); + } + + @Test void contextPropagation_Flux_Map_Zip() { transaction = startTestRootTransaction("root"); @@ -220,7 +248,7 @@ static Predicate inMainThread(@Nullable AbstractSpan expecte }; } - static Predicate inOtherThread(@Nullable AbstractSpan expectedContext, int expectedValue) { + static Predicate inOtherThread(@Nullable ElasticContext expectedContext, int expectedValue) { return observation -> { observation .checkActiveContext(expectedContext) @@ -250,12 +278,12 @@ static Predicate noActiveContext(int expectedValue) { */ private static class TestObservation { @Nullable - private final AbstractSpan activeContext; + private final ElasticContext activeContext; private final Long threadId; private final int value; private TestObservation(int value) { - this.activeContext = tracer.getActive(); + this.activeContext = tracer.currentContext(); this.threadId = currentThreadId(); this.value = value; } @@ -286,10 +314,16 @@ TestObservation checkThread(boolean mainThread) { return this; } - TestObservation checkActiveContext(@Nullable AbstractSpan expectedActiveContext) { - assertThat(activeContext) - .describedAs("%s active context in thread %d", activeContext == null ? "missing" : "unexpected", threadId) - .isEqualTo(expectedActiveContext); + TestObservation checkActiveContext(@Nullable ElasticContext expectedActiveContext) { + if (expectedActiveContext == null) { + assertThat(activeContext.isEmpty()) + .describedAs("unexpected active context in thread %d", threadId) + .isTrue(); + } else { + assertThat(activeContext) + .describedAs("%s active context in thread %d", activeContext.isEmpty() ? "missing" : "unexpected", threadId) + .isEqualTo(expectedActiveContext); + } return this; } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index 1b5df42526..59277bfc5c 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -21,6 +21,7 @@ import co.elastic.apm.agent.sdk.ElasticApmInstrumentation; import co.elastic.apm.agent.tracer.AbstractSpan; import co.elastic.apm.agent.tracer.GlobalTracer; +import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.Tracer; import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap; import net.bytebuddy.asm.Advice; @@ -42,8 +43,7 @@ public abstract class FutureInstrumentation extends ElasticApmInstrumentation { private static final Tracer tracer = GlobalTracer.get(); @SuppressWarnings("WeakerAccess") - public static final ReferenceCountedMap> promisesToContext = tracer.newReferenceCountedMap(); - + public static final ReferenceCountedMap> promisesToContext = tracer.newReferenceCountedMap(); @Nonnull @Override @@ -66,8 +66,8 @@ public ElementMatcher getMethodMatcher() { public static class AdviceClass { @Advice.OnMethodExit(suppress = Throwable.class, inline = false) public static void onExit(@Advice.This Object thiz) { - final AbstractSpan context = tracer.getActive(); - if (context != null) { + final ElasticContext context = tracer.currentContext(); + if (!context.isEmpty()) { promisesToContext.put(thiz, context); } } @@ -93,7 +93,7 @@ public static class AdviceClass { public static Object onEnter(@Advice.This Object thiz) { // We cannot remove yet, as this may decrement the ref count of the span to 0 if it has already ended, // thus causing it to be recycled just before we activate it on the current thread. So we first get(). - AbstractSpan context = promisesToContext.get(thiz); + ElasticContext context = promisesToContext.get(thiz); if (context != null) { context.activate(); // Now it's safe to remove, as ref count is at least 2 @@ -103,9 +103,9 @@ public static Object onEnter(@Advice.This Object thiz) { } @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { - if (abstractSpanObj instanceof AbstractSpan) { - AbstractSpan context = (AbstractSpan) abstractSpanObj; + public static void onExit(@Advice.Enter @Nullable Object contextObj) { + if (contextObj != null) { + ElasticContext context = (ElasticContext) contextObj; context.deactivate(); } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index 847f3f11ed..79e7a296e4 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -1,18 +1,16 @@ package co.elastic.apm.agent.scalaconcurrent -import java.util.concurrent.Executors - import co.elastic.apm.agent.MockReporter import co.elastic.apm.agent.bci.ElasticApmAgent import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} -import co.elastic.apm.agent.impl.transaction.Transaction import co.elastic.apm.agent.impl.{ElasticApmTracer, ElasticApmTracerBuilder} import munit.FunSuite import net.bytebuddy.agent.ByteBuddyAgent import org.stagemonitor.configuration.ConfigurationRegistry +import java.util.concurrent.Executors +import scala.concurrent._ import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise} import scala.util.{Failure, Success} class FutureInstrumentationSpec extends FunSuite { @@ -20,7 +18,6 @@ class FutureInstrumentationSpec extends FunSuite { private var reporter: MockReporter = _ private var tracer: ElasticApmTracer = _ private var coreConfiguration: CoreConfiguration = _ - private var transaction: Transaction = _ override def beforeEach(context: BeforeEach): Unit = { reporter = new MockReporter @@ -29,7 +26,6 @@ class FutureInstrumentationSpec extends FunSuite { tracer = new ElasticApmTracerBuilder().configurationRegistry(config).reporter(reporter).build ElasticApmAgent.initInstrumentation(tracer, ByteBuddyAgent.install) tracer.start(false) - transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() } override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() @@ -38,7 +34,8 @@ class FutureInstrumentationSpec extends FunSuite { implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - val future = Future("Test") + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + val future = Future("Test") .map(_.length) .flatMap(l => Future(l * 2)) .map(_.toString) @@ -51,7 +48,30 @@ class FutureInstrumentationSpec extends FunSuite { reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], true ) + } + + test("Scala Future should propagate Baggage across different threads") { + + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + + val baggageContext = tracer.currentContext().withUpdatedBaggage() + .put("foo", "bar") + .buildContext() + .activate() + val future = Future("Test") + .map(_.length) + .flatMap(l => Future(l * 2)) + .map(_.toString) + .flatMap(s => Future(s"$s-$s")) + .map(_ => + tracer.currentContext() + ) + + baggageContext.deactivate(); + val activeCtx = Await.result(future, 10.seconds) + assertEquals(activeCtx.getBaggage.get("foo"), "bar") } @@ -59,6 +79,7 @@ class FutureInstrumentationSpec extends FunSuite { implicit val executionContext: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() new TestFutureTraceMethods().invokeAsync(tracer) transaction.deactivate().end() assertEquals(reporter.getTransactions.size(), 1) @@ -74,6 +95,7 @@ class FutureInstrumentationSpec extends FunSuite { implicit val multiPoolEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() val future = Future .traverse(1 to 100) { _ => Future.sequence(List( @@ -114,6 +136,7 @@ class FutureInstrumentationSpec extends FunSuite { implicit val multiPoolEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() val promise = Promise[Int]() Future { Thread.sleep(100) } @@ -141,6 +164,7 @@ class FutureInstrumentationSpec extends FunSuite { implicit val multiPoolEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() val future = Future .sequence(List( Future(Thread.sleep(25)) @@ -161,6 +185,7 @@ class FutureInstrumentationSpec extends FunSuite { implicit val multiPoolEc: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() val promise = Promise[Int]() Future diff --git a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/TransactionAwareSubscriber.java b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/TransactionAwareSubscriber.java index 56a5983ed6..3747089052 100644 --- a/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/TransactionAwareSubscriber.java +++ b/apm-agent-plugins/apm-spring-webflux/apm-spring-webflux-spring5/src/main/java/co/elastic/apm/agent/springwebflux/TransactionAwareSubscriber.java @@ -18,14 +18,14 @@ */ package co.elastic.apm.agent.springwebflux; -import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.sdk.logging.Logger; +import co.elastic.apm.agent.sdk.logging.LoggerFactory; +import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.Transaction; import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import co.elastic.apm.agent.sdk.logging.Logger; -import co.elastic.apm.agent.sdk.logging.LoggerFactory; import org.springframework.web.server.ServerWebExchange; import reactor.core.CoreSubscriber; import reactor.util.context.Context; @@ -72,7 +72,7 @@ class TransactionAwareSubscriber implements CoreSubscriber, Subscription { // store transaction into subscriber context it can be looked-up by reactor when the transaction // is not already active in current thread. - this.context = subscriber.currentContext().put(AbstractSpan.class, transaction); + this.context = subscriber.currentContext().put(ElasticContext.class, transaction); } @Override diff --git a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/AbstractVertxWebClientHelper.java b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/AbstractVertxWebClientHelper.java index 950629679f..ab306a28fb 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/AbstractVertxWebClientHelper.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/AbstractVertxWebClientHelper.java @@ -37,6 +37,7 @@ public abstract class AbstractVertxWebClientHelper { private static final String WEB_CLIENT_SPAN_KEY = AbstractVertxWebClientHelper.class.getName() + ".span"; + private static final String PROPAGATION_CONTEXT_KEY = AbstractVertxWebClientHelper.class.getName() + ".propCtx"; private final Tracer tracer = GlobalTracer.get(); @@ -50,45 +51,38 @@ public void setHeader(String headerName, String headerValue, HttpClientRequest c } } - public void startSpan(ElasticContext activeContext, HttpContext httpContext, HttpClientRequest httpRequest) { - Object existingSpanObj = httpContext.get(WEB_CLIENT_SPAN_KEY); - - AbstractSpan propagateContextOf; - - if (existingSpanObj != null) { - // there is already an active span for this HTTP request, - // don't create a new span but propagate tracing headers - propagateContextOf = (Span) existingSpanObj; - } else { - URI requestUri = URI.create(httpRequest.absoluteURI()); - Span span = HttpClientHelper.startHttpClientSpan(activeContext, getMethod(httpRequest), requestUri, null); - - if (span != null) { - propagateContextOf = span; - span.incrementReferences(); - httpContext.set(WEB_CLIENT_SPAN_KEY, span); - } else { - propagateContextOf = activeContext.getSpan(); - } + public void startSpanOrFollowRedirect(ElasticContext activeContext, HttpContext httpContext, HttpClientRequest httpRequest) { + ElasticContext existingPropagationCtx = httpContext.get(PROPAGATION_CONTEXT_KEY); + + if (existingPropagationCtx != null) { + // Repropagate headers in case of redirects + existingPropagationCtx.propagateContext(httpRequest, HeaderSetter.INSTANCE, null); + return; } - propagateContextOf.activate(); - try { - tracer.currentContext().propagateContext(httpRequest, HeaderSetter.INSTANCE, null); - } finally { - propagateContextOf.deactivate(); + if (activeContext.isEmpty()) { + return; //Nothing to propagate and we'll never start an exit span due to missing transaction + } + + URI requestUri = URI.create(httpRequest.absoluteURI()); + Span span = HttpClientHelper.startHttpClientSpan(activeContext, getMethod(httpRequest), requestUri, null); + ElasticContext toPropagate = activeContext; + if (span != null) { + //no need to increment references of the span, the span will be kept alive by the incrementReferences() on the context below + httpContext.set(WEB_CLIENT_SPAN_KEY, span); + span.activate(); + toPropagate = tracer.currentContext(); + span.deactivate(); } + + toPropagate.incrementReferences(); + httpContext.set(PROPAGATION_CONTEXT_KEY, toPropagate); + toPropagate.propagateContext(httpRequest, HeaderSetter.INSTANCE, null); } public void followRedirect(HttpContext httpContext, HttpClientRequest httpRequest) { - Object existingSpanObj = httpContext.get(WEB_CLIENT_SPAN_KEY); - if (existingSpanObj != null) { - Span existingSpan = (Span) existingSpanObj; - existingSpan.activate(); - try { - tracer.currentContext().propagateContext(httpRequest, HeaderSetter.INSTANCE, null); - } finally { - existingSpan.deactivate(); - } + ElasticContext existingPropagationCtx = httpContext.get(PROPAGATION_CONTEXT_KEY); + if (existingPropagationCtx != null) { + existingPropagationCtx.propagateContext(httpRequest, HeaderSetter.INSTANCE, null); } } @@ -101,24 +95,25 @@ public void failSpan(HttpContext httpContext, Throwable thrown, @Nullable Abs } private void finalizeSpan(HttpContext httpContext, int statusCode, @Nullable Throwable thrown, @Nullable AbstractSpan parent) { - Object spanObj = httpContext.get(WEB_CLIENT_SPAN_KEY); - - if (spanObj != null) { + Span span = httpContext.get(WEB_CLIENT_SPAN_KEY); + ElasticContext propagationCtx = httpContext.get(PROPAGATION_CONTEXT_KEY); + if (propagationCtx != null) { // Setting to null removes from the attributes map httpContext.set(WEB_CLIENT_SPAN_KEY, null); - - Span span = (Span) spanObj; - span.decrementReferences(); - - if (thrown != null) { - span.captureException(thrown).withOutcome(Outcome.FAILURE); - } - - if (statusCode > 0) { - span.getContext().getHttp().withStatusCode(statusCode); + httpContext.set(PROPAGATION_CONTEXT_KEY, null); + try { + if (span != null) { + if (thrown != null) { + span.captureException(thrown).withOutcome(Outcome.FAILURE); + } + if (statusCode > 0) { + span.getContext().getHttp().withStatusCode(statusCode); + } + span.end(); + } + } finally { + propagationCtx.decrementReferences(); } - - span.end(); } else if (parent != null) { parent.captureException(thrown); } diff --git a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/GenericHandlerWrapper.java b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/GenericHandlerWrapper.java index e81bce8b8d..9463669629 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/GenericHandlerWrapper.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/GenericHandlerWrapper.java @@ -18,40 +18,44 @@ */ package co.elastic.apm.agent.vertx; -import co.elastic.apm.agent.tracer.GlobalTracer; import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.ElasticContext; +import co.elastic.apm.agent.tracer.GlobalTracer; import io.vertx.core.Handler; public class GenericHandlerWrapper implements Handler { protected final Handler actualHandler; - private final AbstractSpan parentSpan; + private final ElasticContext parentContext; - public GenericHandlerWrapper(AbstractSpan parentSpan, Handler actualHandler) { - this.parentSpan = parentSpan; + public GenericHandlerWrapper(ElasticContext parentContext, Handler actualHandler) { + this.parentContext = parentContext; this.actualHandler = actualHandler; - parentSpan.incrementReferences(); + parentContext.incrementReferences(); } @Override public void handle(T event) { - parentSpan.activate(); - parentSpan.decrementReferences(); + parentContext.activate(); + parentContext.decrementReferences(); try { actualHandler.handle(event); } catch (Throwable throwable) { - parentSpan.captureException(throwable); + AbstractSpan activeSpan = parentContext.getSpan(); + if (activeSpan != null) { + activeSpan.captureException(throwable); + } throw throwable; } finally { - parentSpan.deactivate(); + parentContext.deactivate(); } } - public static Handler wrapIfActiveSpan(Handler handler) { - AbstractSpan currentSpan = GlobalTracer.get().getActive(); + public static Handler wrapIfNonEmptyContext(Handler handler) { + ElasticContext currentContext = GlobalTracer.get().currentContext(); - if (currentSpan != null) { - handler = new GenericHandlerWrapper<>(currentSpan, handler); + if (!currentContext.isEmpty()) { + handler = new GenericHandlerWrapper<>(currentContext, handler); } return handler; diff --git a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/SetTimerWrapper.java b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/SetTimerWrapper.java index 0510b365b6..be3b30492c 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/SetTimerWrapper.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/main/java/co/elastic/apm/agent/vertx/SetTimerWrapper.java @@ -18,8 +18,8 @@ */ package co.elastic.apm.agent.vertx; +import co.elastic.apm.agent.tracer.ElasticContext; import co.elastic.apm.agent.tracer.GlobalTracer; -import co.elastic.apm.agent.tracer.AbstractSpan; import io.vertx.core.Handler; public class SetTimerWrapper extends GenericHandlerWrapper { @@ -39,16 +39,16 @@ public void handle(Long event) { } } - public SetTimerWrapper(AbstractSpan parentSpan, Handler actualHandler) { - super(parentSpan, actualHandler); + public SetTimerWrapper(ElasticContext parentContext, Handler actualHandler) { + super(parentContext, actualHandler); } - public static Handler wrapTimerIfActiveSpan(Handler handler) { - AbstractSpan currentSpan = GlobalTracer.get().getActive(); + public static Handler wrapTimerIfNonEmptyContext(Handler handler) { + ElasticContext current = GlobalTracer.get().currentContext(); // do not wrap if there is no parent span or if we are in the recursive context of the same type of timer - if (currentSpan != null && !handler.getClass().getName().equals(activeTimerHandlerPerThread.get())) { - handler = new SetTimerWrapper(currentSpan, handler); + if (!current.isEmpty() && !handler.getClass().getName().equals(activeTimerHandlerPerThread.get())) { + handler = new SetTimerWrapper(current, handler); } return handler; diff --git a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/test/java/co/elastic/apm/agent/vertx/webclient/AbstractVertxWebClientTest.java b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/test/java/co/elastic/apm/agent/vertx/webclient/AbstractVertxWebClientTest.java index f2752609a9..202a8a60e2 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx-common/src/test/java/co/elastic/apm/agent/vertx/webclient/AbstractVertxWebClientTest.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx-common/src/test/java/co/elastic/apm/agent/vertx/webclient/AbstractVertxWebClientTest.java @@ -97,11 +97,6 @@ public void testFailedRequest() { doVerifyFailedRequestHttpSpan("not-existing.com", "/error"); } - @Override - public void testBaggagePropagatedWithoutTrace() { - //TODO: remove this NOOP override when vert-x context propagation (instead of span-propagation) is correctly implemented - } - protected void doVerifyFailedRequestHttpSpan(String host, String path) { expectSpan(path) .withHost(host) diff --git a/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/EventLoopInstrumentation.java b/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/EventLoopInstrumentation.java index ae4e3782a4..7402a8b54f 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/EventLoopInstrumentation.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/EventLoopInstrumentation.java @@ -58,7 +58,7 @@ public static class SetTimerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Advice.AssignReturned.ToArguments(@ToArgument(value = 1)) public static Handler setTimerEnter(@Advice.Argument(value = 1) Handler handler) { - return SetTimerWrapper.wrapTimerIfActiveSpan(handler); + return SetTimerWrapper.wrapTimerIfNonEmptyContext(handler); } } @@ -116,7 +116,7 @@ public static class ExecuteOnContextAdvice { @Advice.AssignReturned.ToArguments(@ToArgument(value = 0)) @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static Handler executeBlockingEnter(@Advice.Argument(value = 0) Handler handler) { - return GenericHandlerWrapper.wrapIfActiveSpan(handler); + return GenericHandlerWrapper.wrapIfNonEmptyContext(handler); } } } diff --git a/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/webclient/WebClientInstrumentation.java b/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/webclient/WebClientInstrumentation.java index 55e8a4e068..06babd1acf 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/webclient/WebClientInstrumentation.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx3-plugin/src/main/java/co/elastic/apm/agent/vertx/v3/webclient/WebClientInstrumentation.java @@ -18,7 +18,6 @@ */ package co.elastic.apm.agent.vertx.v3.webclient; -import co.elastic.apm.agent.tracer.AbstractSpan; import co.elastic.apm.agent.vertx.AbstractVertxWebClientHelper; import co.elastic.apm.agent.vertx.v3.Vertx3Instrumentation; import io.vertx.core.Context; @@ -83,13 +82,7 @@ public static class HttpContextSendRequestAdvice extends AdviceBase { public static void sendRequest(@Advice.This HttpContext httpContext, @Advice.Argument(value = 0) HttpClientRequest request, @Advice.FieldValue(value = "context") Context vertxContext) { - AbstractSpan parent = tracer.getActive(); - - if (null != parent) { - webClientHelper.startSpan(tracer.currentContext(), httpContext, request); - } else { - webClientHelper.followRedirect(httpContext, request); - } + webClientHelper.startSpanOrFollowRedirect(tracer.currentContext(), httpContext, request); } } } diff --git a/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/EventLoopInstrumentation.java b/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/EventLoopInstrumentation.java index 5a5e57bcd0..755fd5f17f 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/EventLoopInstrumentation.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/EventLoopInstrumentation.java @@ -62,7 +62,7 @@ public static class SetTimerAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) @Advice.AssignReturned.ToArguments(@ToArgument(value = 1)) public static Handler setTimerEnter(@Advice.Argument(value = 1) Handler handler) { - return SetTimerWrapper.wrapTimerIfActiveSpan(handler); + return SetTimerWrapper.wrapTimerIfNonEmptyContext(handler); } } @@ -124,7 +124,7 @@ public static class ExecuteOnContextAdvice { @Advice.AssignReturned.ToArguments(@ToArgument(value = 1)) @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static Handler executeBlockingEnter(@Advice.Argument(value = 1) Handler handler) { - return GenericHandlerWrapper.wrapIfActiveSpan(handler); + return GenericHandlerWrapper.wrapIfNonEmptyContext(handler); } } } diff --git a/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/webclient/HttpContextInstrumentation.java b/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/webclient/HttpContextInstrumentation.java index 3be1f35053..47b8fa5325 100644 --- a/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/webclient/HttpContextInstrumentation.java +++ b/apm-agent-plugins/apm-vertx/apm-vertx4-plugin/src/main/java/co/elastic/apm/agent/vertx/v4/webclient/HttpContextInstrumentation.java @@ -18,7 +18,8 @@ */ package co.elastic.apm.agent.vertx.v4.webclient; -import co.elastic.apm.agent.tracer.AbstractSpan; +import co.elastic.apm.agent.tracer.ElasticContext; +import co.elastic.apm.agent.tracer.Transaction; import co.elastic.apm.agent.vertx.AbstractVertxWebClientHelper; import co.elastic.apm.agent.vertx.AbstractVertxWebHelper; import co.elastic.apm.agent.vertx.v4.Vertx4Instrumentation; @@ -40,7 +41,7 @@ public abstract class HttpContextInstrumentation extends Vertx4Instrumentation { - protected static final String WEB_CLIENT_PARENT_SPAN_KEY = AbstractVertxWebClientHelper.class.getName() + ".parent"; + protected static final String WEB_CLIENT_PARENT_CONTEXT_KEY = HttpContextInstrumentation.class.getName() + ".parent"; @Override public Collection getInstrumentationGroupNames() { @@ -78,13 +79,12 @@ public String getAdviceClassName() { public static class HttpContextPrepareRequestAdvice extends AdviceBase { - @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static void prepareRequest(@Advice.This HttpContext httpContext) { - AbstractSpan activeSpan = tracer.getActive(); - if (null != activeSpan) { - activeSpan.incrementReferences(); - httpContext.set(WEB_CLIENT_PARENT_SPAN_KEY, activeSpan); + ElasticContext currentContext = tracer.currentContext(); + if (!currentContext.isEmpty()) { + currentContext.incrementReferences(); + httpContext.set(WEB_CLIENT_PARENT_CONTEXT_KEY, currentContext); } } } @@ -110,24 +110,26 @@ public static class HttpContextSendRequestAdvice extends AdviceBase { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static void sendRequest(@Advice.This HttpContext httpContext, @Advice.Argument(value = 0) HttpClientRequest request, @Advice.FieldValue(value = "context") Context vertxContext) { - Object parentSpan = httpContext.get(WEB_CLIENT_PARENT_SPAN_KEY); + ElasticContext parent = httpContext.get(WEB_CLIENT_PARENT_CONTEXT_KEY); - if (parentSpan != null) { + if (parent != null) { // Setting to null removes from the context attributes map - httpContext.set(WEB_CLIENT_PARENT_SPAN_KEY, null); - ((AbstractSpan) parentSpan).decrementReferences(); + httpContext.set(WEB_CLIENT_PARENT_CONTEXT_KEY, null); } else { - parentSpan = vertxContext.getLocal(AbstractVertxWebHelper.CONTEXT_TRANSACTION_KEY); + Transaction vertxTransaction = vertxContext.getLocal(AbstractVertxWebHelper.CONTEXT_TRANSACTION_KEY); + if (vertxTransaction != null) { + vertxTransaction.activate(); + parent = tracer.currentContext(); + parent.incrementReferences(); + vertxTransaction.deactivate(); + } } - if (parentSpan != null) { - AbstractSpan parent = (AbstractSpan) parentSpan; - //TODO: do actual context propagation instead of span propagation - parent.activate(); + if (parent != null) { try { - webClientHelper.startSpan(tracer.currentContext(), httpContext, request); + webClientHelper.startSpanOrFollowRedirect(parent, httpContext, request); } finally { - parent.deactivate(); + parent.decrementReferences(); } } } @@ -179,17 +181,17 @@ public static class HttpContextFailAdvice extends AdviceBase { @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static void fail(@Advice.This HttpContext httpContext, @Advice.Argument(value = 0) Throwable thrown) { - - AbstractSpan parent = null; - Object parentFromContext = httpContext.get(WEB_CLIENT_PARENT_SPAN_KEY); - if (parentFromContext != null) { - parent = (AbstractSpan) parentFromContext; - - // Setting to null removes from the context attributes map - httpContext.set(WEB_CLIENT_PARENT_SPAN_KEY, null); - parent.decrementReferences(); + ElasticContext parent = httpContext.get(WEB_CLIENT_PARENT_CONTEXT_KEY); + if (parent != null) { + httpContext.set(WEB_CLIENT_PARENT_CONTEXT_KEY, null); + try { + webClientHelper.failSpan(httpContext, thrown, parent.getSpan()); + } finally { + parent.decrementReferences(); + } + } else { + webClientHelper.failSpan(httpContext, thrown, null); } - webClientHelper.failSpan(httpContext, thrown, parent); } } }