Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented in-process baggage propagation #3248

Merged
merged 9 commits into from Jul 26, 2023
Merged
2 changes: 1 addition & 1 deletion CHANGELOG.asciidoc
Expand Up @@ -33,7 +33,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:

[float]
===== Features
* Added W3C baggage propagation - {pull}3236[#3236]
* Added W3C baggage propagation - {pull}3236[#3236], {pull}3248[#3248]
* Added support for baggage in OpenTelemetry bridge - {pull}3249[#3249]

[[release-notes-1.x]]
Expand Down
Expand Up @@ -19,10 +19,11 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.common.ThreadUtils;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.sdk.DynamicTransformer;
import co.elastic.apm.agent.sdk.ElasticApmInstrumentation;
import co.elastic.apm.agent.sdk.state.GlobalState;
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.ElasticContext;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.reference.ReferenceCountedMap;
Expand All @@ -42,7 +43,7 @@
@GlobalState
public class JavaConcurrent {

private static final ReferenceCountedMap<Object, AbstractSpan<?>> contextMap = GlobalTracer.get().newReferenceCountedMap();
private static final ReferenceCountedMap<Object, ElasticContext<?>> contextMap = GlobalTracer.get().newReferenceCountedMap();

private static final List<Class<? extends ElasticApmInstrumentation>> RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION = Collections.
<Class<? extends ElasticApmInstrumentation>>singletonList(RunnableCallableForkJoinTaskInstrumentation.class);
Expand Down Expand Up @@ -75,24 +76,25 @@ private static boolean shouldAvoidContextPropagation(@Nullable Object executable
* Retrieves the context mapped to the provided task and activates it on the current thread.
* It is the responsibility of the caller to deactivate the returned context at the right time.
* If the mapped context is already the active span of this thread, this method returns {@code null}.
* @param o a task for which running there may be a context to activate
*
* @param o a task for which running there may be a context to activate
* @param tracer the tracer
* @return the context mapped to the provided task or {@code null} if such does not exist or if the mapped context
* is already the active one on the current thread.
*/
@Nullable
public static AbstractSpan<?> restoreContext(Object o, Tracer tracer) {
public static ElasticContext<?> restoreContext(Object o, Tracer tracer) {
// When an Executor executes directly on the current thread we need to enable this thread for context propagation again
needsContext.set(Boolean.TRUE);

// we cannot remove yet, as this decrements the reference count, which may cause already ended spans to be recycled ahead of time
AbstractSpan<?> context = contextMap.get(o);
ElasticContext<?> context = contextMap.get(o);
if (context == null) {
return null;
}

try {
if (tracer.getActive() != context) {
if (tracer.currentContext() != context) {
return context.activate();
} else {
return null;
Expand All @@ -111,8 +113,8 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) {
return runnable;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return runnable;
}
if (isLambda(runnable)) {
Expand All @@ -122,11 +124,13 @@ public static Runnable withContext(@Nullable Runnable runnable, Tracer tracer) {
return runnable;
}

private static void captureContext(Object task, AbstractSpan<?> active) {
private static void captureContext(Object task, ElasticContext<?> active) {
DynamicTransformer.ensureInstrumented(task.getClass(), RUNNABLE_CALLABLE_FJTASK_INSTRUMENTATION);
contextMap.put(task, active);
// Do no discard branches leading to async operations so not to break span references
active.setNonDiscardable();
if (active.getSpan() != null) {
active.getSpan().setNonDiscardable();
}
}

/**
Expand All @@ -138,8 +142,8 @@ public static <T> Callable<T> withContext(@Nullable Callable<T> callable, Tracer
return callable;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return callable;
}
if (isLambda(callable)) {
Expand All @@ -155,8 +159,8 @@ public static <T> ForkJoinTask<T> withContext(@Nullable ForkJoinTask<T> task, Tr
return task;
}
needsContext.set(Boolean.FALSE);
AbstractSpan<?> active = tracer.getActive();
if (active == null) {
ElasticContext<?> active = tracer.currentContext();
if (active.isEmpty()) {
return task;
}
captureContext(task, active);
Expand Down
Expand Up @@ -23,6 +23,7 @@
import co.elastic.apm.agent.tracer.AbstractSpan;
import co.elastic.apm.agent.tracer.GlobalTracer;
import co.elastic.apm.agent.tracer.Tracer;
import co.elastic.apm.agent.tracer.ElasticContext;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
Expand Down Expand Up @@ -80,8 +81,8 @@ public static Object onEnter(@Advice.This Object thiz) {
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false)
public static void onExit(@Advice.Thrown Throwable thrown,
@Nullable @Advice.Enter Object context) {
if (context instanceof AbstractSpan) {
((AbstractSpan<?>) context).deactivate();
if (context != null) {
((ElasticContext<?>) context).deactivate();
}
}
}
Expand Down
Expand Up @@ -19,28 +19,30 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.AbstractInstrumentationTest;
import co.elastic.apm.agent.impl.baggage.BaggageContext;
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.impl.transaction.Span;
import co.elastic.apm.agent.impl.transaction.Transaction;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.springframework.core.task.SimpleAsyncTaskExecutor;
import org.springframework.core.task.SyncTaskExecutor;

import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static co.elastic.apm.agent.testutils.assertions.Assertions.assertThat;

@RunWith(Parameterized.class)
public class ExecutorInstrumentationTest extends AbstractInstrumentationTest {

private final Executor executor;
private Transaction transaction;

public ExecutorInstrumentationTest(Supplier<ExecutorService> supplier) {
executor = supplier.get();
Expand All @@ -51,10 +53,6 @@ public static Iterable<Supplier<Executor>> data() {
return Arrays.asList(SimpleAsyncTaskExecutor::new, SyncTaskExecutor::new);
}

@Before
public void setUp() {
transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
}

@After
public void tearDown() {
Expand All @@ -63,14 +61,70 @@ public void tearDown() {

@Test
public void testExecutorExecute_Transaction() {
executor.execute(this::createAsyncSpan);
assertOnlySpanIsChildOfOnlyTransaction();
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
executor.execute(() -> createAsyncSpan(transaction));
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
} finally {
transaction.deactivate().end();
}
assertThat(reporter.getTransactions()).hasSize(1);
assertThat(reporter.getSpans()).hasSize(1);
assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue();
}

@Test
public void testBaggagePropagationWithTransaction() throws InterruptedException {
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage()
.put("foo", "bar")
.buildContext()
.activate();

AtomicReference<ElasticContext<?>> propagatedContext = new AtomicReference<>();
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
propagatedContext.set(tracer.currentContext());
doneLatch.countDown();
});
transactionWithBaggage.deactivate();
transaction.deactivate().end();
doneLatch.await();

assertThat(propagatedContext.get().getBaggage())
.hasSize(1)
.containsEntry("foo", "bar");
assertThat(propagatedContext.get().getTransaction()).isSameAs(transaction);
}

@Test
public void testBaggagePropagationWithoutTransaction() throws InterruptedException {
BaggageContext transactionWithBaggage = tracer.currentContext().withUpdatedBaggage()
.put("foo", "bar")
.buildContext()
.activate();

AtomicReference<ElasticContext<?>> propagatedContext = new AtomicReference<>();
CountDownLatch doneLatch = new CountDownLatch(1);
executor.execute(() -> {
propagatedContext.set(tracer.currentContext());
doneLatch.countDown();
});
transactionWithBaggage.deactivate();
doneLatch.await();

assertThat(propagatedContext.get().getBaggage())
.hasSize(1)
.containsEntry("foo", "bar");
assertThat(propagatedContext.get().getTransaction()).isNull();
}

@Test
public void testExecutorExecute_Span() {
Transaction transaction = tracer.startRootTransaction(null).withName("Transaction").activate();
Span nonAsyncSpan = transaction.createSpan().withName("NonAsync").activate();
executor.execute(this::createAsyncSpan);
executor.execute(() -> createAsyncSpan(transaction));
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
Expand All @@ -84,20 +138,9 @@ public void testExecutorExecute_Span() {
assertThat(reporter.getFirstSpan().isChildOf(nonAsyncSpan)).isTrue();
}

private void assertOnlySpanIsChildOfOnlyTransaction() {
try {
// wait for the async operation to end
assertThat(reporter.getFirstSpan(1000)).isNotNull();
} finally {
transaction.deactivate().end();
}
assertThat(reporter.getTransactions()).hasSize(1);
assertThat(reporter.getSpans()).hasSize(1);
assertThat(reporter.getFirstSpan().isChildOf(reporter.getFirstTransaction())).isTrue();
}

private void createAsyncSpan() {
assertThat(tracer.currentTransaction()).isEqualTo(transaction);
private void createAsyncSpan(Transaction expectedCurrent) {
assertThat(tracer.currentTransaction()).isEqualTo(expectedCurrent);
tracer.getActive().createSpan().withName("Async").end();
}
}
Expand Up @@ -19,13 +19,12 @@
package co.elastic.apm.agent.concurrent;

import co.elastic.apm.agent.AbstractInstrumentationTest;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.tracer.ElasticContext;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
Expand All @@ -39,34 +38,39 @@ public class ForkJoinPoolTest extends AbstractInstrumentationTest {

private ForkJoinPool pool;
private Transaction transaction;
private ElasticContext<?> txWithBaggage;

@BeforeEach
void setUp() {
pool = new ForkJoinPool();
transaction = tracer.startRootTransaction(null).withName("transaction").activate();
txWithBaggage = tracer.currentContext().withUpdatedBaggage().put("foo", "bar").buildContext().activate();
}

@AfterEach
void tearDown() {
assertThat(tracer.getActive()).isEqualTo(transaction);
assertThat(tracer.currentContext()).isSameAs(txWithBaggage);
txWithBaggage.deactivate();
transaction.deactivate().end();

}

@Test
void testExecute() throws Exception {
final ForkJoinTask<? extends AbstractSpan<?>> task = newTask(() -> tracer.getActive());
final ForkJoinTask<ElasticContext<?>> task = newTask(() -> tracer.currentContext());
pool.execute(task);
assertThat(task.get()).isEqualTo(transaction);
assertThat(task.get()).isSameAs(txWithBaggage);
}

@Test
void testSubmit() throws Exception {
assertThat(pool.submit(newTask(() -> tracer.getActive())).get()).isEqualTo(transaction);
assertThat(pool.submit(newTask(() -> tracer.currentContext())).get()).isSameAs(txWithBaggage);
}

@Test
void testInvoke() throws Exception {
assertThat(pool.invoke(newTask(() -> tracer.getActive()))).isEqualTo(transaction);
assertThat(pool.invoke(newTask(() -> tracer.currentContext()))).isSameAs(txWithBaggage);
}

@Test
Expand All @@ -76,19 +80,19 @@ void testCompletableFuture() throws Exception {
// Preferences | Build, Execution, Deployment | Debugger | Async Stack Traces
// and uncheck the Instrumenting Agent checkbox
assertThat(CompletableFuture
.supplyAsync(() -> Objects.requireNonNull(tracer.getActive()))
.thenApplyAsync(active -> tracer.getActive())
.supplyAsync(() -> tracer.currentContext())
.thenApplyAsync(active -> tracer.currentContext())
.get())
.isEqualTo(transaction);
.isSameAs(txWithBaggage);
}

@Test
void testParallelStream() {
assertThat(Stream.of("foo", "bar", "baz")
.parallel()
.<AbstractSpan<?>>map(s -> tracer.getActive())
.<ElasticContext<?>>map(s -> tracer.currentContext())
.distinct())
.containsExactly(transaction);
.containsExactly(txWithBaggage);
}

public static class AdaptedSupplier<V> extends ForkJoinTask<V> implements Runnable {
Expand Down