Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add aws sns instrumentation for AWS lambda #6908

Merged
merged 20 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions dd-java-agent/instrumentation/aws-java-sns-1.0/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
muzzle {
pass {
group = "com.amazonaws"
module = "aws-java-sdk-sns"
versions = "[1.12.113,2)"
assertInverse = true
}
}

apply from: "$rootDir/gradle/java.gradle"

addTestSuiteForDir('latestDepTest', 'test')
addTestSuiteExtendingForDir('latestDepForkedTest', 'latestDepTest', 'test')

dependencies {
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '1.12.710'

// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testImplementation project(':dd-java-agent:instrumentation:apache-httpclient-4')
testImplementation project(':dd-java-agent:instrumentation:aws-java-sdk-1.11.0')
testImplementation group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '1.12.710'
// SQS is used to act as the "Subscriber" of the SNS topic.
// There's a problem with sqs sdk v1 with localstack+testcontainers testing. so use sdk v2 for sqs
testImplementation 'software.amazon.awssdk:sqs:2.25.40'
testImplementation 'org.testcontainers:localstack:1.19.7'

latestDepTestImplementation group: 'com.amazonaws', name: 'aws-java-sdk-sns', version: '+'
joeyzhao2018 marked this conversation as resolved.
Show resolved Hide resolved
}

tasks.withType(Test).configureEach {
usesService(testcontainersLimit)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package datadog.trace.instrumentation.aws.v1.sns;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.amazonaws.handlers.RequestHandler2;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.agent.tooling.InstrumenterModule;
import datadog.trace.bootstrap.InstrumentationContext;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;

/** AWS SDK v1 SNS instrumentation */
@AutoService(InstrumenterModule.class)
public final class SnsClientInstrumentation extends InstrumenterModule.Tracing
implements Instrumenter.ForSingleType {

public SnsClientInstrumentation() {
super("sns", "aws-sdk");
}

@Override
public String instrumentedType() {
return "com.amazonaws.handlers.HandlerChainFactory";
}

@Override
public void methodAdvice(MethodTransformer transformer) {
transformer.applyAdvice(
isMethod().and(named("newRequestHandler2Chain")),
SnsClientInstrumentation.class.getName() + "$HandlerChainAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {packageName + ".SnsInterceptor", packageName + ".TextMapInjectAdapter"};
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
"com.amazonaws.AmazonWebServiceRequest",
"datadog.trace.bootstrap.instrumentation.api.AgentSpan");
}

public static class HandlerChainAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<RequestHandler2> handlers) {
for (RequestHandler2 interceptor : handlers) {
if (interceptor instanceof SnsInterceptor) {
return; // list already has our interceptor, return to builder
}
}
handlers.add(
new SnsInterceptor(
InstrumentationContext.get(
"com.amazonaws.AmazonWebServiceRequest",
"datadog.trace.bootstrap.instrumentation.api.AgentSpan")));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package datadog.trace.instrumentation.aws.v1.sns;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.instrumentation.aws.v1.sns.TextMapInjectAdapter.SETTER;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sns.model.MessageAttributeValue;
import com.amazonaws.services.sns.model.PublishBatchRequest;
import com.amazonaws.services.sns.model.PublishBatchRequestEntry;
import com.amazonaws.services.sns.model.PublishRequest;
import datadog.trace.api.TracePropagationStyle;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class SnsInterceptor extends RequestHandler2 {

private final ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore;

public SnsInterceptor(ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore) {
this.contextStore = contextStore;
}

private ByteBuffer getMessageAttributeValueToInject(AmazonWebServiceRequest request) {
final AgentSpan span = newSpan(request);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
return ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8));
}

@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
// Injecting the trace context into SNS messageAttributes.
if (request instanceof PublishRequest) {
PublishRequest pRequest = (PublishRequest) request;
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
Map<String, MessageAttributeValue> messageAttributes = pRequest.getMessageAttributes();
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
messageAttributes.put(
"_datadog",
new MessageAttributeValue()
.withDataType(
"Binary") // Use Binary since SNS subscription filter policies fail silently
// with JSON strings
// https://github.com/DataDog/datadog-lambda-js/pull/269
.withBinaryValue(this.getMessageAttributeValueToInject(request)));
}
} else if (request instanceof PublishBatchRequest) {
PublishBatchRequest pmbRequest = (PublishBatchRequest) request;
final ByteBuffer bytebuffer = this.getMessageAttributeValueToInject(request);
for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes = entry.getMessageAttributes();
if (messageAttributes.size() < 10) {
messageAttributes.put(
"_datadog",
new MessageAttributeValue().withDataType("Binary").withBinaryValue(bytebuffer));
}
}
}
return request;
}

