diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/build.gradle b/dd-java-agent/instrumentation/aws-java-sns-1.0/build.gradle new file mode 100644 index 00000000000..480ea864e22 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/build.gradle @@ -0,0 +1,32 @@ +muzzle { + pass { + group = "com.amazonaws" + module = "aws-java-sdk-sns" + versions = "[1.12.113,2)" + assertInverse = true + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') +addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') + +dependencies { + compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '1.12.710' + + // Include httpclient instrumentation for testing because it is a dependency for aws-sdk. + testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') + testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-1.11.0') + testImplementation group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '1.12.710' + // SQS is used to act as the "Subscriber" of the SNS topic. + // There's a problem with sqs sdk v1 with localstack+testcontainers testing. so use sdk v2 for sqs + testImplementation 'software.amazon.awssdk:sqs:2.25.40' + testImplementation 'org.testcontainers:localstack:1.19.7' + + latestDepTestImplementation group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '+' +} + +tasks.withType(Test).configureEach { + usesService(testcontainersLimit) +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java new file mode 100644 index 00000000000..b6565469b19 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsClientInstrumentation.java @@ -0,0 +1,64 @@ +package datadog.trace.instrumentation.aws.v1.sns; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.amazonaws.handlers.RequestHandler2; +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import datadog.trace.bootstrap.InstrumentationContext; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; + +/** AWS SDK v1 SNS instrumentation */ +@AutoService(InstrumenterModule.class) +public final class SnsClientInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + + public SnsClientInstrumentation() { + super("sns", "aws-sdk"); + } + + @Override + public String instrumentedType() { + return "com.amazonaws.handlers.HandlerChainFactory"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("newRequestHandler2Chain")), + SnsClientInstrumentation.class.getName() + "$HandlerChainAdvice"); + } + + @Override + public String[] helperClassNames() { + return new String[] {packageName + ".SnsInterceptor", packageName + ".TextMapInjectAdapter"}; + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + "com.amazonaws.AmazonWebServiceRequest", + "datadog.trace.bootstrap.instrumentation.api.AgentSpan"); + } + + public static class HandlerChainAdvice { + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void addHandler(@Advice.Return final List handlers) { + for (RequestHandler2 interceptor : handlers) { + if (interceptor instanceof SnsInterceptor) { + return; // list already has our interceptor, return to builder + } + } + handlers.add( + new SnsInterceptor( + InstrumentationContext.get( + "com.amazonaws.AmazonWebServiceRequest", + "datadog.trace.bootstrap.instrumentation.api.AgentSpan"))); + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java new file mode 100644 index 00000000000..4912c89f15b --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java @@ -0,0 +1,80 @@ +package datadog.trace.instrumentation.aws.v1.sns; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.instrumentation.aws.v1.sns.TextMapInjectAdapter.SETTER; + +import com.amazonaws.AmazonWebServiceRequest; +import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.sns.model.MessageAttributeValue; +import com.amazonaws.services.sns.model.PublishBatchRequest; +import com.amazonaws.services.sns.model.PublishBatchRequestEntry; +import com.amazonaws.services.sns.model.PublishRequest; +import datadog.trace.api.TracePropagationStyle; +import datadog.trace.bootstrap.ContextStore; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.AgentTracer; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Map; + +public class SnsInterceptor extends RequestHandler2 { + + private final ContextStore contextStore; + + public SnsInterceptor(ContextStore contextStore) { + this.contextStore = contextStore; + } + + private ByteBuffer getMessageAttributeValueToInject(AmazonWebServiceRequest request) { + final AgentSpan span = newSpan(request); + StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append("{"); + propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG); + jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma + jsonBuilder.append("}"); + return ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8)); + } + + @Override + public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) { + // Injecting the trace context into SNS messageAttributes. + if (request instanceof PublishRequest) { + PublishRequest pRequest = (PublishRequest) request; + // note: modifying message attributes has to be done before marshalling, otherwise the changes + // are not reflected in the actual request (and the MD5 check on send will fail). + Map messageAttributes = pRequest.getMessageAttributes(); + // 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore + // the limit still applies here + if (messageAttributes.size() < 10) { + messageAttributes.put( + "_datadog", + new MessageAttributeValue() + .withDataType( + "Binary") // Use Binary since SNS subscription filter policies fail silently + // with JSON strings + // https://github.com/DataDog/datadog-lambda-js/pull/269 + .withBinaryValue(this.getMessageAttributeValueToInject(request))); + } + } else if (request instanceof PublishBatchRequest) { + PublishBatchRequest pmbRequest = (PublishBatchRequest) request; + final ByteBuffer bytebuffer = this.getMessageAttributeValueToInject(request); + for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) { + Map messageAttributes = entry.getMessageAttributes(); + if (messageAttributes.size() < 10) { + messageAttributes.put( + "_datadog", + new MessageAttributeValue().withDataType("Binary").withBinaryValue(bytebuffer)); + } + } + } + return request; + } + + private AgentSpan newSpan(AmazonWebServiceRequest request) { + final AgentSpan span = AgentTracer.startSpan("aws.sns.send"); + // pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched & + // activated + contextStore.put(request, span); + return span; + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/TextMapInjectAdapter.java new file mode 100644 index 00000000000..6f2bb8f888b --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/TextMapInjectAdapter.java @@ -0,0 +1,13 @@ +package datadog.trace.instrumentation.aws.v1.sns; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; + +public class TextMapInjectAdapter implements AgentPropagation.Setter { + + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(final StringBuilder builder, final String key, final String value) { + builder.append("\"").append(key).append("\":\"").append(value).append("\","); + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy new file mode 100644 index 00000000000..20c1fa2d171 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy @@ -0,0 +1,206 @@ +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.client.builder.AwsClientBuilder +import com.amazonaws.services.sns.AmazonSNSClient +import com.amazonaws.services.sns.AmazonSNSClientBuilder +import datadog.trace.agent.test.naming.VersionedNamingTestBase +import datadog.trace.agent.test.utils.TraceUtils +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.config.GeneralConfig +import datadog.trace.bootstrap.instrumentation.api.Tags +import org.testcontainers.utility.DockerImageName +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.sqs.SqsClient +import software.amazon.awssdk.services.sqs.model.QueueAttributeName +import spock.lang.Shared +import groovy.json.JsonSlurper + +import java.time.Duration +import org.testcontainers.containers.GenericContainer +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan + +abstract class SnsClientTest extends VersionedNamingTestBase { + + static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack")) + .withExposedPorts(4566) // Default LocalStack port + .withEnv("SERVICES", "sns,sqs") // Enable SNS and SQS service + .withReuse(true) + .withStartupTimeout(Duration.ofSeconds(120)) + + @Shared AmazonSNSClient snsClient + @Shared SqsClient sqsClient + + @Shared String testQueueURL + @Shared String testQueueARN + @Shared String testTopicARN + + + def setupSpec() { + LOCALSTACK.start() + def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) + snsClient = AmazonSNSClientBuilder.standard() + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, "us-east-1")) + .withCredentials( new AWSStaticCredentialsProvider(new BasicAWSCredentials("test", "test"))) + .build() + sqsClient = SqsClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + testQueueURL = sqsClient.createQueue { it.queueName("testqueue") }.queueUrl() + testQueueARN = sqsClient.getQueueAttributes {it.queueUrl(testQueueURL).attributeNames(QueueAttributeName.QUEUE_ARN)}.attributes().get(QueueAttributeName.QUEUE_ARN) + testTopicARN = snsClient.createTopic("testtopic").topicArn + snsClient.subscribe(testTopicARN, "sqs", testQueueARN) + } + + def cleanupSpec() { + LOCALSTACK.stop() + } + + @Override + protected void configurePreAgent() { + super.configurePreAgent() + // Set a service name that gets sorted early with SORT_BY_NAMES + injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") + } + + @Override + String operation() { + null + } + + @Override + String service() { + null + } + + abstract String expectedOperation(String awsService, String awsOperation) + abstract String expectedService(String awsService, String awsOperation) + + def "trace details propagated via SNS system message attributes"() { + when: + TEST_WRITER.clear() + TraceUtils.runUnderTrace('parent', { + snsClient.publish(testTopicARN, 'sometext') + }) + + def message = sqsClient.receiveMessage {it.queueUrl(testQueueURL).waitTimeSeconds(3)}.messages().get(0) + def jsonSlurper = new JsonSlurper() + def messageBody = jsonSlurper.parseText(message.body()) + def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) + + then: + def sendSpan + assertTraces(2) { + trace(2) { + basicSpan(it, "parent") + span { + serviceName expectedService("SNS", "Publish") + operationName expectedOperation("SNS", "Publish") + resourceName "SNS.Publish" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" endPoint+'/' + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566) + "$Tags.PEER_HOSTNAME" LOCALSTACK.getHost() + "aws.service" "AmazonSNS" + "aws_service" "sns" + "aws.endpoint" endPoint + "aws.operation" "PublishRequest" + "aws.agent" "java-aws-sdk" + "aws.topic.name" "testtopic" + "topicname" "testtopic" + defaultTags() + } + } + sendSpan = span(1) + } + trace(1) { + span { + serviceName expectedService("None", "http.post") + operationName expectedOperation("None", "http.post") + resourceName "POST /" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + tags { + "$Tags.COMPONENT" "apache-httpclient" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" endPoint+'/' + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566) + "$Tags.PEER_HOSTNAME" LOCALSTACK.getHost() + defaultTags(true) + } + } + } + } + + and: + messageBody["Message"] == "sometext" + String base64EncodedString = messageBody["MessageAttributes"]["_datadog"]["Value"] + byte[] decodedBytes = base64EncodedString.decodeBase64() + String decodedString = new String(decodedBytes, "UTF-8") + JsonSlurper slurper = new JsonSlurper() + Map traceContextInJson = slurper.parseText(decodedString) + traceContextInJson['x-datadog-trace-id'] == sendSpan.traceId.toString() + traceContextInJson['x-datadog-parent-id'] == sendSpan.spanId.toString() + traceContextInJson['x-datadog-sampling-priority'] == "1" + } +} + +class SnsClientV0Test extends SnsClientTest { + + @Override + String expectedOperation(String awsService, String awsOperation) { + if ("SNS" == awsService) { + return "aws.http" + } + return "http.request" + } + + @Override + String expectedService(String awsService, String awsOperation) { + if ("SNS" == awsService) { + return "sns" + } + return "A-service" + } + + @Override + int version() { + 0 + } +} + +class SnsClientV1ForkedTest extends SnsClientTest { + + @Override + String expectedOperation(String awsService, String awsOperation) { + if (awsService == "SNS" && awsOperation == "Publish") { + return "aws.sns.send" + } + return "http.client.request" + } + + @Override + String expectedService(String awsService, String awsOperation) { + "A-service" + } + + @Override + int version() { + 1 + } +} + diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/build.gradle b/dd-java-agent/instrumentation/aws-java-sns-2.0/build.gradle new file mode 100644 index 00000000000..41099604996 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/build.gradle @@ -0,0 +1,31 @@ +muzzle { + pass { + group = "software.amazon.awssdk" + module = "sns" + versions = "[2.17.84,3)" + assertInverse = true + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') +addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test') + +dependencies { + compileOnly group: 'software.amazon.awssdk', name: 'sns', version: '2.25.40' + + // Include httpclient instrumentation for testing because it is a dependency for aws-sdk. + testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4') + testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-2.2') + testImplementation 'software.amazon.awssdk:sns:2.25.40' + // SQS is used to act as the "Subscriber" of the SNS topic. + testImplementation 'software.amazon.awssdk:sqs:2.25.40' + testImplementation 'org.testcontainers:localstack:1.19.7' + + latestDepTestImplementation group: 'software.amazon.awssdk', name: 'sns', version: '+' +} + +tasks.withType(Test).configureEach { + usesService(testcontainersLimit) +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsClientInstrumentation.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsClientInstrumentation.java new file mode 100644 index 00000000000..0e24d267baf --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsClientInstrumentation.java @@ -0,0 +1,58 @@ +package datadog.trace.instrumentation.aws.v2.sns; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; + +import com.google.auto.service.AutoService; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; + +/** AWS SDK v2 SNS instrumentation */ +@AutoService(InstrumenterModule.class) +public final class SnsClientInstrumentation extends InstrumenterModule.Tracing + implements Instrumenter.ForSingleType { + public SnsClientInstrumentation() { + super("sns", "aws-sdk"); + } + + @Override + public String instrumentedType() { + return "software.amazon.awssdk.core.client.builder.SdkDefaultClientBuilder"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice( + isMethod().and(named("resolveExecutionInterceptors")), + SnsClientInstrumentation.class.getName() + "$AwsSnsBuilderAdvice"); + } + + @Override + public String[] helperClassNames() { + return new String[] {packageName + ".SnsInterceptor", packageName + ".TextMapInjectAdapter"}; + } + + @Override + public Map contextStore() { + return Collections.singletonMap( + "com.amazonaws.AmazonWebServiceRequest", + "datadog.trace.bootstrap.instrumentation.api.AgentSpan"); + } + + public static class AwsSnsBuilderAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void addHandler(@Advice.Return final List interceptors) { + for (ExecutionInterceptor interceptor : interceptors) { + if (interceptor instanceof SnsInterceptor) { + return; // list already has our interceptor, return to builder + } + } + interceptors.add(new SnsInterceptor()); + } + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java new file mode 100644 index 00000000000..8bba0b41e48 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java @@ -0,0 +1,78 @@ +package datadog.trace.instrumentation.aws.v2.sns; + +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate; +import static datadog.trace.instrumentation.aws.v2.sns.TextMapInjectAdapter.SETTER; + +import datadog.trace.api.TracePropagationStyle; +import datadog.trace.bootstrap.InstanceStore; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import software.amazon.awssdk.core.SdkBytes; +import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.interceptor.Context; +import software.amazon.awssdk.core.interceptor.ExecutionAttribute; +import software.amazon.awssdk.core.interceptor.ExecutionAttributes; +import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; +import software.amazon.awssdk.services.sns.model.MessageAttributeValue; +import software.amazon.awssdk.services.sns.model.PublishBatchRequest; +import software.amazon.awssdk.services.sns.model.PublishBatchRequestEntry; +import software.amazon.awssdk.services.sns.model.PublishRequest; + +public class SnsInterceptor implements ExecutionInterceptor { + + public static final ExecutionAttribute SPAN_ATTRIBUTE = + InstanceStore.of(ExecutionAttribute.class) + .putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan")); + + private SdkBytes getMessageAttributeValueToInject(ExecutionAttributes executionAttributes) { + final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE); + StringBuilder jsonBuilder = new StringBuilder(); + jsonBuilder.append("{"); + propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG); + jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma + jsonBuilder.append("}"); + return SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8); + } + + public SnsInterceptor() {} + + @Override + public SdkRequest modifyRequest( + Context.ModifyRequest context, ExecutionAttributes executionAttributes) { + // Injecting the trace context into SNS messageAttributes. + if (context.request() instanceof PublishRequest) { + PublishRequest request = (PublishRequest) context.request(); + Map messageAttributes = + new HashMap<>(request.messageAttributes()); + // 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore + // the limit still applies here + if (messageAttributes.size() < 10) { + messageAttributes.put( + "_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON + // strings https://github.com/DataDog/datadog-lambda-js/pull/269 + MessageAttributeValue.builder() + .dataType("Binary") + .binaryValue(this.getMessageAttributeValueToInject(executionAttributes)) + .build()); + } + return request.toBuilder().messageAttributes(messageAttributes).build(); + } else if (context.request() instanceof PublishBatchRequest) { + PublishBatchRequest request = (PublishBatchRequest) context.request(); + ArrayList entries = new ArrayList<>(); + final SdkBytes sdkBytes = this.getMessageAttributeValueToInject(executionAttributes); + for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) { + Map messageAttributes = + new HashMap<>(entry.messageAttributes()); + messageAttributes.put( + "_datadog", + MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build()); + entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); + } + return request.toBuilder().publishBatchRequestEntries(entries).build(); + } + return context.request(); + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/TextMapInjectAdapter.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/TextMapInjectAdapter.java new file mode 100644 index 00000000000..cfe0368e298 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/TextMapInjectAdapter.java @@ -0,0 +1,13 @@ +package datadog.trace.instrumentation.aws.v2.sns; + +import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; + +public class TextMapInjectAdapter implements AgentPropagation.Setter { + + public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); + + @Override + public void set(final StringBuilder builder, final String key, final String value) { + builder.append("\"").append(key).append("\":\"").append(value).append("\","); + } +} diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy new file mode 100644 index 00000000000..5db1c312636 --- /dev/null +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy @@ -0,0 +1,185 @@ + +import datadog.trace.agent.test.naming.VersionedNamingTestBase +import datadog.trace.agent.test.utils.TraceUtils +import datadog.trace.api.DDSpanTypes +import datadog.trace.api.config.GeneralConfig +import datadog.trace.bootstrap.instrumentation.api.Tags +import org.testcontainers.containers.GenericContainer +import org.testcontainers.utility.DockerImageName +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider +import software.amazon.awssdk.regions.Region +import software.amazon.awssdk.services.sns.SnsClient +import software.amazon.awssdk.services.sns.model.PublishResponse +import software.amazon.awssdk.services.sqs.SqsClient +import software.amazon.awssdk.services.sqs.model.QueueAttributeName +import spock.lang.Shared +import groovy.json.JsonSlurper + +import java.time.Duration + +import static datadog.trace.agent.test.utils.TraceUtils.basicSpan + +abstract class SnsClientTest extends VersionedNamingTestBase { + static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack")) + .withExposedPorts(4566) // Default LocalStack port + .withEnv("SERVICES", "sns,sqs") // Enable SNS and SQS service + .withReuse(true) + .withStartupTimeout(Duration.ofSeconds(120)) + + @Shared SnsClient snsClient + @Shared SqsClient sqsClient + + @Shared String testQueueURL + @Shared String testQueueARN + @Shared String testTopicARN + + def setupSpec() { + LOCALSTACK.start() + def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) + snsClient = SnsClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + sqsClient = SqsClient.builder() + .endpointOverride(URI.create(endPoint)) + .region(Region.of("us-east-1")) + .credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test"))) + .build() + testQueueURL = sqsClient.createQueue { it.queueName("testqueue") }.queueUrl() + testQueueARN = sqsClient.getQueueAttributes {it.queueUrl(testQueueURL).attributeNames(QueueAttributeName.QUEUE_ARN)}.attributes().get(QueueAttributeName.QUEUE_ARN) + testTopicARN = snsClient.createTopic { it.name("testtopic") }.topicArn() + snsClient.subscribe {it.topicArn(testTopicARN).protocol("sqs").endpoint(testQueueARN)} + } + + def cleanupSpec() { + LOCALSTACK.stop() + } + + @Override + protected void configurePreAgent() { + super.configurePreAgent() + // Set a service name that gets sorted early with SORT_BY_NAMES + injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service") + } + + @Override + String operation() { + null + } + + @Override + String service() { + null + } + + abstract String expectedOperation(String awsService, String awsOperation) + abstract String expectedService(String awsService, String awsOperation) + + def "trace details propagated via SNS system message attributes"() { + when: + TEST_WRITER.clear() + PublishResponse response + TraceUtils.runUnderTrace('parent', { + response = snsClient.publish { it.message("sometext").topicArn(testTopicARN)} + }) + + def message = sqsClient.receiveMessage {it.queueUrl(testQueueURL).waitTimeSeconds(3)}.messages().get(0) + def jsonSlurper = new JsonSlurper() + def messageBody = jsonSlurper.parseText(message.body()) + def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566) + + then: + def sendSpan + assertTraces(1) { + trace(2) { + basicSpan(it, "parent") + span { + serviceName expectedService("SNS", "Publish") + operationName expectedOperation("SNS", "Publish") + resourceName "Sns.Publish" + spanType DDSpanTypes.HTTP_CLIENT + errored false + measured true + childOf(span(0)) + tags { + "$Tags.COMPONENT" "java-aws-sdk" + "$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT + "$Tags.HTTP_URL" endPoint+'/' + "$Tags.HTTP_METHOD" "POST" + "$Tags.HTTP_STATUS" 200 + "$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566) + "$Tags.PEER_HOSTNAME" LOCALSTACK.getHost() + "aws.service" "Sns" + "aws_service" "Sns" + "aws.operation" "Publish" + "aws.agent" "java-aws-sdk" + "aws.topic.name" "testtopic" + "topicname" "testtopic" + "aws.requestId" response.responseMetadata().requestId() + defaultTags() + } + } + sendSpan = span(1) + } + } + + and: + messageBody["Message"] == "sometext" + String base64EncodedString = messageBody["MessageAttributes"]["_datadog"]["Value"] + byte[] decodedBytes = base64EncodedString.decodeBase64() + String decodedString = new String(decodedBytes, "UTF-8") + JsonSlurper slurper = new JsonSlurper() + Map traceContextInJson = slurper.parseText(decodedString) + traceContextInJson['x-datadog-trace-id'] == sendSpan.traceId.toString() + traceContextInJson['x-datadog-parent-id'] == sendSpan.spanId.toString() + traceContextInJson['x-datadog-sampling-priority'] == "1" + } +} + +class SnsClientV0Test extends SnsClientTest { + + @Override + String expectedOperation(String awsService, String awsOperation) { + if ("SNS" == awsService) { + return "aws.http" + } + return "http.request" + } + + @Override + String expectedService(String awsService, String awsOperation) { + if ("SNS" == awsService) { + return "sns" + } + return "A-service" + } + + @Override + int version() { + 0 + } +} + +class SnsClientV1ForkedTest extends SnsClientTest { + + @Override + String expectedOperation(String awsService, String awsOperation) { + if (awsService == "SNS" && awsOperation == "Publish") { + return "aws.sns.send" + } + return "http.client.request" + } + + @Override + String expectedService(String awsService, String awsOperation) { + "A-service" + } + + @Override + int version() { + 1 + } +} + diff --git a/settings.gradle b/settings.gradle index 83f67f043d4..8632adc8d4c 100644 --- a/settings.gradle +++ b/settings.gradle @@ -171,6 +171,8 @@ include ':dd-java-agent:instrumentation:armeria-grpc' include ':dd-java-agent:instrumentation:armeria-jetty' include ':dd-java-agent:instrumentation:aws-java-sdk-1.11.0' include ':dd-java-agent:instrumentation:aws-java-sdk-2.2' +include ':dd-java-agent:instrumentation:aws-java-sns-1.0' +include ':dd-java-agent:instrumentation:aws-java-sns-2.0' include ':dd-java-agent:instrumentation:aws-java-sqs-1.0' include ':dd-java-agent:instrumentation:aws-java-sqs-2.0' include ':dd-java-agent:instrumentation:aws-lambda-handler'