diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java index 4912c89f15b..41d2814b6a4 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/main/java/datadog/trace/instrumentation/aws/v1/sns/SnsInterceptor.java @@ -15,6 +15,7 @@ import datadog.trace.bootstrap.instrumentation.api.AgentTracer; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; public class SnsInterceptor extends RequestHandler2 { @@ -46,7 +47,9 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request // 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore // the limit still applies here if (messageAttributes.size() < 10) { - messageAttributes.put( + HashMap modifiedMessageAttributes = + new HashMap<>(messageAttributes); + modifiedMessageAttributes.put( "_datadog", new MessageAttributeValue() .withDataType( @@ -54,6 +57,7 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request // with JSON strings // https://github.com/DataDog/datadog-lambda-js/pull/269 .withBinaryValue(this.getMessageAttributeValueToInject(request))); + pRequest.setMessageAttributes(modifiedMessageAttributes); } } else if (request instanceof PublishBatchRequest) { PublishBatchRequest pmbRequest = (PublishBatchRequest) request; @@ -61,9 +65,12 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) { Map messageAttributes = entry.getMessageAttributes(); if (messageAttributes.size() < 10) { - messageAttributes.put( + HashMap modifiedMessageAttributes = + new HashMap<>(messageAttributes); + modifiedMessageAttributes.put( "_datadog", new MessageAttributeValue().withDataType("Binary").withBinaryValue(bytebuffer)); + entry.setMessageAttributes(modifiedMessageAttributes); } } } diff --git a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy index 20c1fa2d171..7465678e366 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sns-1.0/src/test/groovy/SnsClientTest.groovy @@ -3,11 +3,15 @@ import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.sns.AmazonSNSClient import com.amazonaws.services.sns.AmazonSNSClientBuilder +import com.amazonaws.services.sns.model.MessageAttributeValue +import com.amazonaws.services.sns.model.PublishRequest import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.agent.test.utils.TraceUtils import datadog.trace.api.DDSpanTypes import datadog.trace.api.config.GeneralConfig import datadog.trace.bootstrap.instrumentation.api.Tags +import groovy.json.JsonSlurper +import org.testcontainers.containers.GenericContainer import org.testcontainers.utility.DockerImageName import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider @@ -15,10 +19,9 @@ import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.QueueAttributeName import spock.lang.Shared -import groovy.json.JsonSlurper import java.time.Duration -import org.testcontainers.containers.GenericContainer + import static datadog.trace.agent.test.utils.TraceUtils.basicSpan abstract class SnsClientTest extends VersionedNamingTestBase { @@ -79,6 +82,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase { abstract String expectedOperation(String awsService, String awsOperation) abstract String expectedService(String awsService, String awsOperation) + def "trace details propagated when message attributes are readonly"() { + when: + TEST_WRITER.clear() + + def headers = new HashMap() + headers.put("mykey", new MessageAttributeValue().withStringValue("myvalue").withDataType("String")) + def readonlyHeaders = Collections.unmodifiableMap(headers) + snsClient.publish(new PublishRequest().withMessage("sometext").withTopicArn(testTopicARN).withMessageAttributes(readonlyHeaders)) + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + // injected value is here + String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"] + injectedValue.length() > 0 + // original header value is still present + messageBody["MessageAttributes"]["mykey"] != null + } + def "trace details propagated via SNS system message attributes"() { when: TEST_WRITER.clear() diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java index 8bba0b41e48..24241319c0e 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/main/java/datadog/trace/instrumentation/aws/v2/sns/SnsInterceptor.java @@ -45,31 +45,34 @@ public SdkRequest modifyRequest( // Injecting the trace context into SNS messageAttributes. if (context.request() instanceof PublishRequest) { PublishRequest request = (PublishRequest) context.request(); - Map messageAttributes = - new HashMap<>(request.messageAttributes()); // 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore // the limit still applies here - if (messageAttributes.size() < 10) { - messageAttributes.put( + if (request.messageAttributes().size() < 10) { + Map modifiedMessageAttributes = + new HashMap<>(request.messageAttributes()); + modifiedMessageAttributes.put( "_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON // strings https://github.com/DataDog/datadog-lambda-js/pull/269 MessageAttributeValue.builder() .dataType("Binary") .binaryValue(this.getMessageAttributeValueToInject(executionAttributes)) .build()); + return request.toBuilder().messageAttributes(modifiedMessageAttributes).build(); } - return request.toBuilder().messageAttributes(messageAttributes).build(); + return request; } else if (context.request() instanceof PublishBatchRequest) { PublishBatchRequest request = (PublishBatchRequest) context.request(); ArrayList entries = new ArrayList<>(); final SdkBytes sdkBytes = this.getMessageAttributeValueToInject(executionAttributes); for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) { - Map messageAttributes = - new HashMap<>(entry.messageAttributes()); - messageAttributes.put( - "_datadog", - MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build()); - entries.add(entry.toBuilder().messageAttributes(messageAttributes).build()); + if (entry.messageAttributes().size() < 10) { + Map modifiedMessageAttributes = + new HashMap<>(entry.messageAttributes()); + modifiedMessageAttributes.put( + "_datadog", + MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build()); + entries.add(entry.toBuilder().messageAttributes(modifiedMessageAttributes).build()); + } } return request.toBuilder().publishBatchRequestEntries(entries).build(); } diff --git a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy index 5db1c312636..3cfb2914dcf 100644 --- a/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy +++ b/dd-java-agent/instrumentation/aws-java-sns-2.0/src/test/groovy/SnsClientTest.groovy @@ -1,20 +1,20 @@ - import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.agent.test.utils.TraceUtils import datadog.trace.api.DDSpanTypes import datadog.trace.api.config.GeneralConfig import datadog.trace.bootstrap.instrumentation.api.Tags +import groovy.json.JsonSlurper import org.testcontainers.containers.GenericContainer import org.testcontainers.utility.DockerImageName import software.amazon.awssdk.auth.credentials.AwsBasicCredentials import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider import software.amazon.awssdk.regions.Region import software.amazon.awssdk.services.sns.SnsClient +import software.amazon.awssdk.services.sns.model.MessageAttributeValue import software.amazon.awssdk.services.sns.model.PublishResponse import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.QueueAttributeName import spock.lang.Shared -import groovy.json.JsonSlurper import java.time.Duration @@ -77,6 +77,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase { abstract String expectedOperation(String awsService, String awsOperation) abstract String expectedService(String awsService, String awsOperation) + def "trace details propagated when message attributes are readonly"() { + when: + TEST_WRITER.clear() + PublishResponse response + def headers = new HashMap() + headers.put("mykey", MessageAttributeValue.builder().stringValue("myvalue").dataType("String").build()) + def readonlyHeaders = Collections.unmodifiableMap(headers) + snsClient.publish(b -> b.message("sometext").topicArn(testTopicARN).messageAttributes(readonlyHeaders)) + + def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0) + + def messageBody = new JsonSlurper().parseText(message.body()) + + then: + // injected value is here + String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"] + injectedValue.length() > 0 + // original header value is still present + messageBody["MessageAttributes"]["mykey"] != null + } + def "trace details propagated via SNS system message attributes"() { when: TEST_WRITER.clear()