Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DSM support to SQS v1 #6259

Merged
merged 16 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package datadog.trace.instrumentation.aws.v0;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import datadog.trace.api.Functions;
import datadog.trace.api.cache.DDCache;
Expand All @@ -25,17 +26,24 @@ public class AwsNameCache {
AMAZON_PATTERN.matcher(String.valueOf(serviceName)).replaceAll("").trim()));

public static CharSequence spanName(final Request<?> awsRequest) {
return getFromCache(getQualifiedName(awsRequest).toString(), awsRequest.getServiceName());
}

public static CharSequence spanName(final AmazonWebServiceRequest request, String serviceName) {
return getFromCache(
CLASS_NAME_CACHE.getQualifiedName(request.getClass(), serviceName).toString(), serviceName);
}

private static CharSequence getFromCache(String cacheKey, String serviceName) {
return CACHE.computeIfAbsent(
getQualifiedName(awsRequest).toString(),
cacheKey,
key ->
UTF8BytesString.create(
SpanNaming.instance()
.namingSchema()
.cloud()
.operationForRequest(
"aws",
AMAZON_PATTERN.matcher(awsRequest.getServiceName()).replaceAll("").trim(),
key)));
"aws", AMAZON_PATTERN.matcher(serviceName).replaceAll("").trim(), key)));
}