private AgentSpan newSpan(AmazonWebServiceRequest request) {
final AgentSpan span = AgentTracer.startSpan("aws.sns.send");
joeyzhao2018 marked this conversation as resolved.
Show resolved Hide resolved
// pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched &
// activated
contextStore.put(request, span);
return span;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package datadog.trace.instrumentation.aws.v1.sns;

import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;

public class TextMapInjectAdapter implements AgentPropagation.Setter<StringBuilder> {

public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();

@Override
public void set(final StringBuilder builder, final String key, final String value) {
builder.append("\"").append(key).append("\":\"").append(value).append("\",");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sns.AmazonSNSClient
import com.amazonaws.services.sns.AmazonSNSClientBuilder
import datadog.trace.agent.test.naming.VersionedNamingTestBase
import datadog.trace.agent.test.utils.TraceUtils
import datadog.trace.api.DDSpanTypes
import datadog.trace.api.config.GeneralConfig
import datadog.trace.bootstrap.instrumentation.api.Tags
import org.testcontainers.utility.DockerImageName
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.QueueAttributeName
import spock.lang.Shared
import groovy.json.JsonSlurper

import java.time.Duration
import org.testcontainers.containers.GenericContainer
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan

abstract class SnsClientTest extends VersionedNamingTestBase {

static final LOCALSTACK = new GenericContainer(DockerImageName.parse("localstack/localstack"))
.withExposedPorts(4566) // Default LocalStack port
.withEnv("SERVICES", "sns,sqs") // Enable SNS and SQS service
.withReuse(true)
.withStartupTimeout(Duration.ofSeconds(120))

@Shared AmazonSNSClient snsClient
@Shared SqsClient sqsClient

@Shared String testQueueURL
@Shared String testQueueARN
@Shared String testTopicARN


def setupSpec() {
LOCALSTACK.start()
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566)
snsClient = AmazonSNSClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endPoint, "us-east-1"))
.withCredentials( new AWSStaticCredentialsProvider(new BasicAWSCredentials("test", "test")))
.build()
sqsClient = SqsClient.builder()
.endpointOverride(URI.create(endPoint))
.region(Region.of("us-east-1"))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials.create("test", "test")))
.build()
testQueueURL = sqsClient.createQueue { it.queueName("testqueue") }.queueUrl()
testQueueARN = sqsClient.getQueueAttributes {it.queueUrl(testQueueURL).attributeNames(QueueAttributeName.QUEUE_ARN)}.attributes().get(QueueAttributeName.QUEUE_ARN)
testTopicARN = snsClient.createTopic("testtopic").topicArn
snsClient.subscribe(testTopicARN, "sqs", testQueueARN)
}

def cleanupSpec() {
LOCALSTACK.stop()
}

@Override
protected void configurePreAgent() {
super.configurePreAgent()
// Set a service name that gets sorted early with SORT_BY_NAMES
injectSysConfig(GeneralConfig.SERVICE_NAME, "A-service")
}

@Override
String operation() {
null
}

@Override
String service() {
null
}

abstract String expectedOperation(String awsService, String awsOperation)
abstract String expectedService(String awsService, String awsOperation)

def "trace details propagated via SNS system message attributes"() {
when:
TEST_WRITER.clear()
TraceUtils.runUnderTrace('parent', {
snsClient.publish(testTopicARN, 'sometext')
})

def message = sqsClient.receiveMessage {it.queueUrl(testQueueURL).waitTimeSeconds(3)}.messages().get(0)
def jsonSlurper = new JsonSlurper()
def messageBody = jsonSlurper.parseText(message.body())
def endPoint = "http://" + LOCALSTACK.getHost() + ":" + LOCALSTACK.getMappedPort(4566)

then:
def sendSpan
assertTraces(2) {
trace(2) {
basicSpan(it, "parent")
span {
serviceName expectedService("SNS", "Publish")
operationName expectedOperation("SNS", "Publish")
resourceName "SNS.Publish"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
childOf(span(0))
tags {
"$Tags.COMPONENT" "java-aws-sdk"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" endPoint+'/'
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566)
"$Tags.PEER_HOSTNAME" LOCALSTACK.getHost()
"aws.service" "AmazonSNS"
"aws_service" "sns"
"aws.endpoint" endPoint
"aws.operation" "PublishRequest"
"aws.agent" "java-aws-sdk"
"aws.topic.name" "testtopic"
"topicname" "testtopic"
defaultTags()
}
}
sendSpan = span(1)
}
trace(1) {
span {
serviceName expectedService("None", "http.post")
operationName expectedOperation("None", "http.post")
resourceName "POST /"
spanType DDSpanTypes.HTTP_CLIENT
errored false
measured true
tags {
"$Tags.COMPONENT" "apache-httpclient"
"$Tags.SPAN_KIND" Tags.SPAN_KIND_CLIENT
"$Tags.HTTP_URL" endPoint+'/'
"$Tags.HTTP_METHOD" "POST"
"$Tags.HTTP_STATUS" 200
"$Tags.PEER_PORT" LOCALSTACK.getMappedPort(4566)
"$Tags.PEER_HOSTNAME" LOCALSTACK.getHost()
defaultTags(true)
}
}
}
}

and:
messageBody["Message"] == "sometext"
String base64EncodedString = messageBody["MessageAttributes"]["_datadog"]["Value"]
byte[] decodedBytes = base64EncodedString.decodeBase64()
String decodedString = new String(decodedBytes, "UTF-8")
JsonSlurper slurper = new JsonSlurper()
Map traceContextInJson = slurper.parseText(decodedString)
traceContextInJson['x-datadog-trace-id'] == sendSpan.traceId.toString()
traceContextInJson['x-datadog-parent-id'] == sendSpan.spanId.toString()
traceContextInJson['x-datadog-sampling-priority'] == "1"
}
}

class SnsClientV0Test extends SnsClientTest {

@Override
String expectedOperation(String awsService, String awsOperation) {
if ("SNS" == awsService) {
return "aws.http"
}
return "http.request"
}

@Override
String expectedService(String awsService, String awsOperation) {
if ("SNS" == awsService) {
return "sns"
}
return "A-service"
}

@Override
int version() {
0
}
}

class SnsClientV1ForkedTest extends SnsClientTest {

@Override
String expectedOperation(String awsService, String awsOperation) {
if (awsService == "SNS" && awsOperation == "Publish") {
return "aws.sns.send"
}
return "http.client.request"
}

@Override
String expectedService(String awsService, String awsOperation) {
"A-service"
}

@Override
int version() {
1
}
}

Loading
Loading