Skip to content

Commit

Permalink
SNS integration: do not modify readonly message attributes map (#7150)
Browse files Browse the repository at this point in the history
* fix bug when SNS message attr are readonly + test

* fix extra bug in v2 on message attr size
  • Loading branch information
vandonr authored and mcculls committed Jun 12, 2024
1 parent 97065ed commit 9756f28
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SnsInterceptor extends RequestHandler2 {
Expand Down Expand Up @@ -46,24 +47,30 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
messageAttributes.put(
HashMap<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(messageAttributes);
modifiedMessageAttributes.put(
"_datadog",
new MessageAttributeValue()
.withDataType(
"Binary") // Use Binary since SNS subscription filter policies fail silently
// with JSON strings
// https://github.com/DataDog/datadog-lambda-js/pull/269
.withBinaryValue(this.getMessageAttributeValueToInject(request)));
pRequest.setMessageAttributes(modifiedMessageAttributes);
}
} else if (request instanceof PublishBatchRequest) {
PublishBatchRequest pmbRequest = (PublishBatchRequest) request;
final ByteBuffer bytebuffer = this.getMessageAttributeValueToInject(request);
for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes = entry.getMessageAttributes();
if (messageAttributes.size() < 10) {
messageAttributes.put(
HashMap<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(messageAttributes);
modifiedMessageAttributes.put(
"_datadog",
new MessageAttributeValue().withDataType("Binary").withBinaryValue(bytebuffer));
entry.setMessageAttributes(modifiedMessageAttributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sns.AmazonSNSClient
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.services.sns.model.MessageAttributeValue
import com.amazonaws.services.sns.model.PublishRequest
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared
import groovy.json.JsonSlurper

import java.time.Duration
import org.testcontainers.containers.GenericContainer

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan

abstract class SnsClientTest extends VersionedNamingTestBase {
Expand Down Expand Up @@ -79,6 +82,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
abstract String expectedOperation(String awsService, String awsOperation)
abstract String expectedService(String awsService, String awsOperation)

def "trace details propagated when message attributes are readonly"() {
when:
TEST_WRITER.clear()

def headers = new HashMap<String, MessageAttributeValue>()
headers.put("mykey", new MessageAttributeValue().withStringValue("myvalue").withDataType("String"))
def readonlyHeaders = Collections.unmodifiableMap(headers)
snsClient.publish(new PublishRequest().withMessage("sometext").withTopicArn(testTopicARN).withMessageAttributes(readonlyHeaders))

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)

def messageBody = new JsonSlurper().parseText(message.body())

then:
// injected value is here
String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"]
injectedValue.length() > 0
// original header value is still present
messageBody["MessageAttributes"]["mykey"] != null
}

def "trace details propagated via SNS system message attributes"() {
when:
TEST_WRITER.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,34 @@ public SdkRequest modifyRequest(
// Injecting the trace context into SNS messageAttributes.
if (context.request() instanceof PublishRequest) {
PublishRequest request = (PublishRequest) context.request();
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
messageAttributes.put(
if (request.messageAttributes().size() < 10) {
Map<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(request.messageAttributes());
modifiedMessageAttributes.put(
"_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON
// strings https://github.com/DataDog/datadog-lambda-js/pull/269
MessageAttributeValue.builder()
.dataType("Binary")
.binaryValue(this.getMessageAttributeValueToInject(executionAttributes))
.build());
return request.toBuilder().messageAttributes(modifiedMessageAttributes).build();
}
return request.toBuilder().messageAttributes(messageAttributes).build();
return request;
} else if (context.request() instanceof PublishBatchRequest) {
PublishBatchRequest request = (PublishBatchRequest) context.request();
ArrayList<PublishBatchRequestEntry> entries = new ArrayList<>();
final SdkBytes sdkBytes = this.getMessageAttributeValueToInject(executionAttributes);
for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
messageAttributes.put(
"_datadog",
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
if (entry.messageAttributes().size() < 10) {
Map<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(entry.messageAttributes());
modifiedMessageAttributes.put(
"_datadog",
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
entries.add(entry.toBuilder().messageAttributes(modifiedMessageAttributes).build());
}
}
return request.toBuilder().publishBatchRequestEntries(entries).build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@

import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.MessageAttributeValue
import software.amazon.awssdk.services.sns.model.PublishResponse
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared
import groovy.json.JsonSlurper

import java.time.Duration

Expand Down Expand Up @@ -77,6 +77,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
abstract String expectedOperation(String awsService, String awsOperation)
abstract String expectedService(String awsService, String awsOperation)

def "trace details propagated when message attributes are readonly"() {
when:
TEST_WRITER.clear()
PublishResponse response
def headers = new HashMap<String, MessageAttributeValue>()
headers.put("mykey", MessageAttributeValue.builder().stringValue("myvalue").dataType("String").build())
def readonlyHeaders = Collections.unmodifiableMap(headers)
snsClient.publish(b -> b.message("sometext").topicArn(testTopicARN).messageAttributes(readonlyHeaders))

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)

def messageBody = new JsonSlurper().parseText(message.body())

then:
// injected value is here
String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"]
injectedValue.length() > 0
// original header value is still present
messageBody["MessageAttributes"]["mykey"] != null
}

def "trace details propagated via SNS system message attributes"() {
when:
TEST_WRITER.clear()
Expand Down

0 comments on commit 9756f28

Please sign in to comment.