Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
joeyzhao2018 committed May 31, 2024
1 parent 42a0ee3 commit 345f066
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,55 +20,58 @@
public class SnsInterceptor extends RequestHandler2 {

private final ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore;
private ByteBuffer messageAttributeValueToInject;

public SnsInterceptor(ContextStore<AmazonWebServiceRequest, AgentSpan> contextStore) {
this.contextStore = contextStore;
}

private ByteBuffer getMessageAttributeValueToInject(AmazonWebServiceRequest request) {
if (this.messageAttributeValueToInject == null) {
final AgentSpan span = newSpan(request);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
this.messageAttributeValueToInject =
ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8));
}

return this.messageAttributeValueToInject;
}

@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
// Injecting the trace context into SNS messageAttributes.
if (request instanceof PublishRequest) {
PublishRequest pRequest = (PublishRequest) request;
final AgentSpan span = newSpan(request);
// note: modifying message attributes has to be done before marshalling, otherwise the changes
// are not reflected in the actual request (and the MD5 check on send will fail).
Map<String, MessageAttributeValue> messageAttributes = pRequest.getMessageAttributes();
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
messageAttributes.put(
"_datadog",
new MessageAttributeValue()
.withDataType(
"Binary") // Use Binary since SNS subscription filter policies fail silently
// with JSON strings
// https://github.com/DataDog/datadog-lambda-js/pull/269
.withBinaryValue(
ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8))));
.withBinaryValue(this.getMessageAttributeValueToInject(request)));
}
} else if (request instanceof PublishBatchRequest) {
PublishBatchRequest pmbRequest = (PublishBatchRequest) request;

final AgentSpan span = newSpan(request);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
ByteBuffer binaryValue =
ByteBuffer.wrap(jsonBuilder.toString().getBytes(StandardCharsets.UTF_8));
for (PublishBatchRequestEntry entry : pmbRequest.getPublishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes = entry.getMessageAttributes();
if (messageAttributes.size() < 10) {
messageAttributes.put(
"_datadog",
new MessageAttributeValue().withDataType("Binary").withBinaryValue(binaryValue));
new MessageAttributeValue()
.withDataType("Binary")
.withBinaryValue(this.getMessageAttributeValueToInject(request)));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,21 @@ public class SnsInterceptor implements ExecutionInterceptor {
public static final ExecutionAttribute<AgentSpan> SPAN_ATTRIBUTE =
InstanceStore.of(ExecutionAttribute.class)
.putIfAbsent("DatadogSpan", () -> new ExecutionAttribute<>("DatadogSpan"));
private SdkBytes messageAttributeValueToInject;

private SdkBytes getMessageAttributeValueToInject(ExecutionAttributes executionAttributes) {
if (this.messageAttributeValueToInject == null) {
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
this.messageAttributeValueToInject =
SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8);
}
return this.messageAttributeValueToInject;
}

public SnsInterceptor() {}

Expand All @@ -37,41 +52,30 @@ public SdkRequest modifyRequest(
PublishRequest request = (PublishRequest) context.request();
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
// 10 messageAttributes is a limit from SQS, which is often used as a subscriber, therefore
// the limit still applies here
if (messageAttributes.size() < 10) {
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");

messageAttributes.put(
"_datadog", // Use Binary since SNS subscription filter policies fail silently with JSON
// strings https://github.com/DataDog/datadog-lambda-js/pull/269
MessageAttributeValue.builder()
.dataType("Binary")
.binaryValue(SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8))
.binaryValue(this.getMessageAttributeValueToInject(executionAttributes))
.build());
}
return request.toBuilder().messageAttributes(messageAttributes).build();
} else if (context.request() instanceof PublishBatchRequest) {
PublishBatchRequest request = (PublishBatchRequest) context.request();
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
ArrayList<PublishBatchRequestEntry> entries = new ArrayList<>();
StringBuilder jsonBuilder = new StringBuilder();
jsonBuilder.append("{");
propagate().inject(span, jsonBuilder, SETTER, TracePropagationStyle.DATADOG);
jsonBuilder.setLength(jsonBuilder.length() - 1); // Remove the last comma
jsonBuilder.append("}");
SdkBytes binaryValue = SdkBytes.fromString(jsonBuilder.toString(), StandardCharsets.UTF_8);
for (PublishBatchRequestEntry entry : request.publishBatchRequestEntries()) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
messageAttributes.put(
"_datadog",
MessageAttributeValue.builder().dataType("Binary").binaryValue(binaryValue).build());
MessageAttributeValue.builder()
.dataType("Binary")
.binaryValue(this.getMessageAttributeValueToInject(executionAttributes))
.build());
entries.add(entry.toBuilder().messageAttributes(messageAttributes).build());
}
return request.toBuilder().publishBatchRequestEntries(entries).build();
Expand Down

0 comments on commit 345f066

Please sign in to comment.