diff --git a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie index a55ca9f645f..ad2b96727af 100644 --- a/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie +++ b/dd-java-agent/agent-tooling/src/main/resources/datadog/trace/agent/tooling/bytebuddy/matcher/ignored_class_name.trie @@ -299,6 +299,7 @@ 2 org.springframework.jndi.* 2 org.springframework.lang.* 2 org.springframework.messaging.* +0 org.springframework.messaging.handler.invocation.InvocableHandlerMethod 2 org.springframework.objenesis.* 2 org.springframework.orm.* 2 org.springframework.remoting.* diff --git a/dd-java-agent/instrumentation/spring-messaging-4/build.gradle b/dd-java-agent/instrumentation/spring-messaging-4/build.gradle new file mode 100644 index 00000000000..80cda772f9c --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/build.gradle @@ -0,0 +1,36 @@ + +muzzle { + pass { + group = 'org.springframework' + module = 'spring-messaging' + versions = "[4.0.0.RELEASE,)" + assertInverse = true + } +} + +ext { + minJavaVersionForTests = JavaVersion.VERSION_17 +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') + +[compileTestGroovy, compileLatestDepTestGroovy].each { + it.javaLauncher = getJavaLauncherFor(17) +} + +dependencies { + compileOnly group: 'org.springframework', name: 'spring-messaging', version: '4.0.0.RELEASE' + + // capture SQS send and receive spans, propagate trace details in messages + testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') + testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') + testImplementation project(':dd-java-agent:instrumentation:aws-java-sqs-2.0') + + testImplementation group: 'jakarta.annotation', name: 'jakarta.annotation-api', version: '2.0.0' + testImplementation group: 'io.awspring.cloud', name: 'spring-cloud-aws-sqs', version: '3.0.1' + testImplementation group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.13', version: '1.2.3' + + latestDepTestImplementation group: 'org.springframework', name: 'spring-messaging', version: '+' +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageDecorator.java b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageDecorator.java new file mode 100644 index 00000000000..1cd547c75dc --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageDecorator.java @@ -0,0 +1,43 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.bootstrap.instrumentation.api.InternalSpanTypes.MESSAGE_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; + +import datadog.trace.api.naming.SpanNaming; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.bootstrap.instrumentation.decorator.MessagingClientDecorator; + +public final class SpringMessageDecorator extends MessagingClientDecorator { + public static final SpringMessageDecorator DECORATE = new SpringMessageDecorator(); + + public static final CharSequence SPRING_INBOUND = + UTF8BytesString.create( + SpanNaming.instance().namingSchema().messaging().inboundOperation("spring")); + + public static final CharSequence COMPONENT_NAME = UTF8BytesString.create("spring-messaging"); + + @Override + protected CharSequence spanType() { + return MESSAGE_CONSUMER; + } + + @Override + protected String[] instrumentationNames() { + return new String[] {"spring-messaging"}; + } + + @Override + protected CharSequence component() { + return COMPONENT_NAME; + } + + @Override + protected String service() { + return null; + } + + @Override + protected String spanKind() { + return SPAN_KIND_CONSUMER; + } +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageExtractAdapter.java b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageExtractAdapter.java new file mode 100644 index 00000000000..102bd7bd4b9 --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageExtractAdapter.java @@ -0,0 +1,42 @@ +package datadog.trace.instrumentation.springmessaging; + +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.Locale; +import java.util.Map; +import java.util.function.Function; +import org.springframework.messaging.Message; + +public final class SpringMessageExtractAdapter + implements AgentPropagation.ContextVisitor> { + private static final Function KEY_MAPPER = + new Function() { + @SuppressForbidden + @Override + public String apply(String key) { + // normalize headers from different providers; raw SQS, JMS, spring-messaging, etc. + if ("AWSTraceHeader".equals(key) || "Sqs_Msa_AWSTraceHeader".equals(key)) { + return "x-amzn-trace-id"; + } + return key.replace("__dash__", "-").replace('$', '-').toLowerCase(Locale.ROOT); + } + }; + + private final DDCache cache = DDCaches.newFixedSizeCache(32); + + public static final SpringMessageExtractAdapter GETTER = new SpringMessageExtractAdapter(); + + @Override + public void forEachKey(Message carrier, AgentPropagation.KeyClassifier classifier) { + for (Map.Entry header : carrier.getHeaders().entrySet()) { + if (header.getValue() instanceof String) { + String lowerCaseKey = cache.computeIfAbsent(header.getKey(), KEY_MAPPER); + if (!classifier.accept(lowerCaseKey, (String) header.getValue())) { + return; + } + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java new file mode 100644 index 00000000000..eef26d9cd72 --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/main/java/datadog/trace/instrumentation/springmessaging/SpringMessageHandlerInstrumentation.java @@ -0,0 +1,87 @@ +package datadog.trace.instrumentation.springmessaging; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.DECORATE; +import static datadog.trace.instrumentation.springmessaging.SpringMessageDecorator.SPRING_INBOUND; +import static datadog.trace.instrumentation.springmessaging.SpringMessageExtractAdapter.GETTER; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import net.bytebuddy.asm.Advice; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +@AutoService(Instrumenter.class) +public final class SpringMessageHandlerInstrumentation extends Instrumenter.Tracing + implements Instrumenter.ForSingleType { + + public SpringMessageHandlerInstrumentation() { + super("spring-messaging", "spring-messaging-4"); + } + + @Override + public String instrumentedType() { + return "org.springframework.messaging.handler.invocation.InvocableHandlerMethod"; + } + + @Override + public void adviceTransformations(AdviceTransformation transformation) { + transformation.applyAdvice( + isMethod() + .and( + named("invoke") + .and(takesArgument(0, named("org.springframework.messaging.Message")))), + SpringMessageHandlerInstrumentation.class.getName() + "$HandleMessageAdvice"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".SpringMessageDecorator", + packageName + ".SpringMessageExtractAdapter", + packageName + ".SpringMessageExtractAdapter$1" + }; + } + + public static class HandleMessageAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static AgentScope onEnter( + @Advice.This InvocableHandlerMethod thiz, @Advice.Argument(0) Message message) { + AgentSpan.Context parentContext; + AgentSpan parent = activeSpan(); + if (null != parent) { + // prefer existing context, assume it was already extracted from this message + parentContext = parent.context(); + } else { + // otherwise try to re-extract the message context to avoid disconnected trace + parentContext = propagate().extract(message, GETTER); + } + AgentSpan span = startSpan(SPRING_INBOUND, parentContext); + DECORATE.afterStart(span); + span.setResourceName(DECORATE.spanNameForMethod(thiz.getMethod())); + return activateSpan(span); + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void onExit(@Advice.Enter AgentScope scope, @Advice.Thrown Throwable error) { + if (null == scope) { + return; + } + AgentSpan span = scope.span(); + if (null != error) { + DECORATE.onError(span, error); + } + scope.close(); + DECORATE.beforeFinish(span); + span.finish(); + } + } +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/SpringListenerSQSTest.groovy b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/SpringListenerSQSTest.groovy new file mode 100644 index 00000000000..2d9d060ccf2 --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/SpringListenerSQSTest.groovy @@ -0,0 +1,182 @@ +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan + +import datadog.trace.agent.test.AgentTestRunner +import datadog.trace.agent.test.asserts.TraceAssert +import datadog.trace.agent.test.utils.TraceUtils +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.config.GeneralConfig +import datadog.trace.bootstrap.instrumentation.api.Tags +import datadog.trace.core.DDSpan +import io.awspring.cloud.sqs.operations.SqsTemplate +import listener.Config +import org.elasticmq.rest.sqs.SQSRestServer +import org.springframework.context.annotation.AnnotationConfigApplicationContext +import software.amazon.awssdk.services.sqs.SqsAsyncClient + +class SpringListenerSQSTest extends AgentTestRunner { + + @Override + protected void configurePreAgent() { + super.configurePreAgent() + injectSysConfig(GeneralConfig.SERVICE_NAME, "my-service") + } + + def "receiving message context used when no immediate context"() { + setup: + def context = new AnnotationConfigApplicationContext(Config) + def address = context.getBean(SQSRestServer).waitUntilStarted().localAddress() + def template = SqsTemplate.newTemplate(context.getBean(SqsAsyncClient)) + TEST_WRITER.waitForTraces(2) + TEST_WRITER.clear() + + when: + TraceUtils.runUnderTrace("parent") { + template.sendAsync("SpringListenerSQS", "a message") + } + + then: + def sendingSpan + assertTraces(4, SORT_TRACES_BY_START) { + trace(3) { + sendMessage(it, address, span(2)) + getQueueUrl(it, address, span(2)) + basicSpan(it, "parent") + sendingSpan = span(0) + } + trace(1) { + receiveMessage(it, address, sendingSpan) + } + trace(1) { + springSqsListener(it, sendingSpan) + } + trace(1) { + deleteMessageBatch(it, address) + } + } + } + + static sendMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) { + traceAssert.span { + serviceName "sqs" + operationName "aws.http" + resourceName "Sqs.SendMessage" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(parentSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" "http://localhost:${address.port}/" + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "SendMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + defaultTags() + } + } + } + + static getQueueUrl(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) { + traceAssert.span { + serviceName "java-aws-sdk" + operationName "aws.http" + resourceName "Sqs.GetQueueUrl" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(parentSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" "http://localhost:${address.port}/" + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "GetQueueUrl" + "aws.agent" "java-aws-sdk" + "aws.queue.name" "SpringListenerSQS" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + "queuename" "SpringListenerSQS" + defaultTags() + } + } + } + + static receiveMessage(TraceAssert traceAssert, InetSocketAddress address, DDSpan parentSpan) { + traceAssert.span { + serviceName "sqs" + operationName "aws.http" + resourceName "Sqs.ReceiveMessage" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(parentSpan) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "ReceiveMessage" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + defaultTags(true) + } + } + } + + static springSqsListener(TraceAssert traceAssert, DDSpan parentSpan) { + traceAssert.span { + serviceName "my-service" + operationName "spring.consume" + resourceName "TestListener.observe" + spanType DDSpanTypes.MESSAGE_CONSUMER + errored false + measured true + childOf(parentSpan) + tags { + "$Tags.COMPONENT" "spring-messaging" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CONSUMER + defaultTags(true) + } + } + } + + static deleteMessageBatch(TraceAssert traceAssert, InetSocketAddress address) { + traceAssert.span { + serviceName "sqs" + operationName "aws.http" + resourceName "Sqs.DeleteMessageBatch" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + parent() + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" "http://localhost:${address.port}/" + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" address.port + "$Tags.PEER_HOSTNAME" "localhost" + "aws.service" "Sqs" + "aws_service" "Sqs" + "aws.operation" "DeleteMessageBatch" + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:${address.port}/000000000000/SpringListenerSQS" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + defaultTags() + } + } + } +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/Config.groovy b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/Config.groovy new file mode 100644 index 00000000000..5b08e04460f --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/Config.groovy @@ -0,0 +1,50 @@ +package listener + +import io.awspring.cloud.sqs.config.SqsBootstrapConfiguration +import io.awspring.cloud.sqs.config.SqsMessageListenerContainerFactory +import org.elasticmq.rest.sqs.SQSRestServer +import org.elasticmq.rest.sqs.SQSRestServerBuilder +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.context.annotation.Import +import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider +import software.amazon.awssdk.services.sqs.SqsAsyncClient + +import jakarta.annotation.PreDestroy + +@Configuration +@Import(SqsBootstrapConfiguration) +class Config { + + @Bean + SQSRestServer sqsRestServer() { + return SQSRestServerBuilder.withInterface("localhost").withDynamicPort().start() + } + + @Bean + SqsMessageListenerContainerFactory defaultSqsListenerContainerFactory() { + return SqsMessageListenerContainerFactory + .builder() + .sqsAsyncClient(sqsAsyncClient()) + .build() + } + + @Bean + SqsAsyncClient sqsAsyncClient() { + def sqsAddress = sqsRestServer().waitUntilStarted().localAddress() + return SqsAsyncClient.builder() + .credentialsProvider(AnonymousCredentialsProvider.create()) + .endpointOverride(new URI("http://localhost:${sqsAddress.port}")) + .build() + } + + @Bean + TestListener testListener() { + return new TestListener() + } + + @PreDestroy + void destroy() { + sqsRestServer().stopAndWait() + } +} diff --git a/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/TestListener.groovy b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/TestListener.groovy new file mode 100644 index 00000000000..7d0dd2e612d --- /dev/null +++ b/dd-java-agent/instrumentation/spring-messaging-4/src/test/groovy/listener/TestListener.groovy @@ -0,0 +1,12 @@ +package listener + +import io.awspring.cloud.sqs.annotation.SqsListener +import org.springframework.stereotype.Component + +@Component +class TestListener { + @SqsListener(queueNames = "SpringListenerSQS") + void observe(String message) { + println "Received $message" + } +} diff --git a/dd-smoke-tests/spring-boot-rabbit/src/main/java/datadog/smoketest/springboot/rabbit/Receiver.java b/dd-smoke-tests/spring-boot-rabbit/src/main/java/datadog/smoketest/springboot/rabbit/Receiver.java index 769bf98dd4b..64b8e889a97 100644 --- a/dd-smoke-tests/spring-boot-rabbit/src/main/java/datadog/smoketest/springboot/rabbit/Receiver.java +++ b/dd-smoke-tests/spring-boot-rabbit/src/main/java/datadog/smoketest/springboot/rabbit/Receiver.java @@ -1,6 +1,5 @@ package datadog.smoketest.springboot.rabbit; -import datadog.trace.api.Trace; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.springframework.amqp.rabbit.annotation.RabbitListener; @@ -26,7 +25,6 @@ public Receiver(AppConfig config, RabbitTemplate template) { } } - @Trace @RabbitListener(queues = "${rabbit.receiver.queue}") public void receiveMessage(String msg) { if (forwardMessage) { diff --git a/dd-smoke-tests/spring-boot-rabbit/src/test/groovy/datadog/smoketest/SpringBootRabbitIntegrationTest.groovy b/dd-smoke-tests/spring-boot-rabbit/src/test/groovy/datadog/smoketest/SpringBootRabbitIntegrationTest.groovy index a30b5e4ad56..ccad8eb2f6f 100644 --- a/dd-smoke-tests/spring-boot-rabbit/src/test/groovy/datadog/smoketest/SpringBootRabbitIntegrationTest.groovy +++ b/dd-smoke-tests/spring-boot-rabbit/src/test/groovy/datadog/smoketest/SpringBootRabbitIntegrationTest.groovy @@ -82,9 +82,9 @@ class SpringBootRabbitIntegrationTest extends AbstractServerSmokeTest { ] if (processIndex == 0) { expected.add("[${service}:servlet.request:GET /roundtrip/{message}[${service}:spring.handler:WebController.roundtrip[${service}:amqp.command:basic.publish -> otherqueue]]]") - expected.add("[rabbitmq:amqp.deliver:amqp.deliver queue[${service}:amqp.command:basic.deliver queue[${service}:amqp.consume:amqp.consume queue[${service}:trace.annotation:Receiver.receiveMessage]]]]") + expected.add("[rabbitmq:amqp.deliver:amqp.deliver queue[${service}:amqp.command:basic.deliver queue[${service}:amqp.consume:amqp.consume queue[${service}:spring.consume:Receiver.receiveMessage]]]]") } else { - expected.add("[rabbitmq:amqp.deliver:amqp.deliver otherqueue[${service}:amqp.command:basic.deliver otherqueue[${service}:amqp.consume:amqp.consume otherqueue[${service}:trace.annotation:Receiver.receiveMessage[${service}:amqp.command:basic.publish -> queue]]]]]") + expected.add("[rabbitmq:amqp.deliver:amqp.deliver otherqueue[${service}:amqp.command:basic.deliver otherqueue[${service}:amqp.consume:amqp.consume otherqueue[${service}:spring.consume:Receiver.receiveMessage[${service}:amqp.command:basic.publish -> queue]]]]]") } return expected } diff --git a/settings.gradle b/settings.gradle index a12e2073525..27646c55ba3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -354,6 +354,7 @@ include ':dd-java-agent:instrumentation:spring-beans' include ':dd-java-agent:instrumentation:spring-cloud-zuul-2' include ':dd-java-agent:instrumentation:spring-data-1.8' include ':dd-java-agent:instrumentation:spring-jms-3.1' +include ':dd-java-agent:instrumentation:spring-messaging-4' include ':dd-java-agent:instrumentation:spring-rabbit' include ':dd-java-agent:instrumentation:spring-scheduling-3.1' include ':dd-java-agent:instrumentation:spring-security-5'