From 8c479a54a14dd32fefb4127b3ae7cd291ffcdd71 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Tue, 29 Jun 2021 15:13:07 +0200 Subject: [PATCH 1/8] WIP --- .../src/test/resources/log4j2-test.xml | 2 +- .../apm-scala-concurrent-plugin/pom.xml | 9 +- .../FutureInstrumentation.java | 94 ++++- ...ic.apm.agent.sdk.ElasticApmInstrumentation | 6 +- .../FutureInstrumentationSpec.scala | 371 ++++++++++-------- 5 files changed, 313 insertions(+), 169 deletions(-) diff --git a/apm-agent-core/src/test/resources/log4j2-test.xml b/apm-agent-core/src/test/resources/log4j2-test.xml index db122fc04d..e7c91d2e6c 100644 --- a/apm-agent-core/src/test/resources/log4j2-test.xml +++ b/apm-agent-core/src/test/resources/log4j2-test.xml @@ -14,6 +14,6 @@ - + diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index e3d39617f1..966cf39810 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -16,6 +16,11 @@ + + org.scala-lang + scala-library + 2.13.4 + ${project.groupId} apm-java-concurrent-plugin @@ -36,9 +41,9 @@ net.alchim31.maven scala-maven-plugin - 4.3.1 + 4.4.0 - 2.13.2 + 2.13.4 diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index 175f9355e5..0ee73ef9e9 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -19,21 +19,25 @@ package co.elastic.apm.agent.scalaconcurrent; import co.elastic.apm.agent.bci.TracerAwareInstrumentation; +import co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.Future; +import scala.concurrent.Future$; +import scala.util.Try; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; -import static net.bytebuddy.matcher.ElementMatchers.isConstructor; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.*; public abstract class FutureInstrumentation extends TracerAwareInstrumentation { @@ -41,13 +45,47 @@ public abstract class FutureInstrumentation extends TracerAwareInstrumentation { public static final WeakConcurrentMap> promisesToContext = new WeakConcurrentMap.WithInlinedExpunction<>(); + private static final Logger logger = LoggerFactory.getLogger(WeakKeySoftValueLoadingCache.class); + @Nonnull @Override public Collection getInstrumentationGroupNames() { return Arrays.asList("scala-future", "experimental"); } - public static class ConstructorInstrumentation extends FutureInstrumentation { + public static class FutureObjectInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.Future$"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return isTypeInitializer(); + } + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onExit() { + logger.warn("=============="); + logger.warn("Match on Future.unit"); + logger.warn("=============="); + + final AbstractSpan context = tracer.getActive(); + if (context != null) { + logger.warn("=============="); + logger.warn("Match on Future.unit " + context); + logger.warn("=============="); + } + // Remove context on the initial Future.unit initialization such that following + // chaining methods are not linked to this constant 'origin' Future. + final Try unitFuture = Future$.MODULE$.unit().value().get(); + logger.warn(promisesToContext.toString()); +// promisesToContext.put(unitFuture, null); + } + } + + public static class TransformationConstructorInstrumentation extends FutureInstrumentation { @Override public ElementMatcher getTypeMatcher() { @@ -63,6 +101,10 @@ public ElementMatcher getMethodMatcher() { public static void onExit(@Advice.This Object thiz) { final AbstractSpan context = tracer.getActive(); if (context != null) { + logger.warn("=============="); + logger.warn("Constructor " + context); + logger.warn("=============="); + logger.warn(promisesToContext.toString()); promisesToContext.put(thiz, context); // this span might be ended before the Promise$Transformation#run method starts // we have to avoid that this span gets recycled, even in the above mentioned case @@ -72,7 +114,7 @@ public static void onExit(@Advice.This Object thiz) { } - public static class RunInstrumentation extends FutureInstrumentation { + public static class TransformationRunInstrumentation extends FutureInstrumentation { @Override public ElementMatcher getTypeMatcher() { @@ -87,8 +129,50 @@ public ElementMatcher getMethodMatcher() { @Nullable @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) public static Object onEnter(@Advice.This Object thiz) { + logger.warn(promisesToContext.toString()); + AbstractSpan context = promisesToContext.remove(thiz); + if (context != null) { + logger.warn("=============="); + logger.warn("Run " + context); + logger.warn("=============="); + context.activate(); + // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run + // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice + context.decrementReferences(); + } + return context; + } + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { + if (abstractSpanObj instanceof AbstractSpan) { + AbstractSpan context = (AbstractSpan) abstractSpanObj; + context.deactivate(); + } + } + } + + public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("submitWithValue").and(returns(void.class)); + } + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + public static Object onEnter(@Advice.This Object thiz) { + logger.warn(promisesToContext.toString()); AbstractSpan context = promisesToContext.remove(thiz); if (context != null) { + logger.warn("=============="); + logger.warn("SubmitWithValue " + context); + logger.warn("=============="); context.activate(); // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index fc420a060d..c66929343c 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -1,2 +1,4 @@ -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$ConstructorInstrumentation -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$RunInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$FutureObjectInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationSubmitWithValueInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index 847f3f11ed..8a18b61221 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -1,7 +1,6 @@ package co.elastic.apm.agent.scalaconcurrent -import java.util.concurrent.Executors - +import java.util.concurrent.{Executors, ForkJoinPool} import co.elastic.apm.agent.MockReporter import co.elastic.apm.agent.bci.ElasticApmAgent import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} @@ -20,7 +19,6 @@ class FutureInstrumentationSpec extends FunSuite { private var reporter: MockReporter = _ private var tracer: ElasticApmTracer = _ private var coreConfiguration: CoreConfiguration = _ - private var transaction: Transaction = _ override def beforeEach(context: BeforeEach): Unit = { reporter = new MockReporter @@ -29,166 +27,221 @@ class FutureInstrumentationSpec extends FunSuite { tracer = new ElasticApmTracerBuilder().configurationRegistry(config).reporter(reporter).build ElasticApmAgent.initInstrumentation(tracer, ByteBuddyAgent.install) tracer.start(false) - transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() } override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() - test("Scala Future should propagate the tracing-context correctly across different threads") { - implicit val executionContext: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - - val future = Future("Test") - .map(_.length) - .flatMap(l => Future(l * 2)) - .map(_.toString) - .flatMap(s => Future(s"$s-$s")) - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) - - - } - - test("Worker thread should correctly set context on the current transaction") { - implicit val executionContext: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - - new TestFutureTraceMethods().invokeAsync(tracer) - transaction.deactivate().end() - assertEquals(reporter.getTransactions.size(), 1) - assertEquals(reporter.getSpans.size(), 0) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) - } - - test("Multiple async operations should be able to set context on the current transaction") { - - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - - val future = Future - .traverse(1 to 100) { _ => - Future.sequence(List( - Future { - Thread.sleep(25) - tracer.currentTransaction().addCustomContext("future1", true) - }, - Future { - Thread.sleep(50) - tracer.currentTransaction().addCustomContext("future2", true) - }, - Future { - Thread.sleep(10) - tracer.currentTransaction().addCustomContext("future3", true) - } - )) - } - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], - true - ) - - } - - test("Handle a combination of Promises and Futures correctly") { - - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - - val promise = Promise[Int]() - - Future { Thread.sleep(100) } - .map(_ => 42) - .onComplete { - case Success(value) => promise.success(value) - case Failure(exception) => promise.failure(exception) - } - - val future = promise - .future - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) - - } - - test("Handle a Future.sequence correctly") { - - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - - val future = Future - .sequence(List( - Future(Thread.sleep(25)) - )) - .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - true - ) - - } - - test("Handle a combination of Promises and complex Futures correctly") { - - implicit val multiPoolEc: ExecutionContextExecutor = - ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - - val promise = Promise[Int]() - - Future - .sequence(List( - Future(Thread.sleep(25)) - )) - .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) - .map(_ => 42) - .onComplete { - case Success(value) => promise.success(value) - case Failure(exception) => promise.failure(exception) - } - - val future = promise - .future - .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) - - Await.ready(future, 10.seconds) - transaction.deactivate().end() - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - true - ) - assertEquals( - reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - true - ) - +// test("Scala Future should propagate the tracing-context correctly across different threads") { +// implicit val executionContext: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// val future = Future("Test") +// .map(_.length) +// .flatMap(l => Future(l * 2)) +// .map(_.toString) +// .flatMap(s => Future(s"$s-$s")) +// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) +// +// Await.ready(future, 10.seconds) +// transaction.deactivate().end() +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], +// true +// ) +// +// +// } +// +// test("Worker thread should correctly set context on the current transaction") { +// implicit val executionContext: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// new TestFutureTraceMethods().invokeAsync(tracer) +// transaction.deactivate().end() +// assertEquals(reporter.getTransactions.size(), 1) +// assertEquals(reporter.getSpans.size(), 0) +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], +// true +// ) +// } +// +// test("Multiple async operations should be able to set context on the current transaction") { +// implicit val multiPoolEc: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// val future = Future +// .traverse(1 to 100) { _ => +// Future.sequence(List( +// Future { +// Thread.sleep(25) +// tracer.currentTransaction().addCustomContext("future1", true) +// }, +// Future { +// Thread.sleep(50) +// tracer.currentTransaction().addCustomContext("future2", true) +// }, +// Future { +// Thread.sleep(10) +// tracer.currentTransaction().addCustomContext("future3", true) +// } +// )) +// } +// +// Await.ready(future, 10.seconds) +// transaction.deactivate().end() +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], +// true +// ) +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], +// true +// ) +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], +// true +// ) +// +// } +// +// test("Handle a combination of Promises and Futures correctly") { +// implicit val multiPoolEc: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// val promise = Promise[Int]() +// +// Future { Thread.sleep(100) } +// .map(_ => 42) +// .onComplete { +// case Success(value) => promise.success(value) +// case Failure(exception) => promise.failure(exception) +// } +// +// val future = promise +// .future +// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) +// +// Await.ready(future, 10.seconds) +// transaction.deactivate().end() +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], +// true +// ) +// +// } +// +// test("Handle a Future.sequence correctly") { +// implicit val multiPoolEc: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// val future = Future +// .sequence(List( +// Future(Thread.sleep(25)) +// )) +// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) +// +// Await.ready(future, 10.seconds) +// transaction.deactivate().end() +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], +// true +// ) +// +// } +// +// test("Handle a combination of Promises and complex Futures correctly") { +// implicit val multiPoolEc: ExecutionContextExecutor = +// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) +// +// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() +// +// val promise = Promise[Int]() +// +// Future +// .sequence(List( +// Future(Thread.sleep(25)) +// )) +// .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) +// .map(_ => 42) +// .onComplete { +// case Success(value) => promise.success(value) +// case Failure(exception) => promise.failure(exception) +// } +// +// val future = promise +// .future +// .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) +// +// Await.ready(future, 10.seconds) +// transaction.deactivate().end() +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], +// true +// ) +// assertEquals( +// reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], +// true +// ) +// +// } + + test("Scala Future should not propagate the tracing-context to unrelated threads") { + implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(10)) + + val fs = (1 to 1).map(transactionNumber => Future { + Thread.sleep(10) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() + + println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, trace=${tracer.currentTransaction()} starting transaction") + + val futures = (1 to 1) + .map(futureNumber => Future { + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction + }) + + val future = Future.sequence(futures) + + Await.result(future, 30.seconds) + + transaction.deactivate().end() + + }) + + val res = Future.sequence(fs) + + Await.result(res, 60.seconds) } } From 2b19a4df7272ff16f6825e71c7b3cb93ecf13d94 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Tue, 6 Jul 2021 20:25:15 +0200 Subject: [PATCH 2/8] Update versions --- apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index 966cf39810..85eff298c3 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -19,7 +19,7 @@ org.scala-lang scala-library - 2.13.4 + 2.13.6 ${project.groupId} @@ -30,7 +30,7 @@ org.scalameta munit_2.13 - 0.7.9 + 0.7.26 test @@ -41,9 +41,9 @@ net.alchim31.maven scala-maven-plugin - 4.4.0 + 4.5.3 - 2.13.4 + 2.13.6 From af6e231fc771888e48a3374d67ed8c8a27625006 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Tue, 6 Jul 2021 20:58:37 +0200 Subject: [PATCH 3/8] WIP --- .../apm-scala-concurrent-plugin/pom.xml | 1 - .../FutureInstrumentation.java | 269 ++++++++++-------- ...ic.apm.agent.sdk.ElasticApmInstrumentation | 9 +- .../FutureInstrumentationSpec.scala | 6 +- 4 files changed, 157 insertions(+), 128 deletions(-) diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index 85eff298c3..c79b7b2cd7 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -25,7 +25,6 @@ ${project.groupId} apm-java-concurrent-plugin ${project.version} - test org.scalameta diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index 0ee73ef9e9..acd6688847 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -21,6 +21,7 @@ import co.elastic.apm.agent.bci.TracerAwareInstrumentation; import co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache; import co.elastic.apm.agent.impl.transaction.AbstractSpan; +import co.elastic.apm.agent.sdk.advice.AssignTo; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -36,6 +37,8 @@ import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; +import java.util.concurrent.Executor; +import co.elastic.apm.agent.concurrent.JavaConcurrent; import static net.bytebuddy.matcher.ElementMatchers.*; @@ -53,140 +56,166 @@ public Collection getInstrumentationGroupNames() { return Arrays.asList("scala-future", "experimental"); } - public static class FutureObjectInstrumentation extends FutureInstrumentation { + public static class BatchedExecutionContextInstrumentation extends FutureInstrumentation { @Override public ElementMatcher getTypeMatcher() { - return named("scala.concurrent.Future$"); + return hasSuperType(named("scala.concurrent.BatchingExecutor")); } @Override public ElementMatcher getMethodMatcher() { - return isTypeInitializer(); - } - - @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit() { - logger.warn("=============="); - logger.warn("Match on Future.unit"); - logger.warn("=============="); - - final AbstractSpan context = tracer.getActive(); - if (context != null) { - logger.warn("=============="); - logger.warn("Match on Future.unit " + context); - logger.warn("=============="); - } - // Remove context on the initial Future.unit initialization such that following - // chaining methods are not linked to this constant 'origin' Future. - final Try unitFuture = Future$.MODULE$.unit().value().get(); - logger.warn(promisesToContext.toString()); -// promisesToContext.put(unitFuture, null); - } - } - - public static class TransformationConstructorInstrumentation extends FutureInstrumentation { - - @Override - public ElementMatcher getTypeMatcher() { - return named("scala.concurrent.impl.Promise$Transformation"); - } - - @Override - public ElementMatcher getMethodMatcher() { - return isConstructor(); - } - - @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit(@Advice.This Object thiz) { - final AbstractSpan context = tracer.getActive(); - if (context != null) { - logger.warn("=============="); - logger.warn("Constructor " + context); - logger.warn("=============="); - logger.warn(promisesToContext.toString()); - promisesToContext.put(thiz, context); - // this span might be ended before the Promise$Transformation#run method starts - // we have to avoid that this span gets recycled, even in the above mentioned case - context.incrementReferences(); - } - } - - } - - public static class TransformationRunInstrumentation extends FutureInstrumentation { - - @Override - public ElementMatcher getTypeMatcher() { - return named("scala.concurrent.impl.Promise$Transformation"); - } - - @Override - public ElementMatcher getMethodMatcher() { - return named("run").and(returns(void.class)); + return named("submitForExecution").and(returns(void.class)).and(takesArguments(Runnable.class)); } @Nullable + @AssignTo.Argument(0) @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - public static Object onEnter(@Advice.This Object thiz) { - logger.warn(promisesToContext.toString()); - AbstractSpan context = promisesToContext.remove(thiz); - if (context != null) { - logger.warn("=============="); - logger.warn("Run " + context); - logger.warn("=============="); - context.activate(); - // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run - // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice - context.decrementReferences(); - } - return context; + public static Runnable onExecute(@Advice.Argument(0) @Nullable Runnable runnable) { + return JavaConcurrent.withContext(runnable, tracer); } - @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { - if (abstractSpanObj instanceof AbstractSpan) { - AbstractSpan context = (AbstractSpan) abstractSpanObj; - context.deactivate(); - } + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) + public static void onExit(@Nullable @Advice.Thrown Throwable thrown, + @Advice.Argument(value = 0) @Nullable Runnable runnable) { + JavaConcurrent.doFinally(thrown, runnable); } } - public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { - - @Override - public ElementMatcher getTypeMatcher() { - return named("scala.concurrent.impl.Promise$Transformation"); - } - - @Override - public ElementMatcher getMethodMatcher() { - return named("submitWithValue").and(returns(void.class)); - } - - @Nullable - @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - public static Object onEnter(@Advice.This Object thiz) { - logger.warn(promisesToContext.toString()); - AbstractSpan context = promisesToContext.remove(thiz); - if (context != null) { - logger.warn("=============="); - logger.warn("SubmitWithValue " + context); - logger.warn("=============="); - context.activate(); - // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run - // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice - context.decrementReferences(); - } - return context; - } - - @Advice.OnMethodExit(suppress = Throwable.class, inline = false) - public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { - if (abstractSpanObj instanceof AbstractSpan) { - AbstractSpan context = (AbstractSpan) abstractSpanObj; - context.deactivate(); - } - } - } +// public static class FutureObjectInstrumentation extends FutureInstrumentation { +// +// @Override +// public ElementMatcher getTypeMatcher() { +// return named("scala.concurrent.Future$"); +// } +// +// @Override +// public ElementMatcher getMethodMatcher() { +// return isTypeInitializer(); +// } +// +// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) +// public static void onExit() { +// logger.warn("=============="); +// logger.warn("Match on Future.unit"); +// logger.warn("=============="); +// +// final AbstractSpan context = tracer.getActive(); +// if (context != null) { +// logger.warn("=============="); +// logger.warn("Match on Future.unit " + context); +// logger.warn("=============="); +// } +// // Remove context on the initial Future.unit initialization such that following +// // chaining methods are not linked to this constant 'origin' Future. +// final Try unitFuture = Future$.MODULE$.unit().value().get(); +// logger.warn(promisesToContext.toString()); +//// promisesToContext.put(unitFuture, null); +// } +// } +// +// public static class TransformationConstructorInstrumentation extends FutureInstrumentation { +// +// @Override +// public ElementMatcher getTypeMatcher() { +// return named("scala.concurrent.impl.Promise$Transformation"); +// } +// +// @Override +// public ElementMatcher getMethodMatcher() { +// return isConstructor(); +// } +// +// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) +// public static void onExit(@Advice.This Object thiz) { +// final AbstractSpan context = tracer.getActive(); +// if (context != null) { +// logger.warn("=============="); +// logger.warn("Constructor " + context); +// logger.warn("=============="); +// logger.warn(promisesToContext.toString()); +// promisesToContext.put(thiz, context); +// // this span might be ended before the Promise$Transformation#run method starts +// // we have to avoid that this span gets recycled, even in the above mentioned case +// context.incrementReferences(); +// } +// } +// +// } +// +// public static class TransformationRunInstrumentation extends FutureInstrumentation { +// +// @Override +// public ElementMatcher getTypeMatcher() { +// return named("scala.concurrent.impl.Promise$Transformation"); +// } +// +// @Override +// public ElementMatcher getMethodMatcher() { +// return named("run").and(returns(void.class)); +// } +// +// @Nullable +// @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) +// public static Object onEnter(@Advice.This Object thiz) { +// logger.warn(promisesToContext.toString()); +// AbstractSpan context = promisesToContext.remove(thiz); +// if (context != null) { +// logger.warn("=============="); +// logger.warn("Run " + context); +// logger.warn("=============="); +// context.activate(); +// // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run +// // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice +// context.decrementReferences(); +// } +// return context; +// } +// +// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) +// public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { +// if (abstractSpanObj instanceof AbstractSpan) { +// AbstractSpan context = (AbstractSpan) abstractSpanObj; +// context.deactivate(); +// } +// } +// } +// +// public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { +// +// @Override +// public ElementMatcher getTypeMatcher() { +// return named("scala.concurrent.impl.Promise$Transformation"); +// } +// +// @Override +// public ElementMatcher getMethodMatcher() { +// return named("submitWithValue").and(returns(void.class)); +// } +// +// @Nullable +// @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) +// public static Object onEnter(@Advice.This Object thiz) { +// logger.warn(promisesToContext.toString()); +// AbstractSpan context = promisesToContext.remove(thiz); +// if (context != null) { +// logger.warn("=============="); +// logger.warn("SubmitWithValue " + context); +// logger.warn("=============="); +// context.activate(); +// // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run +// // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice +// context.decrementReferences(); +// } +// return context; +// } +// +// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) +// public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { +// if (abstractSpanObj instanceof AbstractSpan) { +// AbstractSpan context = (AbstractSpan) abstractSpanObj; +// context.deactivate(); +// } +// } +// } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index c66929343c..62e615d092 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -1,4 +1,5 @@ -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$FutureObjectInstrumentation -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation -co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationSubmitWithValueInstrumentation +#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$FutureObjectInstrumentation +#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation +#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation +#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationSubmitWithValueInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$BatchedExecutionContextInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index 8a18b61221..a279245819 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -197,16 +197,16 @@ class FutureInstrumentationSpec extends FunSuite { // } test("Scala Future should not propagate the tracing-context to unrelated threads") { - implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(10)) + implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(5)) - val fs = (1 to 1).map(transactionNumber => Future { + val fs = (1 to 10).map(transactionNumber => Future { Thread.sleep(10) val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, trace=${tracer.currentTransaction()} starting transaction") - val futures = (1 to 1) + val futures = (1 to 10) .map(futureNumber => Future { Thread.sleep(10) From 031ec2a31cbc08944115176380d3eba34caaf4c2 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Sun, 11 Jul 2021 20:37:22 +0200 Subject: [PATCH 4/8] WIP --- .../latest/testapp/generated/HelloGrpc.java | 2 +- .../FutureInstrumentation.java | 210 ++++----- ...ic.apm.agent.sdk.ElasticApmInstrumentation | 2 + .../FutureInstrumentationSpec.scala | 434 +++++++++--------- 4 files changed, 325 insertions(+), 323 deletions(-) diff --git a/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java b/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java index fe0787ef17..8c3814ec99 100644 --- a/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java +++ b/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java @@ -23,7 +23,7 @@ /** */ @javax.annotation.Generated( - value = "by gRPC proto compiler (version 1.38.1)", + value = "by gRPC proto compiler (version 1.39.0)", comments = "Source: rpc.proto") public final class HelloGrpc { diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index acd6688847..9da85b1ce1 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -48,7 +48,7 @@ public abstract class FutureInstrumentation extends TracerAwareInstrumentation { public static final WeakConcurrentMap> promisesToContext = new WeakConcurrentMap.WithInlinedExpunction<>(); - private static final Logger logger = LoggerFactory.getLogger(WeakKeySoftValueLoadingCache.class); + private static final Logger logger = LoggerFactory.getLogger(FutureInstrumentation.class); @Nonnull @Override @@ -114,108 +114,108 @@ public static void onExit(@Nullable @Advice.Thrown Throwable thrown, // } // } // -// public static class TransformationConstructorInstrumentation extends FutureInstrumentation { -// -// @Override -// public ElementMatcher getTypeMatcher() { -// return named("scala.concurrent.impl.Promise$Transformation"); -// } -// -// @Override -// public ElementMatcher getMethodMatcher() { -// return isConstructor(); -// } -// -// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) -// public static void onExit(@Advice.This Object thiz) { -// final AbstractSpan context = tracer.getActive(); -// if (context != null) { -// logger.warn("=============="); -// logger.warn("Constructor " + context); -// logger.warn("=============="); -// logger.warn(promisesToContext.toString()); -// promisesToContext.put(thiz, context); -// // this span might be ended before the Promise$Transformation#run method starts -// // we have to avoid that this span gets recycled, even in the above mentioned case -// context.incrementReferences(); -// } -// } -// -// } -// -// public static class TransformationRunInstrumentation extends FutureInstrumentation { -// -// @Override -// public ElementMatcher getTypeMatcher() { -// return named("scala.concurrent.impl.Promise$Transformation"); -// } -// -// @Override -// public ElementMatcher getMethodMatcher() { -// return named("run").and(returns(void.class)); -// } -// -// @Nullable -// @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) -// public static Object onEnter(@Advice.This Object thiz) { -// logger.warn(promisesToContext.toString()); -// AbstractSpan context = promisesToContext.remove(thiz); -// if (context != null) { -// logger.warn("=============="); -// logger.warn("Run " + context); -// logger.warn("=============="); -// context.activate(); -// // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run -// // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice -// context.decrementReferences(); -// } -// return context; -// } -// -// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) -// public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { -// if (abstractSpanObj instanceof AbstractSpan) { -// AbstractSpan context = (AbstractSpan) abstractSpanObj; -// context.deactivate(); -// } -// } -// } -// -// public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { -// -// @Override -// public ElementMatcher getTypeMatcher() { -// return named("scala.concurrent.impl.Promise$Transformation"); -// } -// -// @Override -// public ElementMatcher getMethodMatcher() { -// return named("submitWithValue").and(returns(void.class)); -// } -// -// @Nullable -// @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) -// public static Object onEnter(@Advice.This Object thiz) { -// logger.warn(promisesToContext.toString()); -// AbstractSpan context = promisesToContext.remove(thiz); -// if (context != null) { -// logger.warn("=============="); -// logger.warn("SubmitWithValue " + context); -// logger.warn("=============="); -// context.activate(); -// // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run -// // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice -// context.decrementReferences(); -// } -// return context; -// } -// -// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) -// public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { -// if (abstractSpanObj instanceof AbstractSpan) { -// AbstractSpan context = (AbstractSpan) abstractSpanObj; -// context.deactivate(); -// } -// } -// } + public static class TransformationConstructorInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return isConstructor(); + } + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onExit(@Advice.This Object thiz) { + final AbstractSpan context = tracer.getActive(); + if (context != null) { + logger.warn("=============="); + logger.warn("Constructor " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + logger.warn("Constructor Trans " + (tracer.currentTransaction() == context) + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + logger.warn("=============="); + promisesToContext.put(thiz, context); + // this span might be ended before the Promise$Transformation#run method starts + // we have to avoid that this span gets recycled, even in the above mentioned case + context.incrementReferences(); + // Do no discard branches leading to async operations so not to break span references + context.setNonDiscardable(); + } + } + + } + + public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("submitWithValue").and(returns(named("scala.concurrent.impl.Promise$Transformation"))); + } + + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + public static void onEnter(@Advice.This Object thiz) { + final AbstractSpan context = tracer.getActive(); + if (context != null) { + logger.warn("=============="); + logger.warn("SubmitWithValue " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + logger.warn("=============="); + promisesToContext.put(thiz, context); + // this span might be ended before the Promise$Transformation#run method starts + // we have to avoid that this span gets recycled, even in the above mentioned case + context.incrementReferences(); + // Do no discard branches leading to async operations so not to break span references + context.setNonDiscardable(); + } + } + } + + public static class TransformationRunInstrumentation extends FutureInstrumentation { + + @Override + public ElementMatcher getTypeMatcher() { + return named("scala.concurrent.impl.Promise$Transformation"); + } + + @Override + public ElementMatcher getMethodMatcher() { + return named("run").and(returns(void.class)); + } + + @Nullable + @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) + public static Object onEnter(@Advice.This Object thiz) { + AbstractSpan context = promisesToContext.remove(thiz); + if (context != null) { + logger.warn("=============="); + logger.warn("Enter Run " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + if (tracer.getActive() != context) context.activate(); + // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run + // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice + context.decrementReferences(); + logger.warn("Enter Run After 1 " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + logger.warn("Enter Run After 2 " + context.getReferenceCount() + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); + logger.warn("=============="); + } + return context; + } + + @Advice.OnMethodExit(suppress = Throwable.class, inline = false) + public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { + if (abstractSpanObj instanceof AbstractSpan) { + AbstractSpan context = (AbstractSpan) abstractSpanObj; + logger.warn("=============="); + logger.warn("Exit Run " + context + " on thread " + Thread.currentThread().getId()); + logger.warn("Exit Run Active 1 " + tracer.getActive() + " on thread " + Thread.currentThread().getId()); + logger.warn("Exit Run Active 2 " + context.getReferenceCount() + " on thread " + Thread.currentThread().getId()); + context.deactivate(); + logger.warn("Exit Run Active 3 " + tracer.getActive() + " on thread " + Thread.currentThread().getId()); + logger.warn("=============="); + } + } + } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index 62e615d092..7138f858bb 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -3,3 +3,5 @@ #co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation #co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationSubmitWithValueInstrumentation co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$BatchedExecutionContextInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation +co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index a279245819..fe88f71802 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -31,209 +31,209 @@ class FutureInstrumentationSpec extends FunSuite { override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() -// test("Scala Future should propagate the tracing-context correctly across different threads") { -// implicit val executionContext: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// val future = Future("Test") -// .map(_.length) -// .flatMap(l => Future(l * 2)) -// .map(_.toString) -// .flatMap(s => Future(s"$s-$s")) -// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) -// -// Await.ready(future, 10.seconds) -// transaction.deactivate().end() -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], -// true -// ) -// -// -// } -// -// test("Worker thread should correctly set context on the current transaction") { -// implicit val executionContext: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// new TestFutureTraceMethods().invokeAsync(tracer) -// transaction.deactivate().end() -// assertEquals(reporter.getTransactions.size(), 1) -// assertEquals(reporter.getSpans.size(), 0) -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], -// true -// ) -// } -// -// test("Multiple async operations should be able to set context on the current transaction") { -// implicit val multiPoolEc: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// val future = Future -// .traverse(1 to 100) { _ => -// Future.sequence(List( -// Future { -// Thread.sleep(25) -// tracer.currentTransaction().addCustomContext("future1", true) -// }, -// Future { -// Thread.sleep(50) -// tracer.currentTransaction().addCustomContext("future2", true) -// }, -// Future { -// Thread.sleep(10) -// tracer.currentTransaction().addCustomContext("future3", true) -// } -// )) -// } -// -// Await.ready(future, 10.seconds) -// transaction.deactivate().end() -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], -// true -// ) -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], -// true -// ) -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], -// true -// ) -// -// } -// -// test("Handle a combination of Promises and Futures correctly") { -// implicit val multiPoolEc: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// val promise = Promise[Int]() -// -// Future { Thread.sleep(100) } -// .map(_ => 42) -// .onComplete { -// case Success(value) => promise.success(value) -// case Failure(exception) => promise.failure(exception) -// } -// -// val future = promise -// .future -// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) -// -// Await.ready(future, 10.seconds) -// transaction.deactivate().end() -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], -// true -// ) -// -// } -// -// test("Handle a Future.sequence correctly") { -// implicit val multiPoolEc: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// val future = Future -// .sequence(List( -// Future(Thread.sleep(25)) -// )) -// .map(_ => tracer.currentTransaction().addCustomContext("future", true)) -// -// Await.ready(future, 10.seconds) -// transaction.deactivate().end() -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], -// true -// ) -// -// } -// -// test("Handle a combination of Promises and complex Futures correctly") { -// implicit val multiPoolEc: ExecutionContextExecutor = -// ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) -// -// val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() -// -// val promise = Promise[Int]() -// -// Future -// .sequence(List( -// Future(Thread.sleep(25)) -// )) -// .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) -// .map(_ => 42) -// .onComplete { -// case Success(value) => promise.success(value) -// case Failure(exception) => promise.failure(exception) -// } -// -// val future = promise -// .future -// .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) -// -// Await.ready(future, 10.seconds) -// transaction.deactivate().end() -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], -// true -// ) -// assertEquals( -// reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], -// true -// ) -// -// } + // test("Scala Future should propagate the tracing-context correctly across different threads") { + // implicit val executionContext: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // val future = Future("Test") + // .map(_.length) + // .flatMap(l => Future(l * 2)) + // .map(_.toString) + // .flatMap(s => Future(s"$s-$s")) + // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + // + // Await.ready(future, 10.seconds) + // transaction.deactivate().end() + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + // true + // ) + // + // + // } + // + // test("Worker thread should correctly set context on the current transaction") { + // implicit val executionContext: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // new TestFutureTraceMethods().invokeAsync(tracer) + // transaction.deactivate().end() + // assertEquals(reporter.getTransactions.size(), 1) + // assertEquals(reporter.getSpans.size(), 0) + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + // true + // ) + // } + // + // test("Multiple async operations should be able to set context on the current transaction") { + // implicit val multiPoolEc: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // val future = Future + // .traverse(1 to 100) { _ => + // Future.sequence(List( + // Future { + // Thread.sleep(25) + // tracer.currentTransaction().addCustomContext("future1", true) + // }, + // Future { + // Thread.sleep(50) + // tracer.currentTransaction().addCustomContext("future2", true) + // }, + // Future { + // Thread.sleep(10) + // tracer.currentTransaction().addCustomContext("future3", true) + // } + // )) + // } + // + // Await.ready(future, 10.seconds) + // transaction.deactivate().end() + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + // true + // ) + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + // true + // ) + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + // true + // ) + // + // } + // + // test("Handle a combination of Promises and Futures correctly") { + // implicit val multiPoolEc: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // val promise = Promise[Int]() + // + // Future { Thread.sleep(100) } + // .map(_ => 42) + // .onComplete { + // case Success(value) => promise.success(value) + // case Failure(exception) => promise.failure(exception) + // } + // + // val future = promise + // .future + // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + // + // Await.ready(future, 10.seconds) + // transaction.deactivate().end() + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + // true + // ) + // + // } + // + // test("Handle a Future.sequence correctly") { + // implicit val multiPoolEc: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // val future = Future + // .sequence(List( + // Future(Thread.sleep(25)) + // )) + // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + // + // Await.ready(future, 10.seconds) + // transaction.deactivate().end() + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + // true + // ) + // + // } + // + // test("Handle a combination of Promises and complex Futures correctly") { + // implicit val multiPoolEc: ExecutionContextExecutor = + // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) + // + // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + // + // val promise = Promise[Int]() + // + // Future + // .sequence(List( + // Future(Thread.sleep(25)) + // )) + // .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) + // .map(_ => 42) + // .onComplete { + // case Success(value) => promise.success(value) + // case Failure(exception) => promise.failure(exception) + // } + // + // val future = promise + // .future + // .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) + // + // Await.ready(future, 10.seconds) + // transaction.deactivate().end() + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + // true + // ) + // assertEquals( + // reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + // true + // ) + // + // } test("Scala Future should not propagate the tracing-context to unrelated threads") { implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(5)) - val fs = (1 to 10).map(transactionNumber => Future { + val fs = (1 to 5).map(transactionNumber => Future { Thread.sleep(10) val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() - println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, trace=${tracer.currentTransaction()} starting transaction") + println(s"MILAN - thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction") - val futures = (1 to 10) + val futures = (1 to 1) .map(futureNumber => Future { Thread.sleep(10) val currentTransactionNumber = tracer.currentTransaction().getNameAsString - println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") + println(s"MILAN - thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") assertEquals(transaction, tracer.currentTransaction()) assertEquals(currentTransactionNumber.toInt, transactionNumber) (transaction, futureNumber) - } - .map { case (transaction: Transaction, futureNumber: Int) => - Thread.sleep(10) - - val currentTransactionNumber = tracer.currentTransaction().getNameAsString - println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") - - assertEquals(transaction, tracer.currentTransaction()) - assertEquals(currentTransactionNumber.toInt, transactionNumber) - - transaction }) +// .map { case (transaction: Transaction, futureNumber: Int) => +// Thread.sleep(10) +// +// val currentTransactionNumber = tracer.currentTransaction().getNameAsString +// println(s"MILAN - thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") +// +// assertEquals(transaction, tracer.currentTransaction()) +// assertEquals(currentTransactionNumber.toInt, transactionNumber) +// +// transaction +// }) val future = Future.sequence(futures) - Await.result(future, 30.seconds) + Await.result(future, 10.seconds) transaction.deactivate().end() @@ -241,51 +241,51 @@ class FutureInstrumentationSpec extends FunSuite { val res = Future.sequence(fs) - Await.result(res, 60.seconds) + Await.result(res, 15.seconds) } } - private class TestFutureTraceMethods { - - /** - * Calling this method results in this method call tree: - * - * main thread | worker thread - * ------------------------------------------------------------------------------------------- - * invokeAsync | - * | | - * --- blockingMethodOnMainThread | - * | | - * --- nonBlockingMethodOnMainThread | - * | | - * --------------------------> methodOnWorkerThread - * | | - * | --- longMethod - * | - */ - def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) - - private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { - try { - Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) - } catch { - case e: Exception => e.printStackTrace() - } +private class TestFutureTraceMethods { + + /** + * Calling this method results in this method call tree: + * + * main thread | worker thread + * ------------------------------------------------------------------------------------------- + * invokeAsync | + * | | + * --- blockingMethodOnMainThread | + * | | + * --- nonBlockingMethodOnMainThread | + * | | + * --------------------------> methodOnWorkerThread + * | | + * | --- longMethod + * | + */ + def invokeAsync(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = blockingMethodOnMainThread(tracer) + + private def blockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Unit = { + try { + Await.result(nonBlockingMethodOnMainThread(tracer), 10.seconds) + } catch { + case e: Exception => e.printStackTrace() } + } - private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = - Future(methodOnWorkerThread(tracer)) + private def nonBlockingMethodOnMainThread(tracer: ElasticApmTracer)(implicit ec: ExecutionContext): Future[Unit] = + Future(methodOnWorkerThread(tracer)) - private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) + private def methodOnWorkerThread(tracer: ElasticApmTracer): Unit = longMethod(tracer) - private def longMethod(tracer: ElasticApmTracer): Unit = { - try { - Thread.sleep(100) - tracer.currentTransaction().addCustomContext("future", true) - } catch { - case e: InterruptedException => e.printStackTrace() - } + private def longMethod(tracer: ElasticApmTracer): Unit = { + try { + Thread.sleep(100) + tracer.currentTransaction().addCustomContext("future", true) + } catch { + case e: InterruptedException => e.printStackTrace() } + } } From 1471abf0a16a2041cbb4c59a308538523b5dfc01 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Mon, 12 Jul 2021 23:43:47 +0200 Subject: [PATCH 5/8] Make tests pass for scala Futures --- .../src/test/resources/log4j2-test.xml | 2 +- ...leCallableForkJoinTaskInstrumentation.java | 2 +- .../apm-scala-concurrent-plugin/pom.xml | 11 +- .../FutureInstrumentation.java | 100 +---- ...ic.apm.agent.sdk.ElasticApmInstrumentation | 4 - .../FutureInstrumentationSpec.scala | 374 +++++++++--------- .../agent/scalaconcurrent/RerunSuite.scala | 39 ++ 7 files changed, 242 insertions(+), 290 deletions(-) create mode 100644 apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala diff --git a/apm-agent-core/src/test/resources/log4j2-test.xml b/apm-agent-core/src/test/resources/log4j2-test.xml index e7c91d2e6c..db122fc04d 100644 --- a/apm-agent-core/src/test/resources/log4j2-test.xml +++ b/apm-agent-core/src/test/resources/log4j2-test.xml @@ -14,6 +14,6 @@ - + diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java index 774b975782..83da50d063 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java @@ -75,7 +75,7 @@ public static Object onEnter(@Advice.This Object thiz) { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) public static void onExit(@Advice.Thrown Throwable thrown, @Nullable @Advice.Enter Object context) { - if (context instanceof AbstractSpan) { + if (context instanceof AbstractSpan && tracer.getActive() == context) { ((AbstractSpan) context).deactivate(); } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index c79b7b2cd7..26dadb98e6 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -16,16 +16,17 @@ - - org.scala-lang - scala-library - 2.13.6 - ${project.groupId} apm-java-concurrent-plugin ${project.version} + + org.scala-lang + scala-library + 2.13.6 + test + org.scalameta munit_2.13 diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index 9da85b1ce1..823cb31dcb 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -19,7 +19,7 @@ package co.elastic.apm.agent.scalaconcurrent; import co.elastic.apm.agent.bci.TracerAwareInstrumentation; -import co.elastic.apm.agent.cache.WeakKeySoftValueLoadingCache; +import co.elastic.apm.agent.concurrent.JavaConcurrent; import co.elastic.apm.agent.impl.transaction.AbstractSpan; import co.elastic.apm.agent.sdk.advice.AssignTo; import com.blogspot.mydailyjava.weaklockfree.WeakConcurrentMap; @@ -27,18 +27,11 @@ import net.bytebuddy.description.method.MethodDescription; import net.bytebuddy.description.type.TypeDescription; import net.bytebuddy.matcher.ElementMatcher; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import scala.concurrent.Future; -import scala.concurrent.Future$; -import scala.util.Try; import javax.annotation.Nonnull; import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; -import java.util.concurrent.Executor; -import co.elastic.apm.agent.concurrent.JavaConcurrent; import static net.bytebuddy.matcher.ElementMatchers.*; @@ -48,8 +41,6 @@ public abstract class FutureInstrumentation extends TracerAwareInstrumentation { public static final WeakConcurrentMap> promisesToContext = new WeakConcurrentMap.WithInlinedExpunction<>(); - private static final Logger logger = LoggerFactory.getLogger(FutureInstrumentation.class); - @Nonnull @Override public Collection getInstrumentationGroupNames() { @@ -82,38 +73,7 @@ public static void onExit(@Nullable @Advice.Thrown Throwable thrown, } } -// public static class FutureObjectInstrumentation extends FutureInstrumentation { -// -// @Override -// public ElementMatcher getTypeMatcher() { -// return named("scala.concurrent.Future$"); -// } -// -// @Override -// public ElementMatcher getMethodMatcher() { -// return isTypeInitializer(); -// } -// -// @Advice.OnMethodExit(suppress = Throwable.class, inline = false) -// public static void onExit() { -// logger.warn("=============="); -// logger.warn("Match on Future.unit"); -// logger.warn("=============="); -// -// final AbstractSpan context = tracer.getActive(); -// if (context != null) { -// logger.warn("=============="); -// logger.warn("Match on Future.unit " + context); -// logger.warn("=============="); -// } -// // Remove context on the initial Future.unit initialization such that following -// // chaining methods are not linked to this constant 'origin' Future. -// final Try unitFuture = Future$.MODULE$.unit().value().get(); -// logger.warn(promisesToContext.toString()); -//// promisesToContext.put(unitFuture, null); -// } -// } -// + public static class TransformationConstructorInstrumentation extends FutureInstrumentation { @Override @@ -130,10 +90,6 @@ public ElementMatcher getMethodMatcher() { public static void onExit(@Advice.This Object thiz) { final AbstractSpan context = tracer.getActive(); if (context != null) { - logger.warn("=============="); - logger.warn("Constructor " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - logger.warn("Constructor Trans " + (tracer.currentTransaction() == context) + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - logger.warn("=============="); promisesToContext.put(thiz, context); // this span might be ended before the Promise$Transformation#run method starts // we have to avoid that this span gets recycled, even in the above mentioned case @@ -145,35 +101,6 @@ public static void onExit(@Advice.This Object thiz) { } - public static class TransformationSubmitWithValueInstrumentation extends FutureInstrumentation { - - @Override - public ElementMatcher getTypeMatcher() { - return named("scala.concurrent.impl.Promise$Transformation"); - } - - @Override - public ElementMatcher getMethodMatcher() { - return named("submitWithValue").and(returns(named("scala.concurrent.impl.Promise$Transformation"))); - } - - @Advice.OnMethodEnter(suppress = Throwable.class, inline = false) - public static void onEnter(@Advice.This Object thiz) { - final AbstractSpan context = tracer.getActive(); - if (context != null) { - logger.warn("=============="); - logger.warn("SubmitWithValue " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - logger.warn("=============="); - promisesToContext.put(thiz, context); - // this span might be ended before the Promise$Transformation#run method starts - // we have to avoid that this span gets recycled, even in the above mentioned case - context.incrementReferences(); - // Do no discard branches leading to async operations so not to break span references - context.setNonDiscardable(); - } - } - } - public static class TransformationRunInstrumentation extends FutureInstrumentation { @Override @@ -191,15 +118,12 @@ public ElementMatcher getMethodMatcher() { public static Object onEnter(@Advice.This Object thiz) { AbstractSpan context = promisesToContext.remove(thiz); if (context != null) { - logger.warn("=============="); - logger.warn("Enter Run " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - if (tracer.getActive() != context) context.activate(); - // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run - // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice - context.decrementReferences(); - logger.warn("Enter Run After 1 " + context + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - logger.warn("Enter Run After 2 " + context.getReferenceCount() + " on thread " + Thread.currentThread().getId() + " for " + thiz.hashCode()); - logger.warn("=============="); + if (tracer.getActive() != context) { + context.activate(); + // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run + // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice + context.decrementReferences(); + } } return context; } @@ -208,13 +132,7 @@ public static Object onEnter(@Advice.This Object thiz) { public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { if (abstractSpanObj instanceof AbstractSpan) { AbstractSpan context = (AbstractSpan) abstractSpanObj; - logger.warn("=============="); - logger.warn("Exit Run " + context + " on thread " + Thread.currentThread().getId()); - logger.warn("Exit Run Active 1 " + tracer.getActive() + " on thread " + Thread.currentThread().getId()); - logger.warn("Exit Run Active 2 " + context.getReferenceCount() + " on thread " + Thread.currentThread().getId()); - context.deactivate(); - logger.warn("Exit Run Active 3 " + tracer.getActive() + " on thread " + Thread.currentThread().getId()); - logger.warn("=============="); + if (tracer.getActive() == context) context.deactivate(); } } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation index 7138f858bb..a850a9e3fe 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/resources/META-INF/services/co.elastic.apm.agent.sdk.ElasticApmInstrumentation @@ -1,7 +1,3 @@ -#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$FutureObjectInstrumentation -#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation -#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation -#co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationSubmitWithValueInstrumentation co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$BatchedExecutionContextInstrumentation co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationConstructorInstrumentation co.elastic.apm.agent.scalaconcurrent.FutureInstrumentation$TransformationRunInstrumentation diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index fe88f71802..4ca93d5841 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -1,20 +1,19 @@ package co.elastic.apm.agent.scalaconcurrent -import java.util.concurrent.{Executors, ForkJoinPool} import co.elastic.apm.agent.MockReporter import co.elastic.apm.agent.bci.ElasticApmAgent import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} import co.elastic.apm.agent.impl.transaction.Transaction import co.elastic.apm.agent.impl.{ElasticApmTracer, ElasticApmTracerBuilder} -import munit.FunSuite import net.bytebuddy.agent.ByteBuddyAgent import org.stagemonitor.configuration.ConfigurationRegistry +import java.util.concurrent.{Executors, ForkJoinPool} import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future, Promise} +import scala.concurrent._ import scala.util.{Failure, Success} -class FutureInstrumentationSpec extends FunSuite { +class FutureInstrumentationSpec extends RerunSuite { private var reporter: MockReporter = _ private var tracer: ElasticApmTracer = _ @@ -31,217 +30,216 @@ class FutureInstrumentationSpec extends FunSuite { override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() - // test("Scala Future should propagate the tracing-context correctly across different threads") { - // implicit val executionContext: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // val future = Future("Test") - // .map(_.length) - // .flatMap(l => Future(l * 2)) - // .map(_.toString) - // .flatMap(s => Future(s"$s-$s")) - // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - // - // Await.ready(future, 10.seconds) - // transaction.deactivate().end() - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - // true - // ) - // - // - // } - // - // test("Worker thread should correctly set context on the current transaction") { - // implicit val executionContext: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // new TestFutureTraceMethods().invokeAsync(tracer) - // transaction.deactivate().end() - // assertEquals(reporter.getTransactions.size(), 1) - // assertEquals(reporter.getSpans.size(), 0) - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - // true - // ) - // } - // - // test("Multiple async operations should be able to set context on the current transaction") { - // implicit val multiPoolEc: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // val future = Future - // .traverse(1 to 100) { _ => - // Future.sequence(List( - // Future { - // Thread.sleep(25) - // tracer.currentTransaction().addCustomContext("future1", true) - // }, - // Future { - // Thread.sleep(50) - // tracer.currentTransaction().addCustomContext("future2", true) - // }, - // Future { - // Thread.sleep(10) - // tracer.currentTransaction().addCustomContext("future3", true) - // } - // )) - // } - // - // Await.ready(future, 10.seconds) - // transaction.deactivate().end() - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - // true - // ) - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - // true - // ) - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], - // true - // ) - // - // } - // - // test("Handle a combination of Promises and Futures correctly") { - // implicit val multiPoolEc: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // val promise = Promise[Int]() - // - // Future { Thread.sleep(100) } - // .map(_ => 42) - // .onComplete { - // case Success(value) => promise.success(value) - // case Failure(exception) => promise.failure(exception) - // } - // - // val future = promise - // .future - // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - // - // Await.ready(future, 10.seconds) - // transaction.deactivate().end() - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - // true - // ) - // - // } - // - // test("Handle a Future.sequence correctly") { - // implicit val multiPoolEc: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // val future = Future - // .sequence(List( - // Future(Thread.sleep(25)) - // )) - // .map(_ => tracer.currentTransaction().addCustomContext("future", true)) - // - // Await.ready(future, 10.seconds) - // transaction.deactivate().end() - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], - // true - // ) - // - // } - // - // test("Handle a combination of Promises and complex Futures correctly") { - // implicit val multiPoolEc: ExecutionContextExecutor = - // ExecutionContext.fromExecutor(Executors.newFixedThreadPool(3)) - // - // val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() - // - // val promise = Promise[Int]() - // - // Future - // .sequence(List( - // Future(Thread.sleep(25)) - // )) - // .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) - // .map(_ => 42) - // .onComplete { - // case Success(value) => promise.success(value) - // case Failure(exception) => promise.failure(exception) - // } - // - // val future = promise - // .future - // .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) - // - // Await.ready(future, 10.seconds) - // transaction.deactivate().end() - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], - // true - // ) - // assertEquals( - // reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], - // true - // ) - // - // } - - test("Scala Future should not propagate the tracing-context to unrelated threads") { - implicit val executionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(5)) + override def munitExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(10)) + + test("Scala Future should propagate the tracing-context correctly across different threads") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future("Test") + .map(_.length) + .flatMap(l => Future(l * 2)) + .map(_.toString) + .flatMap(s => Future(s"$s-$s")) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + } + + test("Worker thread should correctly set context on the current transaction") { + implicit val executionContext: ExecutionContextExecutor = + ExecutionContext.fromExecutor(Executors.newFixedThreadPool(1)) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + new TestFutureTraceMethods().invokeAsync(tracer) + transaction.end() + assertEquals(reporter.getTransactions.size(), 1) + assertEquals(reporter.getSpans.size(), 0) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + } + + test("Multiple async operations should be able to set context on the current transaction") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .traverse(1 to 100) { _ => + Future.sequence(List( + Future { + Thread.sleep(25) + tracer.currentTransaction().addCustomContext("future1", true) + }, + Future { + Thread.sleep(50) + tracer.currentTransaction().addCustomContext("future2", true) + }, + Future { + Thread.sleep(10) + tracer.currentTransaction().addCustomContext("future3", true) + } + )) + } + + Await.ready(future, 10.seconds) + + println(s"thread=${Thread.currentThread().getId}, span=${tracer.getActive}. trace=${tracer.currentTransaction()}, transaction=$transaction") + + transaction.end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a combination of Promises and Futures correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val promise = Promise[Int]() + + Future { Thread.sleep(100) } + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } + + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a Future.sequence correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future", true)) + + Await.ready(future, 10.seconds) + transaction.end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], + true + ) + + } + + test("Handle a combination of Promises and complex Futures correctly") { + implicit val multiPoolEc: ExecutionContext = munitExecutionContext + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val promise = Promise[Int]() + + Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } + + val future = promise + .future + .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) + + Await.ready(future, 10.seconds) + transaction.end() + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + + } + + test("Scala Future should not propagate the tracing-context to unrelated threads".tag(Rerun(10))) { + implicit val executionContext: ExecutionContext = munitExecutionContext val fs = (1 to 5).map(transactionNumber => Future { Thread.sleep(10) val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() - println(s"MILAN - thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction") + println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction") - val futures = (1 to 1) + val futures = (1 to 5) .map(futureNumber => Future { Thread.sleep(10) val currentTransactionNumber = tracer.currentTransaction().getNameAsString - println(s"MILAN - thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") + println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") assertEquals(transaction, tracer.currentTransaction()) assertEquals(currentTransactionNumber.toInt, transactionNumber) (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction }) -// .map { case (transaction: Transaction, futureNumber: Int) => -// Thread.sleep(10) -// -// val currentTransactionNumber = tracer.currentTransaction().getNameAsString -// println(s"MILAN - thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") -// -// assertEquals(transaction, tracer.currentTransaction()) -// assertEquals(currentTransactionNumber.toInt, transactionNumber) -// -// transaction -// }) val future = Future.sequence(futures) - Await.result(future, 10.seconds) + Await.result(future, 5.seconds) - transaction.deactivate().end() + transaction.end() }) val res = Future.sequence(fs) - Await.result(res, 15.seconds) + Await.result(res, 5.seconds) } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala new file mode 100644 index 0000000000..d5edde6f90 --- /dev/null +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala @@ -0,0 +1,39 @@ +package co.elastic.apm.agent.scalaconcurrent + +import munit.{GenericAfterEach, GenericBeforeEach} + +case class Rerun(maxReruns: Int) extends munit.Tag("Rerun") +class RerunSuite extends munit.FunSuite { + + def retryFutureOnException(test: Test, maxReruns: Int, n: Int = 0): TestValue = { + val result = + if (n == maxReruns - 1) test.body() + else { + test.body().recoverWith { + case failedTest: munit.ComparisonFailException => throw failedTest + case _ => + this.afterEach(new GenericAfterEach(test)) + this.beforeEach(new GenericBeforeEach(test)) + retryFutureOnException(test, maxReruns, n + 1) + }(munitExecutionContext) + } + + Thread.sleep(100) + + result + } + + override def munitTestTransforms: List[TestTransform] = super.munitTestTransforms ++ List( + new TestTransform("Rerun", { test => + val maxReruns = test.tags + .collectFirst { case Rerun(maxReruns) => maxReruns } + .getOrElse(1) + if (maxReruns == 1) test + else { + test.withBody(() => { + retryFutureOnException(test, maxReruns) + }) + } + }) + ) +} From 9df3541d66f43b12ccbfa9001e28bcb093090c6a Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Tue, 13 Jul 2021 11:39:10 +0200 Subject: [PATCH 6/8] Fix deadlock with global ExecutionContext --- ...leCallableForkJoinTaskInstrumentation.java | 2 +- .../FutureInstrumentation.java | 18 +- .../FutureInstrumentationSpec.scala | 181 +++++++++++++----- .../agent/scalaconcurrent/RerunSuite.scala | 39 ---- 4 files changed, 148 insertions(+), 92 deletions(-) delete mode 100644 apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala diff --git a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java index 83da50d063..774b975782 100644 --- a/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java +++ b/apm-agent-plugins/apm-java-concurrent-plugin/src/main/java/co/elastic/apm/agent/concurrent/RunnableCallableForkJoinTaskInstrumentation.java @@ -75,7 +75,7 @@ public static Object onEnter(@Advice.This Object thiz) { @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class, inline = false) public static void onExit(@Advice.Thrown Throwable thrown, @Nullable @Advice.Enter Object context) { - if (context instanceof AbstractSpan && tracer.getActive() == context) { + if (context instanceof AbstractSpan) { ((AbstractSpan) context).deactivate(); } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java index 823cb31dcb..c8478abf52 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/main/java/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentation.java @@ -33,7 +33,11 @@ import java.util.Arrays; import java.util.Collection; -import static net.bytebuddy.matcher.ElementMatchers.*; +import static net.bytebuddy.matcher.ElementMatchers.hasSuperType; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.returns; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; public abstract class FutureInstrumentation extends TracerAwareInstrumentation { @@ -118,12 +122,10 @@ public ElementMatcher getMethodMatcher() { public static Object onEnter(@Advice.This Object thiz) { AbstractSpan context = promisesToContext.remove(thiz); if (context != null) { - if (tracer.getActive() != context) { - context.activate(); - // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run - // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice - context.decrementReferences(); - } + context.activate(); + // decrements the reference we incremented to avoid that the parent context gets recycled before the promise is run + // because we have activated it, we can be sure it doesn't get recycled until we deactivate in the OnMethodExit advice + context.decrementReferences(); } return context; } @@ -132,7 +134,7 @@ public static Object onEnter(@Advice.This Object thiz) { public static void onExit(@Advice.Enter @Nullable Object abstractSpanObj) { if (abstractSpanObj instanceof AbstractSpan) { AbstractSpan context = (AbstractSpan) abstractSpanObj; - if (tracer.getActive() == context) context.deactivate(); + context.deactivate(); } } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala index 4ca93d5841..b38c03909c 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/FutureInstrumentationSpec.scala @@ -5,6 +5,7 @@ import co.elastic.apm.agent.bci.ElasticApmAgent import co.elastic.apm.agent.configuration.{CoreConfiguration, SpyConfiguration} import co.elastic.apm.agent.impl.transaction.Transaction import co.elastic.apm.agent.impl.{ElasticApmTracer, ElasticApmTracerBuilder} +import munit.FunSuite import net.bytebuddy.agent.ByteBuddyAgent import org.stagemonitor.configuration.ConfigurationRegistry @@ -13,7 +14,7 @@ import scala.concurrent.duration._ import scala.concurrent._ import scala.util.{Failure, Success} -class FutureInstrumentationSpec extends RerunSuite { +class FutureInstrumentationSpec extends FunSuite { private var reporter: MockReporter = _ private var tracer: ElasticApmTracer = _ @@ -30,7 +31,7 @@ class FutureInstrumentationSpec extends RerunSuite { override def afterEach(context: AfterEach): Unit = ElasticApmAgent.reset() - override def munitExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(10)) + override def munitExecutionContext: ExecutionContext = ExecutionContext.fromExecutorService(new ForkJoinPool(5)) test("Scala Future should propagate the tracing-context correctly across different threads") { implicit val executionContext: ExecutionContextExecutor = @@ -46,7 +47,8 @@ class FutureInstrumentationSpec extends RerunSuite { .map(_ => tracer.currentTransaction().addCustomContext("future", true)) Await.ready(future, 10.seconds) - transaction.end() + transaction.deactivate().end() + assertEquals( reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], true @@ -60,7 +62,8 @@ class FutureInstrumentationSpec extends RerunSuite { val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() new TestFutureTraceMethods().invokeAsync(tracer) - transaction.end() + transaction.deactivate().end() + assertEquals(reporter.getTransactions.size(), 1) assertEquals(reporter.getSpans.size(), 0) assertEquals( @@ -94,9 +97,50 @@ class FutureInstrumentationSpec extends RerunSuite { Await.ready(future, 10.seconds) - println(s"thread=${Thread.currentThread().getId}, span=${tracer.getActive}. trace=${tracer.currentTransaction()}, transaction=$transaction") + transaction.deactivate().end() + + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future2").asInstanceOf[Boolean], + true + ) + assertEquals( + reporter.getTransactions.get(0).getContext.getCustom("future3").asInstanceOf[Boolean], + true + ) + + } + + test("Multiple async operations should be able to set context on the current transaction with global EC") { + implicit val multiPoolEc: ExecutionContext = ExecutionContext.global + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName("Transaction").activate() + + val future = Future + .traverse(1 to 100) { _ => + Future.sequence(List( + Future { + Thread.sleep(25) + tracer.currentTransaction().addCustomContext("future1", true) + }, + Future { + Thread.sleep(50) + tracer.currentTransaction().addCustomContext("future2", true) + }, + Future { + Thread.sleep(10) + tracer.currentTransaction().addCustomContext("future3", true) + } + )) + } + + Await.ready(future, 10.seconds) + + transaction.deactivate().end() - transaction.end() assertEquals( reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], true @@ -131,7 +175,8 @@ class FutureInstrumentationSpec extends RerunSuite { .map(_ => tracer.currentTransaction().addCustomContext("future", true)) Await.ready(future, 10.seconds) - transaction.end() + transaction.deactivate().end() + assertEquals( reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], true @@ -151,7 +196,7 @@ class FutureInstrumentationSpec extends RerunSuite { .map(_ => tracer.currentTransaction().addCustomContext("future", true)) Await.ready(future, 10.seconds) - transaction.end() + transaction.deactivate().end() assertEquals( reporter.getTransactions.get(0).getContext.getCustom("future").asInstanceOf[Boolean], true @@ -166,23 +211,24 @@ class FutureInstrumentationSpec extends RerunSuite { val promise = Promise[Int]() - Future - .sequence(List( - Future(Thread.sleep(25)) - )) - .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) - .map(_ => 42) - .onComplete { - case Success(value) => promise.success(value) - case Failure(exception) => promise.failure(exception) - } + Future + .sequence(List( + Future(Thread.sleep(25)) + )) + .map(_ => tracer.currentTransaction().addCustomContext("future1", true)) + .map(_ => 42) + .onComplete { + case Success(value) => promise.success(value) + case Failure(exception) => promise.failure(exception) + } val future = promise .future .map(_ => tracer.currentTransaction().addCustomContext("future2", true)) Await.ready(future, 10.seconds) - transaction.end() + transaction.deactivate().end() + assertEquals( reporter.getTransactions.get(0).getContext.getCustom("future1").asInstanceOf[Boolean], true @@ -194,52 +240,99 @@ class FutureInstrumentationSpec extends RerunSuite { } - test("Scala Future should not propagate the tracing-context to unrelated threads".tag(Rerun(10))) { + test("Scala Future should not propagate the tracing-context to unrelated threads with async deactivation") { implicit val executionContext: ExecutionContext = munitExecutionContext - val fs = (1 to 5).map(transactionNumber => Future { + val fs = (1 to 10).map(transactionNumber => Future { Thread.sleep(10) val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() - println(s"thread=${Thread.currentThread().getId} transaction=$transactionNumber, span=${tracer.getActive}, trace=${tracer.currentTransaction()} starting transaction") + val futures = (1 to 10) + .map { futureNumber => + Future { + Thread.sleep(10) - val futures = (1 to 5) - .map(futureNumber => Future { - Thread.sleep(10) + val currentTransactionNumber = tracer.currentTransaction().getNameAsString - val currentTransactionNumber = tracer.currentTransaction().getNameAsString + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) - println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} before futureNumber=$futureNumber, $currentTransactionNumber") + (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) - assertEquals(transaction, tracer.currentTransaction()) - assertEquals(currentTransactionNumber.toInt, transactionNumber) + val currentTransactionNumber = tracer.currentTransaction().getNameAsString - (transaction, futureNumber) + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction + } } - .map { case (transaction: Transaction, futureNumber: Int) => - Thread.sleep(10) - val currentTransactionNumber = tracer.currentTransaction().getNameAsString - println(s"thread=${Thread.currentThread().getId} transactionNumber=$transactionNumber, span=${tracer.getActive}. trace=${tracer.currentTransaction()} map1 futureNumber=$futureNumber, $currentTransactionNumber") + val future = Future.sequence(futures) - assertEquals(transaction, tracer.currentTransaction()) - assertEquals(currentTransactionNumber.toInt, transactionNumber) + future.onComplete(_ => transaction.deactivate().end()) - transaction - }) + future - val future = Future.sequence(futures) + }) - Await.result(future, 5.seconds) + Await.result(Future.sequence(fs), 10.seconds) - transaction.end() + assert(tracer.currentTransaction() == null) + } - }) + test("Scala Future should not propagate the tracing-context to unrelated threads with deadlock") { + // See https://github.com/scala/bug/issues/12089 + implicit val executionContext: ExecutionContext = ExecutionContext.global + + val fs = (1 to 10).map { transactionNumber => + Future { + Thread.sleep(10) + + val transaction = tracer.startRootTransaction(getClass.getClassLoader).withName(transactionNumber.toString).activate() + + val futures = (1 to 10) + .map { futureNumber => + Future { + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + (transaction, futureNumber) + } + .map { case (transaction: Transaction, futureNumber: Int) => + Thread.sleep(10) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction + } + } + + Await.result(Future.sequence(futures), Duration.Inf) + + val currentTransactionNumber = tracer.currentTransaction().getNameAsString + assertEquals(transaction, tracer.currentTransaction()) + assertEquals(currentTransactionNumber.toInt, transactionNumber) + + transaction.deactivate().end() + + } + } - val res = Future.sequence(fs) + Await.result(Future.sequence(fs), Duration.Inf) - Await.result(res, 5.seconds) + assert(tracer.currentTransaction() == null) } } diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala b/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala deleted file mode 100644 index d5edde6f90..0000000000 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/src/test/scala/co/elastic/apm/agent/scalaconcurrent/RerunSuite.scala +++ /dev/null @@ -1,39 +0,0 @@ -package co.elastic.apm.agent.scalaconcurrent - -import munit.{GenericAfterEach, GenericBeforeEach} - -case class Rerun(maxReruns: Int) extends munit.Tag("Rerun") -class RerunSuite extends munit.FunSuite { - - def retryFutureOnException(test: Test, maxReruns: Int, n: Int = 0): TestValue = { - val result = - if (n == maxReruns - 1) test.body() - else { - test.body().recoverWith { - case failedTest: munit.ComparisonFailException => throw failedTest - case _ => - this.afterEach(new GenericAfterEach(test)) - this.beforeEach(new GenericBeforeEach(test)) - retryFutureOnException(test, maxReruns, n + 1) - }(munitExecutionContext) - } - - Thread.sleep(100) - - result - } - - override def munitTestTransforms: List[TestTransform] = super.munitTestTransforms ++ List( - new TestTransform("Rerun", { test => - val maxReruns = test.tags - .collectFirst { case Rerun(maxReruns) => maxReruns } - .getOrElse(1) - if (maxReruns == 1) test - else { - test.withBody(() => { - retryFutureOnException(test, maxReruns) - }) - } - }) - ) -} From abdacbdd829982a7c54022ef4bf8ee9e813057ed Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Tue, 13 Jul 2021 11:40:50 +0200 Subject: [PATCH 7/8] Remove unrelated change --- .../apm/agent/grpc/latest/testapp/generated/HelloGrpc.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java b/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java index 8c3814ec99..fe0787ef17 100644 --- a/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java +++ b/apm-agent-plugins/apm-grpc/apm-grpc-test-latest/src/test/java/co/elastic/apm/agent/grpc/latest/testapp/generated/HelloGrpc.java @@ -23,7 +23,7 @@ /** */ @javax.annotation.Generated( - value = "by gRPC proto compiler (version 1.39.0)", + value = "by gRPC proto compiler (version 1.38.1)", comments = "Source: rpc.proto") public final class HelloGrpc { From 11e5562a77c19299f404b7edfa500e50173c17c7 Mon Sep 17 00:00:00 2001 From: Milan van der Meer <5628925+milanvdm@users.noreply.github.com> Date: Sat, 25 Sep 2021 12:12:46 +0200 Subject: [PATCH 8/8] Upgrade dependencies --- apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml index 5511d3157c..d4804160fc 100644 --- a/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml +++ b/apm-agent-plugins/apm-scala-concurrent-plugin/pom.xml @@ -30,7 +30,7 @@ org.scalameta munit_2.13 - 0.7.26 + 0.7.29 test @@ -41,7 +41,7 @@ net.alchim31.maven scala-maven-plugin - 4.5.3 + 4.5.4 2.13.6