Skip to content

Commit

Permalink
OpenTelemetry Bridge Baggage support (#3249)
Browse files Browse the repository at this point in the history
  • Loading branch information
JonasKunz committed Jul 25, 2023
1 parent c1ad071 commit eee7e77
Show file tree
Hide file tree
Showing 11 changed files with 200 additions and 48 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Expand Up @@ -34,6 +34,7 @@ Use subheadings with the "=====" level for adding notes for unreleased changes:
[float]
===== Features
* Added W3C baggage propagation - {pull}3236[#3236]
* Added support for baggage in OpenTelemetry bridge - {pull}3249[#3249]
[[release-notes-1.x]]
=== Java Agent version 1.x
Expand Down
Expand Up @@ -245,21 +245,32 @@ public void onChange(ConfigurationOption<?> configurationOption, Double oldValue
@Override
@Nullable
public Transaction startRootTransaction(@Nullable ClassLoader initiatingClassLoader) {
return startRootTransaction(sampler, -1, initiatingClassLoader);
return startRootTransaction(sampler, -1, currentContext().getBaggage(), initiatingClassLoader);
}

@Override
@Nullable
public Transaction startRootTransaction(@Nullable ClassLoader initiatingClassLoader, long epochMicro) {
return startRootTransaction(sampler, epochMicro, initiatingClassLoader);
return startRootTransaction(sampler, epochMicro, currentContext().getBaggage(), initiatingClassLoader);
}

@Nullable
@Override
public Transaction startRootTransaction(@Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicro) {
return startRootTransaction(sampler, epochMicro, baseBaggage, initiatingClassLoader);
}


@Nullable
public Transaction startRootTransaction(Sampler sampler, long epochMicros, @Nullable ClassLoader initiatingClassLoader) {
return startRootTransaction(sampler, epochMicros, currentContext().getBaggage(), initiatingClassLoader);
}

@Override
@Nullable
public Transaction startRootTransaction(Sampler sampler, long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader) {
Transaction transaction = null;
if (isRunning()) {
Baggage baseBaggage = currentContext().getBaggage();
transaction = createTransaction().startRoot(epochMicros, sampler, baseBaggage);
afterTransactionStart(initiatingClassLoader, transaction);
}
Expand All @@ -274,17 +285,21 @@ public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHead

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, long epochMicros) {
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros) {
return startChildTransaction(headerCarrier, textHeadersGetter, sampler, epochMicros, initiatingClassLoader);
}

@Override
@Nullable
public <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, Sampler sampler,
long epochMicros, @Nullable ClassLoader initiatingClassLoader) {
return startChildTransaction(headerCarrier, textHeadersGetter, sampler, epochMicros, currentContext().getBaggage(), initiatingClassLoader);
}

private <C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, Sampler sampler,
long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader) {
Transaction transaction = null;
if (isRunning()) {
Baggage baseBaggage = currentContext().getBaggage();
transaction = createTransaction().start(TraceContext.<C>getFromTraceContextTextHeaders(), headerCarrier,
textHeadersGetter, epochMicros, sampler, baseBaggage);
afterTransactionStart(initiatingClassLoader, transaction);
Expand Down
Expand Up @@ -19,6 +19,7 @@
package co.elastic.apm.agent.impl;

import co.elastic.apm.agent.configuration.ServiceInfo;
import co.elastic.apm.agent.impl.baggage.Baggage;
import co.elastic.apm.agent.impl.error.ErrorCapture;
import co.elastic.apm.agent.impl.sampling.Sampler;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
Expand All @@ -40,6 +41,9 @@ public interface Tracer extends co.elastic.apm.agent.tracer.Tracer {
@Nullable
Transaction startRootTransaction(@Nullable ClassLoader initiatingClassLoader, long epochMicro);

@Nullable
Transaction startRootTransaction(@Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicro);

/**
* Starts a trace-root transaction with a specified sampler and start timestamp
*
Expand All @@ -51,14 +55,14 @@ public interface Tracer extends co.elastic.apm.agent.tracer.Tracer {
* @return a transaction that will be the root of the current trace if the agent is currently RUNNING; null otherwise
*/
@Nullable
Transaction startRootTransaction(Sampler sampler, long epochMicros, @Nullable ClassLoader initiatingClassLoader);
Transaction startRootTransaction(Sampler sampler, long epochMicros, Baggage baseBaggage, @Nullable ClassLoader initiatingClassLoader);

@Override
@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader);

@Nullable
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, long epochMicros);
<C> Transaction startChildTransaction(@Nullable C headerCarrier, TextHeaderGetter<C> textHeadersGetter, @Nullable ClassLoader initiatingClassLoader, Baggage baseBaggage, long epochMicros);

/**
* Starts a transaction as a child of the context headers obtained through the provided {@link HeaderGetter}.
Expand Down
Expand Up @@ -81,6 +81,11 @@ public Builder put(String key, @Nullable String value) {
return this;
}

public Builder put(String key, @Nullable String value, @Nullable String metadata) {
baggageBuilder.put(key, value, metadata);
return this;
}

@Override
public Builder remove(String key) {
baggageBuilder.put(key, null);
Expand Down
Expand Up @@ -22,7 +22,6 @@
import co.elastic.apm.agent.impl.baggage.Baggage;
import co.elastic.apm.agent.impl.baggage.BaggageContext;
import co.elastic.apm.agent.impl.baggage.W3CBaggagePropagation;
import co.elastic.apm.agent.tracer.BaggageContextBuilder;
import co.elastic.apm.agent.tracer.Scope;
import co.elastic.apm.agent.tracer.dispatch.BinaryHeaderSetter;
import co.elastic.apm.agent.tracer.dispatch.HeaderUtils;
Expand Down Expand Up @@ -79,7 +78,7 @@ public Scope activateInScope() {
}

@Override
public BaggageContextBuilder withUpdatedBaggage() {
public BaggageContext.Builder withUpdatedBaggage() {
return BaggageContext.createBuilder(this);
}

Expand Down
@@ -0,0 +1,58 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package co.elastic.apm.agent.opentelemetry.baggage;

import co.elastic.apm.agent.sdk.weakconcurrent.WeakConcurrent;
import co.elastic.apm.agent.sdk.weakconcurrent.WeakMap;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.baggage.BaggageBuilder;
import io.opentelemetry.api.baggage.BaggageEntryMetadata;

public class OtelBaggage {

private static final WeakMap<Baggage, co.elastic.apm.agent.impl.baggage.Baggage> translationCache = WeakConcurrent.buildMap();

public static Baggage fromElasticBaggage(co.elastic.apm.agent.impl.baggage.Baggage elasticBaggage) {
BaggageBuilder builder = Baggage.builder();
for (String key : elasticBaggage.keys()) {
builder.put(key, elasticBaggage.get(key), BaggageEntryMetadata.create(elasticBaggage.getMetadata(key)));
}
Baggage result = builder.build();
// remember the translation for future toElasticBaggage calls
translationCache.put(result, elasticBaggage);
return result;
}

public static co.elastic.apm.agent.impl.baggage.Baggage toElasticBaggage(Baggage otelBaggage) {
if (otelBaggage == null || otelBaggage.isEmpty()) {
return co.elastic.apm.agent.impl.baggage.Baggage.EMPTY;
}
co.elastic.apm.agent.impl.baggage.Baggage translated = translationCache.get(otelBaggage);
if (translated == null) {
co.elastic.apm.agent.impl.baggage.Baggage.Builder builder = co.elastic.apm.agent.impl.baggage.Baggage.builder();
otelBaggage.forEach((key, value) -> {
String metadata = value.getMetadata().getValue();
builder.put(key, value.getValue(), metadata.isEmpty() ? null : metadata);
});
translated = builder.build();
translationCache.put(otelBaggage, translated);
}
return translated;
}
}
Expand Up @@ -19,7 +19,6 @@
package co.elastic.apm.agent.opentelemetry.context;

import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.opentelemetry.tracing.OTelBridgeContext;
import co.elastic.apm.agent.sdk.logging.Logger;
Expand Down Expand Up @@ -77,12 +76,6 @@ public Context current() {
return (Context) current;
}

AbstractSpan<?> currentSpan = current.getSpan();
if (currentSpan == null) {
// OTel context without an active span is not supported yet
return null;
}

// Ensure that root context is being accessed at least once to capture the original root
// OTel 1.0 directly calls ArrayBasedContext.root() which is not publicly accessible, later versions delegate
// to ContextStorage.root() which we can't call from here either.
Expand All @@ -91,6 +84,6 @@ public Context current() {
// Current context hasn't been created with this OTel instance, but with another OTel plugin instance
// (one per external plugin) or is an Elastic context (span or transaction), thus needs wrapping to make it visible
// to this OTel context.
return tracer.wrapActiveContextIfRequired(OTelBridgeContext.class, () -> OTelBridgeContext.wrapElasticActiveSpan(tracer, currentSpan));
return tracer.wrapActiveContextIfRequired(OTelBridgeContext.class, () -> OTelBridgeContext.wrapElasticActiveSpan(tracer, current));
}
}
Expand Up @@ -22,6 +22,7 @@
import co.elastic.apm.agent.impl.baggage.Baggage;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.ElasticContext;
import co.elastic.apm.agent.opentelemetry.baggage.OtelBaggage;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
Expand Down Expand Up @@ -81,11 +82,11 @@ public static OTelBridgeContext bridgeRootContext(ElasticApmTracer tracer, Conte
/**
* Bridges an active elastic span to an active OTel span context
*
* @param tracer tracer
* @param span elastic (currently active) span
* @param tracer tracer
* @param currentContext elastic (currently active) context
* @return bridged context with span as active
*/
public static OTelBridgeContext wrapElasticActiveSpan(ElasticApmTracer tracer, AbstractSpan<?> span) {
public static OTelBridgeContext wrapElasticActiveSpan(ElasticApmTracer tracer, ElasticContext<?> currentContext) {
if (root == null) {
// Ensure that root context is being accessed at least once to capture the original root
// OTel 1.0 directly calls ArrayBasedContext.root() which is not publicly accessible, later versions delegate
Expand All @@ -94,8 +95,15 @@ public static OTelBridgeContext wrapElasticActiveSpan(ElasticApmTracer tracer, A
}
Objects.requireNonNull(originalRootContext, "OTel original context must be set through bridgeRootContext first");

OTelSpan otelSpan = new OTelSpan(span);
return new OTelBridgeContext(tracer, originalRootContext.with(otelSpan));
Context result = originalRootContext;
if (currentContext.getSpan() != null) {
result = result.with(new OTelSpan(currentContext.getSpan()));
}
if (!currentContext.getBaggage().isEmpty()) {
result = result.with(OtelBaggage.fromElasticBaggage(currentContext.getBaggage()));
}

return new OTelBridgeContext(tracer, result);
}

@Nullable
Expand All @@ -111,9 +119,12 @@ public AbstractSpan<?> getSpan() {

@Override
public Baggage getBaggage() {
//TODO: correctly implement baggage
AbstractSpan<?> currentSpan = getSpan();
return currentSpan != null ? currentSpan.getBaggage() : Baggage.EMPTY;
io.opentelemetry.api.baggage.Baggage otelBaggage = io.opentelemetry.api.baggage.Baggage.fromContext(otelContext);

if (otelBaggage == null || otelBaggage.isEmpty()) {
return Baggage.EMPTY;
}
return OtelBaggage.toElasticBaggage(otelBaggage);
}


Expand Down
Expand Up @@ -19,12 +19,14 @@
package co.elastic.apm.agent.opentelemetry.tracing;

import co.elastic.apm.agent.impl.ElasticApmTracer;
import co.elastic.apm.agent.impl.baggage.Baggage;
import co.elastic.apm.agent.impl.transaction.AbstractSpan;
import co.elastic.apm.agent.impl.transaction.MultiValueMapAccessor;
import co.elastic.apm.agent.impl.transaction.OTelSpanKind;
import co.elastic.apm.agent.tracer.Outcome;
import co.elastic.apm.agent.impl.transaction.TraceContext;
import co.elastic.apm.agent.impl.transaction.Transaction;
import co.elastic.apm.agent.opentelemetry.baggage.OtelBaggage;
import co.elastic.apm.agent.sdk.logging.Logger;
import co.elastic.apm.agent.sdk.logging.LoggerFactory;
import co.elastic.apm.agent.sdk.internal.util.LoggerUtils;
Expand Down Expand Up @@ -58,9 +60,7 @@ class OTelSpanBuilder implements SpanBuilder {
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
private long epochMicros = -1;
@Nullable
private AbstractSpan<?> parent;
@Nullable
private Context remoteContext;
private Context parent;

private List<SpanContext> links = new ArrayList<>();

Expand All @@ -74,19 +74,13 @@ public OTelSpanBuilder(String spanName, ElasticApmTracer elasticApmTracer) {

@Override
public SpanBuilder setParent(Context context) {
Span span = Span.fromContext(context);
if (span.getSpanContext().isRemote()) {
remoteContext = context;
} else if (span instanceof OTelSpan) {
parent = ((OTelSpan) span).getInternalSpan();
}
parent = context;
return this;
}

@Override
public SpanBuilder setNoParent() {
parent = null;
remoteContext = null;
return this;
}

Expand Down Expand Up @@ -151,19 +145,32 @@ public SpanBuilder setStartTimestamp(long startTimestamp, TimeUnit unit) {
public Span startSpan() {
AbstractSpan<?> span;

if (parent == null) {
Baggage parentBaggage;
AbstractSpan<?> parentSpan = null;
Context remoteContext = null;

if (parent != null) {
Span parentOtelSpan = Span.fromContext(parent);
if (parentOtelSpan.getSpanContext().isRemote()) {
remoteContext = parent;
} else if (parentOtelSpan instanceof OTelSpan) {
parentSpan = ((OTelSpan) parentOtelSpan).getInternalSpan();
}
parentBaggage = OtelBaggage.toElasticBaggage(io.opentelemetry.api.baggage.Baggage.fromContext(parent));
} else {
// when parent is not explicitly set, the currently active parent is used as fallback
parent = elasticApmTracer.getActive();
parentSpan = elasticApmTracer.currentContext().getSpan();
parentBaggage = elasticApmTracer.currentContext().getBaggage();
}
//TODO: correctly implement baggage: we need to use the baggage from the parent context instead of from the parent span

if (remoteContext != null) {
PotentiallyMultiValuedMap headers = new PotentiallyMultiValuedMap(2);
W3CTraceContextPropagator.getInstance().inject(remoteContext, headers, PotentiallyMultiValuedMap::add);
span = elasticApmTracer.startChildTransaction(headers, MultiValueMapAccessor.INSTANCE, PrivilegedActionUtils.getClassLoader(getClass()), epochMicros);
} else if (parent == null) {
span = elasticApmTracer.startRootTransaction(PrivilegedActionUtils.getClassLoader(getClass()), epochMicros);
span = elasticApmTracer.startChildTransaction(headers, MultiValueMapAccessor.INSTANCE, PrivilegedActionUtils.getClassLoader(getClass()), parentBaggage, epochMicros);
} else if (parentSpan == null) {
span = elasticApmTracer.startRootTransaction(PrivilegedActionUtils.getClassLoader(getClass()), parentBaggage, epochMicros);
} else {
span = elasticApmTracer.startSpan(parent, parent.getBaggage(), epochMicros);
span = elasticApmTracer.startSpan(parentSpan, parentBaggage, epochMicros);
}
if (span == null) {
return Span.getInvalid();
Expand Down

0 comments on commit eee7e77

Please sign in to comment.