public static CharSequence getQualifiedName(final Request<?> awsRequest) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package datadog.trace.instrumentation.aws.v0;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.RequestHandler2;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
Expand Down Expand Up @@ -42,8 +43,10 @@ public String[] helperClassNames() {

@Override
public Map<String, String> contextStore() {
return singletonMap(
"com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String");
Map<String, String> map = new java.util.HashMap<>();
map.put("com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String");
map.put(AmazonWebServiceRequest.class.getName(), AgentSpan.class.getName());
vandonr marked this conversation as resolved.
Show resolved Hide resolved
return map;
}

@Override
Expand All @@ -64,7 +67,8 @@ public static void addHandler(@Advice.Return final List<RequestHandler2> handler
handlers.add(
new TracingRequestHandler(
InstrumentationContext.get(
"com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String")));
"com.amazonaws.services.sqs.model.ReceiveMessageResult", "java.lang.String"),
InstrumentationContext.get(AmazonWebServiceRequest.class, AgentSpan.class)));
vandonr marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,25 +38,32 @@ public class TracingRequestHandler extends RequestHandler2 {
private static final Logger log = LoggerFactory.getLogger(TracingRequestHandler.class);

private final ContextStore<Object, String> responseQueueStore;
private final ContextStore<AmazonWebServiceRequest, AgentSpan> requestSpanStore;

public TracingRequestHandler(ContextStore<Object, String> responseQueueStore) {
public TracingRequestHandler(
ContextStore<Object, String> responseQueueStore,
ContextStore<AmazonWebServiceRequest, AgentSpan> requestSpanStore) {
this.responseQueueStore = responseQueueStore;
}

@Override
public AmazonWebServiceRequest beforeMarshalling(final AmazonWebServiceRequest request) {
return request;
this.requestSpanStore = requestSpanStore;
}

@Override
public void beforeRequest(final Request<?> request) {
final AgentSpan span;
AgentSpan span;
if (!AWS_LEGACY_TRACING && isPollingRequest(request.getOriginalRequest())) {
// SQS messages spans are created by aws-java-sqs-1.0 - replace client scope with no-op,
// so we can tell when receive call is complete without affecting the rest of the trace
span = noopSpan();
} else {
span = startSpan(AwsNameCache.spanName(request));
span = requestSpanStore.remove(request.getOriginalRequest());
if (span != null) {
// we'll land here for SQS send requests, where we create the span in SqsInterceptor to
// inject DSM tags.
span.setOperationName(AwsNameCache.spanName(request));
} else {
// this is the most common code path
span = startSpan(AwsNameCache.spanName(request));
}
DECORATE.afterStart(span);
DECORATE.onRequest(span, request);
request.addHandlerContext(SPAN_CONTEXT_KEY, span);
Expand Down Expand Up @@ -124,7 +131,12 @@ public void afterResponse(final Request<?> request, final Response<?> response)

@Override
public void afterError(final Request<?> request, final Response<?> response, final Exception e) {
final AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY);
AgentSpan span = request.getHandlerContext(SPAN_CONTEXT_KEY);
if (span == null) {
// also try getting the span from the context store, if the error happened early
span = requestSpanStore.remove(request.getOriginalRequest());
}
Comment on lines +135 to +138
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this after a discussion with java folks to try to avoid leaking the spans that are created in beforeMarshalling


if (span != null) {
request.addHandlerContext(SPAN_CONTEXT_KEY, null);
if (response != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.bootstrap.instrumentation.api.PathwayContext.DATADOG_KEY;

import com.amazonaws.services.sqs.model.MessageAttributeValue;
import datadog.trace.bootstrap.instrumentation.api.AgentPropagation;
import java.util.Map;

public class MessageAttributeInjector
implements AgentPropagation.Setter<Map<String, MessageAttributeValue>> {

public static final MessageAttributeInjector SETTER = new MessageAttributeInjector();

@Override
public void set(
final Map<String, MessageAttributeValue> carrier, final String key, final String value) {
if (carrier.size() < 10 && !carrier.containsKey(DATADOG_KEY)) {
String jsonPathway = String.format("{\"%s\": \"%s\"}", key, value);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don’t you fear some key or value value might break the JSON? Like some"thing?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's a good point, but also this line of code was taken from


so... idk, maybe I can fix both at the same time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at the dotnet implementation, it's implemented that way as well.
https://github.com/DataDog/dd-trace-dotnet/blob/e8735bc26f63534356e7f40b650c5e91a43b111a/tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/AWS/SQS/ContextPropagation.cs#L84
I'd say we keep it like this, and it may be worth it to fix it if we get an escalation for it.
Also, AWS is usually pretty restrictive on what it accepts for names, so we have that on our side.

carrier.put(
DATADOG_KEY,
new MessageAttributeValue().withDataType("String").withStringValue(jsonPathway));
}
}
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wrote this by mixing the V2 instrumentation and what was done in the aws sdk v1 instrumentation

Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.RequestHandler2;
import com.google.auto.service.AutoService;
import datadog.trace.agent.tooling.Instrumenter;
import datadog.trace.bootstrap.InstrumentationContext;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;

/** AWS SDK v1 instrumentation */
@AutoService(Instrumenter.class)
public final class SqsClientInstrumentation extends Instrumenter.Tracing
implements Instrumenter.ForSingleType {
private static final String INSTRUMENTATION_NAME = "aws-sdk";

public SqsClientInstrumentation() {
super(INSTRUMENTATION_NAME);
vandonr marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public String instrumentedType() {
return "com.amazonaws.handlers.HandlerChainFactory";
}

@Override
public void adviceTransformations(AdviceTransformation transformation) {
transformation.applyAdvice(
isMethod().and(named("newRequestHandler2Chain")),
SqsClientInstrumentation.class.getName() + "$HandlerChainAdvice");
}

@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".SqsInterceptor", packageName + ".MessageAttributeInjector"
};
}

@Override
public Map<String, String> contextStore() {
return Collections.singletonMap(
AmazonWebServiceRequest.class.getName(), AgentSpan.class.getName());
vandonr marked this conversation as resolved.
Show resolved Hide resolved
}

public static class HandlerChainAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void addHandler(@Advice.Return final List<RequestHandler2> handlers) {
for (RequestHandler2 interceptor : handlers) {
if (interceptor instanceof SqsInterceptor) {
return; // list already has our interceptor, return to builder
}
}
handlers.add(
vandonr marked this conversation as resolved.
Show resolved Hide resolved
new SqsInterceptor(
InstrumentationContext.get(AmazonWebServiceRequest.class, AgentSpan.class)));
}
}
}
vandonr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package datadog.trace.instrumentation.aws.v1.sqs;

import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.PathwayContext.DATADOG_KEY;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_OUT;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v1.sqs.MessageAttributeInjector.SETTER;

import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import datadog.trace.bootstrap.ContextStore;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.util.LinkedHashMap;

public class SqsInterceptor extends RequestHandler2 {

private final ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore;

public SqsInterceptor(ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore) {
this.contextStore = contextStore;
}

@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
vandonr marked this conversation as resolved.
Show resolved Hide resolved
if (request instanceof SendMessageRequest) {
SendMessageRequest smRequest = (SendMessageRequest) request;

String queueUrl = smRequest.getQueueUrl();
if (queueUrl == null) return request;

LinkedHashMap<String, String> sortedTags = getTags(queueUrl);

final AgentSpan span = newSpan(request);
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
propagate().injectPathwayContext(span, smRequest.getMessageAttributes(), SETTER, sortedTags);
} else if (request instanceof SendMessageBatchRequest) {
SendMessageBatchRequest smbRequest = (SendMessageBatchRequest) request;

String queueUrl = smbRequest.getQueueUrl();
if (queueUrl == null) return request;

LinkedHashMap<String, String> sortedTags = getTags(queueUrl);

final AgentSpan span = newSpan(request);
if (span.traceConfig()
.isDataStreamsEnabled()) { // avoid iterating on all entries if it's to noop on each
for (SendMessageBatchRequestEntry entry : smbRequest.getEntries()) {
propagate().injectPathwayContext(span, entry.getMessageAttributes(), SETTER, sortedTags);
}
}
} else if (request instanceof ReceiveMessageRequest) {
ReceiveMessageRequest rmRequest = (ReceiveMessageRequest) request;
if (rmRequest.getMessageAttributeNames().size() < 10
&& !rmRequest.getMessageAttributeNames().contains(DATADOG_KEY)) {
rmRequest.getMessageAttributeNames().add(DATADOG_KEY);
}
}
return request;
}

private AgentSpan newSpan(AmazonWebServiceRequest request) {
final AgentSpan span = startSpan("aws.sqs.send");
// pass the span to TracingRequestHandler in the sdk instrumentation where it'll be enriched &
// activated
contextStore.put(request, span);
return span;
}

private static LinkedHashMap<String, String> getTags(String queueUrl) {
LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_OUT);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
return sortedTags;
}
}
vandonr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.closePrevious;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.propagate;
import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan;
import static datadog.trace.bootstrap.instrumentation.api.URIUtils.urlFileName;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_IN;
import static datadog.trace.core.datastreams.TagsProcessor.DIRECTION_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TOPIC_TAG;
import static datadog.trace.core.datastreams.TagsProcessor.TYPE_TAG;
import static datadog.trace.instrumentation.aws.v1.sqs.MessageExtractAdapter.GETTER;
import static datadog.trace.instrumentation.aws.v1.sqs.SqsDecorator.BROKER_DECORATE;
import static datadog.trace.instrumentation.aws.v1.sqs.SqsDecorator.CONSUMER_DECORATE;
Expand All @@ -15,7 +20,9 @@
import com.amazonaws.services.sqs.model.Message;
import datadog.trace.api.Config;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import java.util.Iterator;
import java.util.LinkedHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -77,6 +84,13 @@ protected void startNewMessageSpan(Message message) {
batchContext = spanContext;
}
AgentSpan span = startSpan(SQS_INBOUND_OPERATION, batchContext);

LinkedHashMap<String, String> sortedTags = new LinkedHashMap<>();
sortedTags.put(DIRECTION_TAG, DIRECTION_IN);
sortedTags.put(TOPIC_TAG, urlFileName(queueUrl));
sortedTags.put(TYPE_TAG, "sqs");
AgentTracer.get().getDataStreamsMonitoring().setCheckpoint(span, sortedTags, 0, 0);

CONSUMER_DECORATE.afterStart(span);
CONSUMER_DECORATE.onConsume(span, queueUrl);
activateNext(span);
Expand Down
Loading
Loading