Skip to content

Commit

Permalink
AWS Payload observability PoC
Browse files Browse the repository at this point in the history
  • Loading branch information
ygree committed Jul 12, 2024
1 parent 8daf509 commit c9a799d
Show file tree
Hide file tree
Showing 12 changed files with 663 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ public AbstractAwsClientInstrumentation() {
@Override
public String[] helperClassNames() {
return new String[] {
packageName + ".AwsSdkClientDecorator", packageName + ".TracingExecutionInterceptor"
packageName + ".AwsSdkClientDecorator",
packageName + ".TracingExecutionInterceptor",
packageName + ".ResponseBodyStreamWrapper",
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString;
import datadog.trace.bootstrap.instrumentation.decorator.HttpClientDecorator;
import datadog.trace.core.datastreams.TagsProcessor;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.time.Instant;
import java.util.Collections;
Expand All @@ -41,6 +43,7 @@
import software.amazon.awssdk.core.interceptor.ExecutionAttribute;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;

Expand Down Expand Up @@ -111,11 +114,17 @@ public AgentSpan onSdkRequest(
final AgentSpan span,
final SdkRequest request,
final SdkHttpRequest httpRequest,
final ExecutionAttributes attributes) {
final ExecutionAttributes attributes,
Optional<RequestBody> requestBody) {
final String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
final String awsOperationName = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
onOperation(span, awsServiceName, awsOperationName);

if (requestBody.isPresent()) {
InputStream body = requestBody.get().contentStreamProvider().newStream();
AgentTracer.get().addTagsFromRequestBody(span, body);
}

// S3
request.getValueForField("Bucket", String.class).ifPresent(name -> setBucketName(span, name));
if ("s3".equalsIgnoreCase(awsServiceName) && span.traceConfig().isDataStreamsEnabled()) {
Expand Down Expand Up @@ -295,7 +304,18 @@ public AgentSpan onSdkResponse(
final AgentSpan span,
final SdkResponse response,
final SdkHttpResponse httpResponse,
final ExecutionAttributes attributes) {
final ExecutionAttributes attributes,
Optional<InputStream> responseBody) {

if (responseBody.isPresent()) {
InputStream body = responseBody.get();
if (body instanceof ResponseBodyStreamWrapper) {
Optional<ByteArrayInputStream> bodyStream =
((ResponseBodyStreamWrapper) body).toByteArrayInputStream();
bodyStream.ifPresent(bs -> AgentTracer.get().addTagsFromResponseBody(span, bs));
}
}

if (response instanceof AwsResponse) {
span.setTag(
InstrumentationTags.AWS_REQUEST_ID,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package datadog.trace.instrumentation.aws.v2;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Optional;

/**
* Buffers stream data that starts with '{' character meaning it is a JSON object. This is used to
* read the response body from AWS SDK after it has been read by the SDK.
*/
public class ResponseBodyStreamWrapper extends InputStream {

private final InputStream originalStream;
private ByteArrayOutputStream buffer;
private boolean hasBeenRead = false;

public ResponseBodyStreamWrapper(InputStream is) {
this.originalStream = is;
}

@Override
public int read() throws IOException {
int value = originalStream.read();

if (!hasBeenRead) {
if (value == '{') {
// Start buffering only if it starts with '{' to avoid buffering non-json data
buffer = new ByteArrayOutputStream();
}
hasBeenRead = true;
}

if (buffer != null) {
buffer.write(value);
}

return value;
}

public Optional<ByteArrayInputStream> toByteArrayInputStream() {
if (buffer == null) {
return Optional.empty();
}
return Optional.of(new ByteArrayInputStream(buffer.toByteArray()));
}

@Override
public void close() throws IOException {
originalStream.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import datadog.trace.bootstrap.InstanceStore;
import datadog.trace.bootstrap.instrumentation.api.AgentScope;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import java.io.InputStream;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -58,7 +59,12 @@ public void afterMarshalling(
if (span != null) {
try (AgentScope ignored = activateSpan(span)) {
DECORATE.onRequest(span, context.httpRequest());
DECORATE.onSdkRequest(span, context.request(), context.httpRequest(), executionAttributes);
DECORATE.onSdkRequest(
span,
context.request(),
context.httpRequest(),
executionAttributes,
context.requestBody());
}
}
}
Expand Down Expand Up @@ -97,14 +103,28 @@ public void beforeTransmission(
}
}

public Optional<InputStream> modifyHttpResponseContent(
Context.ModifyHttpResponse context, ExecutionAttributes executionAttributes) {
// Wrap the response so that it can be read again for tag extraction.
// TODO wrap only if tag extraction is enabled
return ExecutionInterceptor.super
.modifyHttpResponseContent(context, executionAttributes)
.map(ResponseBodyStreamWrapper::new);
}

@Override
public void afterExecution(
final Context.AfterExecution context, final ExecutionAttributes executionAttributes) {
final AgentSpan span = executionAttributes.getAttribute(SPAN_ATTRIBUTE);
if (span != null) {
executionAttributes.putAttribute(SPAN_ATTRIBUTE, null);
// Call onResponse on both types of responses:
DECORATE.onSdkResponse(span, context.response(), context.httpResponse(), executionAttributes);
DECORATE.onSdkResponse(
span,
context.response(),
context.httpResponse(),
executionAttributes,
context.responseBody());
DECORATE.onResponse(span, context.httpResponse());
DECORATE.beforeFinish(span);
span.finish();
Expand All @@ -127,7 +147,8 @@ public void onExecutionFailure(
Optional<SdkResponse> responseOpt = context.response();
if (responseOpt.isPresent()) {
SdkResponse response = responseOpt.get();
DECORATE.onSdkResponse(span, response, response.sdkHttpResponse(), executionAttributes);
DECORATE.onSdkResponse(
span, response, response.sdkHttpResponse(), executionAttributes, Optional.empty());
DECORATE.onResponse(span, response.sdkHttpResponse());
if (span.isError()) {
DECORATE.onError(span, context.exception());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,5 +135,10 @@ public final class TracerConfig {

public static final String TRACE_POST_PROCESSING_TIMEOUT = "trace.post-processing.timeout";

public static final String TRACE_CLOUD_REQUEST_PAYLOAD_TAGGING =
"trace.cloud.request.payload.tagging";
public static final String TRACE_CLOUD_RESPONSE_PAYLOAD_TAGGING =
"trace.cloud.response.payload.tagging";

private TracerConfig() {}
}
1 change: 1 addition & 0 deletions dd-trace-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ dependencies {
implementation libs.slf4j
implementation libs.moshi
implementation libs.jctools
implementation libs.jsonPath

implementation group: 'com.datadoghq', name: 'sketches-java', version: '0.8.3'

Expand Down
41 changes: 41 additions & 0 deletions dd-trace-core/src/main/java/datadog/trace/core/CoreTracer.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@
import datadog.trace.core.taginterceptor.RuleFlags;
import datadog.trace.core.taginterceptor.TagInterceptor;
import datadog.trace.lambda.LambdaHandler;
import datadog.trace.payloadtags.JsonToTags;
import datadog.trace.relocate.api.RatelimitedLogger;
import datadog.trace.util.AgentTaskScheduler;
import java.io.IOException;
import java.io.InputStream;
import java.lang.ref.WeakReference;
import java.math.BigInteger;
import java.util.ArrayList;
Expand Down Expand Up @@ -220,6 +222,9 @@ public static CoreTracerBuilder builder() {

private final PropagationTags.Factory propagationTagsFactory;

private final JsonToTags requestJsonToTags;
private final JsonToTags responseJsonToTags;

@Override
public ConfigSnapshot captureTraceConfig() {
return dynamicConfig.captureTraceConfig();
Expand Down Expand Up @@ -761,6 +766,20 @@ private CoreTracer(
} else {
this.localRootSpanTags = localRootSpanTags;
}

requestJsonToTags =
new JsonToTags.Builder()
// TODO add common expansion / redaction rules
.parseRedactionRules(config.getCloudRequestPayloadTagging())
.tagPrefix("aws.request.body")
.build();

responseJsonToTags =
new JsonToTags.Builder()
// TODO add common expansion / redaction rules
.parseRedactionRules(config.getCloudResponsePayloadTagging())
.tagPrefix("aws.response.body")
.build();
}

/** Used by AgentTestRunner to inject configuration into the test tracer. */
Expand Down Expand Up @@ -1180,6 +1199,28 @@ public TraceSegment getTraceSegment() {
return null;
}

@Override
public void addTagsFromResponseBody(AgentSpan span, InputStream body) {
if (responseJsonToTags != null) {
Map<String, Object> tags = responseJsonToTags.process(body);
setTags(span, tags);
}
}

@Override
public void addTagsFromRequestBody(AgentSpan span, InputStream body) {
if (requestJsonToTags != null) {
Map<String, Object> tags = requestJsonToTags.process(body);
setTags(span, tags);
}
}

private static void setTags(AgentSpan span, Map<String, Object> tags) {
for (Map.Entry<String, Object> entry : tags.entrySet()) {
span.setTag(entry.getKey(), entry.getValue());
}
}

public void addTracerReportToFlare(ZipOutputStream zip) throws IOException {
TracerFlare.addText(zip, "tracer_health.txt", healthMetrics.summary());
TracerFlare.addText(zip, "span_metrics.txt", SpanMetricRegistry.getInstance().summary());
Expand Down
Loading

0 comments on commit c9a799d

Please sign in to comment.