Skip to content

Commit

Permalink
Merge pull request #2050 from DataDog/ban/akka-http-bah
Browse files Browse the repository at this point in the history
Akka Actor and Akka HTTP bindAndHandle instrumentation
  • Loading branch information
bantonsson committed Nov 13, 2020
2 parents 853bf9c + f18e638 commit 7c3ce96
Show file tree
Hide file tree
Showing 25 changed files with 1,289 additions and 509 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,30 @@ compileAkka23TestGroovy {
classpath += files(sourceSets.akka23Test.scala.classesDirectory)
}

// Run the akk23Test against the normal and latestDep akka version as well
sourceSets {
test {
scala {
source sourceSets.akka23Test.allSource
}
groovy {
source sourceSets.akka23Test.allSource
}
}
latestDepTest {
scala {
source sourceSets.akka23Test.allSource
}
groovy {
source sourceSets.akka23Test.allSource
}
}
}
compileLatestDepTestGroovy {
classpath += files(sourceSets.latestDepTest.scala.classesDirectory)
}


dependencies {
compileOnly group: 'com.typesafe.akka', name: "akka-actor_$scalaVersion", version: akkaVersion

Expand Down
Original file line number Diff line number Diff line change
@@ -1,35 +1,82 @@
import datadog.trace.core.DDSpan
import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.agent.test.AgentTestRunner
import spock.lang.Shared

class AkkaActorTest extends AgentTestRunner {
@Shared
def akkaTester = new AkkaActors()

def "akka #testMethod"() {
def "akka actor send #name #iterations"() {
setup:
AkkaActors akkaTester = new AkkaActors()
akkaTester."$testMethod"()
def barrier = akkaTester.block(name)

TEST_WRITER.waitForTraces(1)
List<DDSpan> trace = TEST_WRITER.get(0)
when:
(1..iterations).each {i ->
akkaTester.send(name, "$who $i")
}
barrier.release()

expect:
TEST_WRITER.size() == 1
trace.size() == 2
trace[0].resourceName.toString() == "AkkaActors.$testMethod"
findSpan(trace, "$expectedGreeting, Akka").context().getParentId() == trace[0].getSpanId()
then:
assertTraces(iterations) {
(1..iterations).each {i ->
trace(2) {
sortSpansByStart()
span {
resourceName "AkkaActors.send"
operationName "$name"
tags {
"$Tags.COMPONENT" "trace"
defaultTags()
}
}
span {
resourceName "Receiver.tracedChild"
operationName "$expectedGreeting, $who $i"
childOf(span(0))
tags {
"$Tags.COMPONENT" "trace"
defaultTags()
}
}
}
}
}

where:
testMethod | expectedGreeting
"basicTell" | "Howdy"
"basicAsk" | "Howdy"
"basicForward" | "Hello"
name | who | expectedGreeting | iterations
"tell" | "Akka" | "Howdy" | 1
"ask" | "Makka" | "Hi-diddly-ho" | 1
"forward" | "Pakka" | "Hello" | 1
"tell" | "Pakka" | "Howdy" | 10
"ask" | "Makka" | "Hi-diddly-ho" | 10
"forward" | "Akka" | "Hello" | 10
}

private DDSpan findSpan(List<DDSpan> trace, String opName) {
for (DDSpan span : trace) {
if (span.getOperationName() == opName) {
return span
def "actor message handling should close leaked scopes"() {
when:
akkaTester.leak("Leaker", "drip")

then:
assertTraces(1) {
trace(2) {
sortSpansByStart()
span {
resourceName "AkkaActors.leak"
operationName "leak all the things"
tags {
"$Tags.COMPONENT" "trace"
defaultTags()
}
}
span {
resourceName("drip")
operationName "Howdy, Leaker"
childOf(span(0))
tags {
defaultTags()
}
}
}
}
return null
}
}
Original file line number Diff line number Diff line change
@@ -1,106 +1,105 @@
import java.util.concurrent.Semaphore

import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import datadog.trace.agent.test.AgentTestRunner.blockUntilChildSpansFinished
import datadog.trace.api.Trace
import datadog.trace.bootstrap.instrumentation.api.AgentTracer.{
activateSpan,
activeScope,
activeSpan
activeSpan,
startSpan
}

import scala.concurrent.duration._

// ! == send-message
object AkkaActors {
val system: ActorSystem = ActorSystem("helloAkka")

val printer: ActorRef = system.actorOf(Receiver.props, "receiverActor")

val howdyGreeter: ActorRef =
system.actorOf(Greeter.props("Howdy", printer), "howdyGreeter")

class AkkaActors extends AutoCloseable {
val system: ActorSystem = ActorSystem("akka-actors-test")
val receiver: ActorRef =
system.actorOf(Receiver.props, "receiver")
val forwarder: ActorRef =
system.actorOf(Forwarder.props(printer), "forwarderActor")
val helloGreeter: ActorRef =
system.actorOf(Greeter.props("Hello", forwarder), "helloGreeter")

@Trace
def tracedChild(opName: String): Unit = {
activeSpan().setSpanName(opName)
system.actorOf(Forwarder.props(receiver), "forwarder")
val tellGreeter: ActorRef =
system.actorOf(Greeter.props("Howdy", receiver), "tell-greeter")
val askGreeter: ActorRef =
system.actorOf(Greeter.props("Hi-diddly-ho", receiver), "ask-greeter")
val forwardGreeter: ActorRef =
system.actorOf(Greeter.props("Hello", forwarder), "forward-greeter")

override def close(): Unit = {
system.terminate()
}
}

class AkkaActors {

import AkkaActors._
import Greeter._

implicit val timeout: Timeout = 5.minutes
implicit val timeout: Timeout = 10.seconds

@Trace
def basicTell(): Unit = {
try {
activeScope().setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ! Greet
} finally {
blockUntilChildSpansFinished(1)
}
private val actors =
Map("tell" -> tellGreeter, "ask" -> askGreeter, "forward" -> forwardGreeter)

def block(name: String): Semaphore = {
val barrier = new Semaphore(0)
actors(name) ! Block(barrier)
barrier
}

@Trace
def basicAsk(): Unit = {
try {
activeScope().setAsyncPropagation(true)
howdyGreeter ! WhoToGreet("Akka")
howdyGreeter ? Greet
} finally {
blockUntilChildSpansFinished(1)
def send(name: String, who: String): Unit = {
val actor = actors(name)
activeScope().setAsyncPropagation(true)
activeSpan().setSpanName(name)
actor ! WhoToGreet(who)
if (name == "ask") {
actor ? Greet
} else {
actor ! Greet
}
}

@Trace
def basicForward(): Unit = {
try {
activeScope().setAsyncPropagation(true)
helloGreeter ! WhoToGreet("Akka")
helloGreeter ? Greet
} finally {
blockUntilChildSpansFinished(1)
}
def leak(who: String, leak: String): Unit = {
activeScope().setAsyncPropagation(true)
activeSpan().setSpanName("leak all the things")
tellGreeter ! WhoToGreet(who)
tellGreeter ! Leak(leak)
}
}

object Greeter {
def props(message: String, receiverActor: ActorRef): Props =
Props(new Greeter(message, receiverActor))

final case class Block(barrier: Semaphore)
final case class WhoToGreet(who: String)

case object Greet

final case class Leak(leak: String)
}

class Greeter(message: String, receiverActor: ActorRef) extends Actor {

import Greeter._
import Receiver._

var greeting = ""

def receive = {
case Block(barrier) =>
barrier.acquire()
case WhoToGreet(who) =>
greeting = s"$message, $who"
case Greet =>
receiverActor ! Greeting(greeting)
case Leak(leak) =>
val span = startSpan(greeting)
span.setResourceName(leak)
activateSpan(span)
span.finish()
}
}

object Receiver {
def props: Props = Props[Receiver]

final case class Greeting(greeting: String)

}

class Receiver extends Actor with ActorLogging {
Expand All @@ -109,9 +108,13 @@ class Receiver extends Actor with ActorLogging {

def receive = {
case Greeting(greeting) => {
AkkaActors.tracedChild(greeting)
tracedChild(greeting)
}
}

@Trace
def tracedChild(opName: String): Unit = {
activeSpan().setSpanName(opName)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package datadog.trace.instrumentation.akka.concurrent;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeScope;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;

import akka.dispatch.Envelope;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.java.concurrent.AdviceUtils;
import datadog.trace.bootstrap.instrumentation.java.concurrent.State;
import datadog.trace.context.TraceScope;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

@AutoService(Instrumenter.class)
public class AkkaActorCellInstrumentation extends Instrumenter.Default {

public AkkaActorCellInstrumentation() {
super("java_concurrent", "akka_concurrent", "akka_actor");
}

@Override
public ElementMatcher<? super TypeDescription> typeMatcher() {
return named("akka.actor.ActorCell");
}

@Override
public Map<String, String> contextStore() {
return singletonMap("akka.dispatch.Envelope", State.class.getName());
}

@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(isMethod().and(named("invoke")), getClass().getName() + "$InvokeAdvice");
}

/**
* This instrumentation is defensive and closes all scopes on the scope stack that were not there
* when we started processing this actor message. The reason for that is twofold.
*
* <p>1) An actor is self contained, and driven by a thread that could serve many other purposes,
* and a scope should not leak out after a message has been processed.
*
* <p>2) We rely on this cleanup mechanism to be able to intentionally leak the scope in the
* {@code AkkaHttpServerInstrumentation} so that it propagates to the user provided request
* handling code that will execute on the same thread in the same actor.
*/
public static class InvokeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static TraceScope enter(@Advice.Argument(value = 0) Envelope envelope) {
TraceScope scope =
AdviceUtils.startTaskScope(
InstrumentationContext.get(Envelope.class, State.class), envelope);
if (scope != null) {
return scope;
}
// If there is no scope created from the envelope, we create our own noopSpan to make sure
// that we can close all scopes up until this position after exit.
AgentSpan span = new AgentTracer.NoopAgentSpan();
return activateSpan(span, false);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void exit(@Advice.Enter TraceScope scope) {
// Clean up any leaking scopes from akka-streams/akka-http et.c.
TraceScope activeScope = activeScope();
while (activeScope != null && activeScope != scope) {
activeScope.close();
activeScope = activeScope();
}
while (activeScope == scope) {
scope.close();
activeScope = activeScope();
}
}
}
}

0 comments on commit 7c3ce96

Please sign in to comment.