Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix OpenTelemetry context propagation and duplication issues #25012

Merged
merged 6 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,13 @@
<Bug pattern="DE_MIGHT_IGNORE"/>
</Match>

<!-- Exception in AutoCloseable.close is ignored -->
<Match>
<Class name="com.azure.core.tracing.opentelemetry.OpenTelemetryHttpPolicy$ScalarPropagatingMono"/>
<Method name="subscribe"/>
<Bug pattern="DE_MIGHT_IGNORE"/>
</Match>

<!-- Exception is ignored by design which indicate that non-parsable id -->
<Match>
<Class name="com.azure.cosmos.implementation.ResourceId"/>
Expand Down
47 changes: 25 additions & 22 deletions sdk/core/azure-core-tracing-opentelemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ The following sections provides examples of using the azure-core-tracing-opentel

- Synchronously create a secret using [azure-security-keyvault-secrets][azure-security-keyvault-secrets] with tracing enabled.

Users can additionally pass the value of the current tracing span to the SDKs using key **PARENT_SPAN_KEY** on the [Context][context] parameter of the calling method.
The plugin package creates a root span to encapsulate all the child spans created in the calling methods when no parent span is passed in the context.
The plugin package creates a logical span representing public API call to encapsulate all the underlying HTTP calls. By default OpenTelemetry
`Context.current()` will be used as a parent context - check out [OpenTelemetry documentation](https://opentelemetry.io/docs/java/manual_instrumentation/#tracing) for more info.
Users can *optionally* pass the instance of `io.opentelemetry.context.Context` to the SDKs using key **PARENT_TRACE_CONTEXT_KEY** on the [Context][context] parameter of the calling method
to provide explicit parent context.
This [sample][sample_key_vault] provides an example when no user parent span is passed.

```java
Expand All @@ -59,31 +61,32 @@ public static void main(String[] args) {
}

public static void doClientWork() {
SecretClient client = new SecretClientBuilder()
.endpoint("<your-vault-url>")
.credential(new DefaultAzureCredentialBuilder().build())
.buildClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
try (Scope scope = TRACER.withSpan(span)) {

// Thread bound (sync) calls will automatically pick up the parent span and you don't need to pass it explicitly.
secretClient.setSecret(new Secret("secret_name", "secret_value));

// Optionally, to specify the context you can use
// final Context traceContext = new Context(PARENT_SPAN_KEY, span);
// secretClient.setSecretWithResponse(new Secret("secret_name", "secret_value", traceContext));
} finally {
span.end();
}
SecretClient client = new SecretClientBuilder()
.endpoint("<your-vault-url>")
.credential(new DefaultAzureCredentialBuilder().build())
.buildClient();

Span span = TRACER.spanBuilder("user-span").startSpan();
try (Scope scope = TRACER.withSpan(span)) {
// Thread bound (sync) calls will automatically pick up the parent span and you don't need to pass it explicitly.
secretClient.setSecret(new Secret("secret_name", "secret_value"));
} finally {
span.end();
}

// alternatively, you can pass context explicitly
// Span span = TRACER.spanBuilder("user-span").startSpan();
// Context traceContext = new Context(PARENT_TRACE_CONTEXT_KEY, io.opentelemetry.context.Context.current().with(span));
// secretClient.setSecretWithResponse(new Secret("secret_name", "secret_value"), traceContext);
// span.end();
}
```

### Using the plugin package with AMQP client libraries

Send a single event/message using [azure-messaging-eventhubs][azure-messaging-eventhubs] with tracing enabled.

Users can additionally pass the value of the current tracing span to the EventData object with key **PARENT_SPAN_KEY** on the [Context][context] object:
Users can additionally pass the value of the current tracing span to the EventData object with key **PARENT_TRACE_CONTEXT_KEY** on the [Context][context] object:

```java
// Get the Tracer Provider
Expand All @@ -95,10 +98,10 @@ private static void doClientWork() {
.connectionString(CONNECTION_STRING)
.buildProducerClient();

Span span = TRACER.spanBuilder("user-parent-span").startSpan();
Span span = TRACER.spanBuilder("user-span").startSpan();
try (Scope scope = TRACER.withSpan(span)) {
EventData event1 = new EventData("1".getBytes(UTF_8));
event1.addContext(PARENT_SPAN_KEY, span);
event1.addContext(PARENT_TRACE_CONTEXT_KEY, context.current());
lmolkova marked this conversation as resolved.
Show resolved Hide resolved

EventDataBatch eventDataBatch = producer.createBatch();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,34 @@
import com.azure.core.http.policy.AfterRetryPolicyProvider;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.tracing.opentelemetry.implementation.HttpTraceUtil;
import com.azure.core.tracing.opentelemetry.implementation.OpenTelemetrySpanSuppressionHelper;
import com.azure.core.util.CoreUtils;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapSetter;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.core.publisher.Signal;
import reactor.util.context.Context;
import reactor.util.context.ContextView;

import java.util.Optional;

import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.core.util.tracing.Tracer.DISABLE_TRACING_KEY;
import static com.azure.core.util.tracing.Tracer.PARENT_SPAN_KEY;
import static com.azure.core.util.tracing.Tracer.PARENT_TRACE_CONTEXT_KEY;

/**
* Pipeline policy that creates an OpenTelemetry span which traces the service request.
*/
public class OpenTelemetryHttpPolicy implements AfterRetryPolicyProvider, HttpPipelinePolicy {

/**
* @return a OpenTelemetry HTTP policy.
*/
Expand Down Expand Up @@ -77,7 +78,7 @@ public OpenTelemetryHttpPolicy() {

private static final String CLIENT_REQUEST_ID_HEADER = "x-ms-client-request-id";
private static final String CLIENT_REQUEST_ID_ATTRIBUTE = "requestId";

private static final String REACTOR_PARENT_TRACE_CONTEXT_KEY = "otel-context-key";

// This helper class implements W3C distributed tracing protocol and injects SpanContext into the outgoing http
// request
Expand All @@ -89,39 +90,41 @@ public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineN
return next.process();
}

io.opentelemetry.context.Context currentContext = io.opentelemetry.context.Context.current();
Span parentSpan = (Span) context.getData(PARENT_SPAN_KEY).orElse(Span.current());
HttpRequest request = context.getHttpRequest();
// OpenTelemetry reactor instrumentation needs a bit of help
// to pick up Azure SDK context. While we're working on explicit
// context propagation, ScalarPropagatingMono.INSTANCE is the workaround
return ScalarPropagatingMono.INSTANCE
.flatMap(ignored -> next.process())
.doOnEach(OpenTelemetryHttpPolicy::handleResponse)
.contextWrite(reactor.util.context.Context.of(REACTOR_PARENT_TRACE_CONTEXT_KEY, startSpan(context)));
}

private Context startSpan(HttpPipelineCallContext azContext) {
Context parentContext = getTraceContextOrCurrent(azContext);

// Build new child span representing this outgoing request
// provide sampling-relevant attributes (users make sampling decisions based on this)
HttpRequest request = azContext.getHttpRequest();

// Build new child span representing this outgoing request.
String methodName = request.getHttpMethod().toString();
SpanBuilder spanBuilder = tracer.spanBuilder("HTTP " + methodName)
Span span = tracer.spanBuilder("HTTP " + methodName)
.setAttribute(HTTP_METHOD, methodName)
.setAttribute(HTTP_URL, request.getUrl().toString())
.setParent(currentContext.with(parentSpan));

// A span's kind can be SERVER (incoming request) or CLIENT (outgoing request);
spanBuilder.setSpanKind(SpanKind.CLIENT);

// Starting the span makes the sampling decision (nothing is logged at this time)
Span span = spanBuilder.startSpan();
.setParent(parentContext)
.setSpanKind(SpanKind.CLIENT)
.startSpan();

// If span is sampled in, add additional attributes
if (span.isRecording()) {
addPostSamplingAttributes(span, request, context);
addPostSamplingAttributes(span, request, azContext);
}

// For no-op tracer, SpanContext is INVALID; inject valid span headers onto outgoing request
SpanContext spanContext = span.getSpanContext();
if (spanContext.isValid()) {
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
traceContextFormat.inject(currentContext.with(span), request, contextSetter);
traceContextFormat.inject(parentContext.with(span), request, contextSetter);
}

// run the next policy and handle success and error
return next.process()
.doOnEach(OpenTelemetryHttpPolicy::handleResponse)
.contextWrite(Context.of("TRACING_SPAN", span, "REQUEST", request));
return parentContext.with(span);
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
}

private static void addPostSamplingAttributes(Span span, HttpRequest request,
Expand Down Expand Up @@ -156,13 +159,12 @@ private static void handleResponse(Signal<? extends HttpResponse> signal) {

// Get the context that was added to the mono, this will contain the information needed to end the span.
ContextView context = signal.getContextView();
Optional<Span> tracingSpan = context.getOrEmpty("TRACING_SPAN");

if (!tracingSpan.isPresent()) {
Optional<io.opentelemetry.context.Context> traceContext = context.getOrEmpty(REACTOR_PARENT_TRACE_CONTEXT_KEY);
if (!traceContext.isPresent()) {
return;
}

Span span = tracingSpan.get();
Span span = Span.fromContext(traceContext.get());
HttpResponse httpResponse = null;
Throwable error = null;
if (signal.isOnNext()) {
Expand Down Expand Up @@ -202,7 +204,56 @@ private static void spanEnd(Span span, HttpResponse response, Throwable error) {
span.end();
}

/**
* Returns OpenTelemetry trace context from given com.azure.core.Context under PARENT_TRACE_CONTEXT_KEY
* or {@link io.opentelemetry.context.Context#current()}
*/
private static io.opentelemetry.context.Context getTraceContextOrCurrent(HttpPipelineCallContext azContext) {
final Optional<Object> traceContextOpt = azContext.getData(PARENT_TRACE_CONTEXT_KEY);
if (traceContextOpt.isPresent() && Context.class.isAssignableFrom(traceContextOpt.get().getClass())) {
lmolkova marked this conversation as resolved.
Show resolved Hide resolved
return (io.opentelemetry.context.Context) traceContextOpt.get();
}

// no need for back-compat with PARENT_SPAN_KEY - OpenTelemetryTracer will always set
// PARENT_TRACE_CONTEXT_KEY

return io.opentelemetry.context.Context.current();
}

// lambda that actually injects arbitrary header into the request
private final TextMapSetter<HttpRequest> contextSetter =
(request, key, value) -> request.getHeaders().set(key, value);

/**
* Helper class allowing to run Mono subscription and any hot path
* in scope of trace context. This enables OpenTelemetry auto-collection
* to pick it up and correlate lower levels of instrumentation and logs
* to logical/HTTP spans.
*
* OpenTelemetry reactor auto-instrumentation will take care of the cold path.
*/
static final class ScalarPropagatingMono extends Mono<Object> {
public static final Mono<Object> INSTANCE = new ScalarPropagatingMono();

private final Object value = new Object();

private ScalarPropagatingMono() {
}

@Override
public void subscribe(CoreSubscriber<? super Object> actual) {
Context traceContext = actual.currentContext().getOrDefault(REACTOR_PARENT_TRACE_CONTEXT_KEY, null);
if (traceContext != null) {
Object agentContext = OpenTelemetrySpanSuppressionHelper.registerClientSpan(traceContext);
AutoCloseable closeable = OpenTelemetrySpanSuppressionHelper.makeCurrent(agentContext, traceContext);
actual.onSubscribe(Operators.scalarSubscription(actual, value));
try {
trask marked this conversation as resolved.
Show resolved Hide resolved
closeable.close();
} catch (Throwable ignored) {
}
} else {
actual.onSubscribe(Operators.scalarSubscription(actual, value));
}
}
}
}
Loading