Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not modify readonly message attributes map on SNS integration #7150

Merged
merged 3 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class SnsInterceptor extends RequestHandler2 {
Expand Down Expand Up @@ -46,24 +47,30 @@ public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
messageAttributes.put(
HashMap<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(messageAttributes);
modifiedMessageAttributes.put(
"_datadog",
new MessageAttributeValue()
.withDataType(
"Binary") // Use Binary since SNS subscription filter policies fail silently
// with JSON strings
// https://github.com/DataDog/datadog-lambda-js/pull/269
.withBinaryValue(this.getMessageAttributeValueToInject(request)));
pRequest.setMessageAttributes(modifiedMessageAttributes);
}
} else if (request instanceof PublishBatchRequest) {
PublishBatchRequest pmbRequest = (PublishBatchRequest) request;
final ByteBuffer bytebuffer = this.getMessageAttributeValueToInject(request);
for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes = entry.getMessageAttributes();
if (messageAttributes.size() < 10) {
messageAttributes.put(
HashMap<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(messageAttributes);
modifiedMessageAttributes.put(
"_datadog",
new MessageAttributeValue().withDataType("Binary").withBinaryValue(bytebuffer));
entry.setMessageAttributes(modifiedMessageAttributes);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,25 @@ import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sns.AmazonSNSClient
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import com.amazonaws.services.sns.model.MessageAttributeValue
import com.amazonaws.services.sns.model.PublishRequest
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared
import groovy.json.JsonSlurper

import java.time.Duration
import org.testcontainers.containers.GenericContainer

import static datadog.trace.agent.test.utils.TraceUtils.basicSpan

abstract class SnsClientTest extends VersionedNamingTestBase {
Expand Down Expand Up @@ -79,6 +82,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
abstract String expectedOperation(String awsService, String awsOperation)
abstract String expectedService(String awsService, String awsOperation)

def "trace details propagated when message attributes are readonly"() {
when:
TEST_WRITER.clear()

def headers = new HashMap<String, MessageAttributeValue>()
headers.put("mykey", new MessageAttributeValue().withStringValue("myvalue"))
def readonlyHeaders = Collections.unmodifiableMap(headers)
snsClient.publish(new PublishRequest().withMessage("sometext").withTopicArn(testTopicARN).withMessageAttributes(readonlyHeaders))

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)

def messageBody = new JsonSlurper().parseText(message.body())

then:
// injected value is here
String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"]
injectedValue.length() > 0
// original header value is still present
messageBody["MessageAttributes"]["mykey"] != null
}

def "trace details propagated via SNS system message attributes"() {
when:
TEST_WRITER.clear()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,31 +45,34 @@ public SdkRequest modifyRequest(
// Injecting the trace context into SNS messageAttributes.
if (context.request() instanceof PublishRequest) {
PublishRequest request = (PublishRequest) context.request();
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
messageAttributes.put(
if (request.messageAttributes().size() < 10) {
Map<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(request.messageAttributes());
modifiedMessageAttributes.put(
"_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON
// strings https://github.com/DataDog/datadog-lambda-js/pull/269
MessageAttributeValue.builder()
.dataType("Binary")
.binaryValue(this.getMessageAttributeValueToInject(executionAttributes))
.build());
return request.toBuilder().messageAttributes(modifiedMessageAttributes).build();
}
return request.toBuilder().messageAttributes(messageAttributes).build();
return request;
} else if (context.request() instanceof PublishBatchRequest) {
PublishBatchRequest request = (PublishBatchRequest) context.request();
ArrayList<PublishBatchRequestEntry> entries = new ArrayList<>();
final SdkBytes sdkBytes = this.getMessageAttributeValueToInject(executionAttributes);
for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
messageAttributes.put(
"_datadog",
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
if (entry.messageAttributes().size() < 10) {
Map<String, MessageAttributeValue> modifiedMessageAttributes =
new HashMap<>(entry.messageAttributes());
modifiedMessageAttributes.put(
"_datadog",
MessageAttributeValue.builder().dataType("Binary").binaryValue(sdkBytes).build());
entries.add(entry.toBuilder().messageAttributes(modifiedMessageAttributes).build());
}
}
return request.toBuilder().publishBatchRequestEntries(entries).build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@

import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import groovy.json.JsonSlurper
import org.testcontainers.containers.GenericContainer
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sns.SnsClient
import software.amazon.awssdk.services.sns.model.MessageAttributeValue
import software.amazon.awssdk.services.sns.model.PublishResponse
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared
import groovy.json.JsonSlurper

import java.time.Duration

Expand Down Expand Up @@ -77,6 +77,27 @@ abstract class SnsClientTest extends VersionedNamingTestBase {
abstract String expectedOperation(String awsService, String awsOperation)
abstract String expectedService(String awsService, String awsOperation)

def "trace details propagated when message attributes are readonly"() {
when:
TEST_WRITER.clear()
PublishResponse response
def headers = new HashMap<String, MessageAttributeValue>()
headers.put("mykey", MessageAttributeValue.builder().stringValue("myvalue").dataType("String").build())
def readonlyHeaders = Collections.unmodifiableMap(headers)
snsClient.publish(b -> b.message("sometext").topicArn(testTopicARN).messageAttributes(readonlyHeaders))

def message = sqsClient.receiveMessage { it.queueUrl(testQueueURL).waitTimeSeconds(3) }.messages().get(0)

def messageBody = new JsonSlurper().parseText(message.body())

then:
// injected value is here
String injectedValue = messageBody["MessageAttributes"]["_datadog"]["Value"]
injectedValue.length() > 0
// original header value is still present
messageBody["MessageAttributes"]["mykey"] != null
}

def "trace details propagated via SNS system message attributes"() {
when:
TEST_WRITER.clear()
Expand Down
Loading