From 17fbbc21d3bcebb8bccb9ac3a470fbe042f24388 Mon Sep 17 00:00:00 2001 From: Pasquale Congiusti Date: Mon, 4 May 2026 12:59:35 +0200 Subject: [PATCH] fix(component): remove scope wrapping It remove the concept of "dirty" context which was a potential cause of leakage. It limit the access to the Otel context exclusively to custom processors Closes CAMEL-23380 --- .../OpenTelemetrySpanAdapter.java | 20 +--- .../opentelemetry2/OpenTelemetryTracer.java | 14 +-- .../TraceProcessorsOtelInterceptStrategy.java | 101 ++++++++++++++++++ .../BaggageInjectionInternalTest.java | 94 ---------------- .../opentelemetry2/BaggageInjectionTest.java | 2 + .../OpenTelemetryTracerTestSupport.java | 10 ++ .../pages/camel-4x-upgrade-guide-4_21.adoc | 11 +- proposals/tracing.adoc | 16 ++- 8 files changed, 146 insertions(+), 122 deletions(-) create mode 100644 components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java delete mode 100644 components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionInternalTest.java diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetrySpanAdapter.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetrySpanAdapter.java index b6c7a07202153..214c1dce81733 100644 --- a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetrySpanAdapter.java +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetrySpanAdapter.java @@ -23,24 +23,18 @@ import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.StatusCode; -import io.opentelemetry.context.Scope; import org.apache.camel.telemetry.TagConstants; public class OpenTelemetrySpanAdapter implements org.apache.camel.telemetry.Span { private static final String DEFAULT_EVENT_NAME = "log"; - static final String BAGGAGE_CAMEL_FLAG = "camelScope"; private final Span otelSpan; private final Baggage baggage; - private Scope scope; - private Scope baggageScope; protected OpenTelemetrySpanAdapter(Span otelSpan, Baggage baggage) { this.otelSpan = otelSpan; - // We store an important flag in the baggage in order to verify if the - // root span was generated internally or from a third party dependency. - this.baggage = baggage.toBuilder().put(BAGGAGE_CAMEL_FLAG, "true").build(); + this.baggage = baggage; } protected Span getSpan() { @@ -48,8 +42,8 @@ protected Span getSpan() { } protected void makeCurrent() { - this.scope = this.otelSpan.makeCurrent(); - this.baggageScope = this.baggage.makeCurrent(); + // NOTE: we had changed the implementation not to depend + // any longer by thread scopes. } protected void end() { @@ -57,12 +51,8 @@ protected void end() { } protected void close() { - if (baggageScope != null) { - this.baggageScope.close(); - } - if (scope != null) { - this.scope.close(); - } + // NOTE: we had changed the implementation not to depend + // any longer by thread scopes } protected Baggage getBaggage() { diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryTracer.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryTracer.java index 257e5afd250bc..ed810c4f66e4f 100644 --- a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryTracer.java +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/OpenTelemetryTracer.java @@ -30,6 +30,7 @@ import org.apache.camel.RuntimeCamelException; import org.apache.camel.api.management.ManagedResource; import org.apache.camel.spi.Configurer; +import org.apache.camel.spi.InterceptStrategy; import org.apache.camel.spi.annotations.JdkService; import org.apache.camel.support.CamelContextHelper; import org.apache.camel.telemetry.Span; @@ -73,6 +74,9 @@ protected void initTracer() { } this.setSpanLifecycleManager(new OpentelemetrySpanLifecycleManager(tracer, contextPropagators)); + + InterceptStrategy interceptStrategy = new TraceProcessorsOtelInterceptStrategy(); + getCamelContext().getCamelContextExtension().addInterceptStrategy(interceptStrategy); } void setTracer(Tracer tracer) { @@ -113,15 +117,7 @@ public Span create(String spanName, String spanKind, Span parent, SpanContextPro baggage = otelParentSpan.getBaggage(); } } else { - Context current = Context.root(); - // If the current span was generated by Camel, then, this is a "dirty" context. - // A "dirty" context happens when the Camel thread local is reused and - // due to the way Camel async works, can't reliably clean its context before reusing it. - if (Baggage.current().getEntryValue(OpenTelemetrySpanAdapter.BAGGAGE_CAMEL_FLAG) == null) { - // Not "dirty" context. In this case a Span exists and the current span was generated by some third party dependency (ie, vertx) - // therefore we need to consider this span as the root on such a trace. - current = Context.current(); - } + Context current = Context.current(); // Try to get parent from context propagation (upstream traces) Context ctx = contextPropagators.getTextMapPropagator().extract(current, extractor, new TextMapGetter() { diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java new file mode 100644 index 0000000000000..813ce637d16f7 --- /dev/null +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.opentelemetry2; + +import java.util.concurrent.CompletableFuture; + +import io.opentelemetry.context.Scope; +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.NamedNode; +import org.apache.camel.Processor; +import org.apache.camel.spi.InterceptStrategy; +import org.apache.camel.support.AsyncCallbackToCompletableFutureAdapter; +import org.apache.camel.support.processor.DelegateAsyncProcessor; +import org.apache.camel.telemetry.Span; +import org.apache.camel.telemetry.SpanStorageManagerExchange; + +/** + * TraceProcessorsOtelInterceptStrategy is used to wrap each processor calls and generate the scope required for any + * custom Opentelemetry activity in the processor to be available. + */ +public class TraceProcessorsOtelInterceptStrategy implements InterceptStrategy { + + // NOTE: this is an implementation detail that the interceptor should not know. + // We are temporarily using this to evaluate as a patch for a context leak problem we're suffering. + // Once we are clear this is correctly fixed and no more corner cases, then, we should change the TraceProcessorsInterceptStrategy + // class in camel-telemetry component in order to be able to retrieve the span before executing the processor and perform + // a similar logic of what we're doing here. + private SpanStorageManagerExchange spanStorage = new SpanStorageManagerExchange(); + + @Override + public Processor wrapProcessorInInterceptors( + CamelContext camelContext, + NamedNode processorDefinition, Processor target, Processor nextTarget) + throws Exception { + return new TraceProcessor(target); + } + + private class TraceProcessor extends DelegateAsyncProcessor { + + public TraceProcessor(Processor target) { + super(target); + } + + @Override + public void process(Exchange exchange) throws Exception { + Span activeSpan = spanStorage.peek(exchange); + if (activeSpan != null) { + OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; + try (Scope scope = otelSpan.getSpan().makeCurrent(); + Scope baggageScope = otelSpan.getBaggage().makeCurrent()) { + processor.process(exchange); + } + } else { + processor.process(exchange); + } + } + + @Override + public boolean process(Exchange exchange, AsyncCallback callback) { + Span activeSpan = spanStorage.peek(exchange); + if (activeSpan != null) { + OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; + try (Scope scope = otelSpan.getSpan().makeCurrent(); + Scope baggageScope = otelSpan.getBaggage().makeCurrent()) { + return processor.process(exchange, doneSync -> { + callback.done(doneSync); + }); + } + } else { + return processor.process(exchange, doneSync -> { + callback.done(doneSync); + }); + } + } + + @Override + public CompletableFuture processAsync(Exchange exchange) { + AsyncCallbackToCompletableFutureAdapter callback + = new AsyncCallbackToCompletableFutureAdapter<>(exchange); + process(exchange, callback); + return callback.getFuture(); + } + } + +} diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionInternalTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionInternalTest.java deleted file mode 100644 index bbf6fe135f0cf..0000000000000 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionInternalTest.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.opentelemetry2; - -import java.io.IOException; -import java.util.Map; - -import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.trace.Tracer; -import io.opentelemetry.context.Scope; -import org.apache.camel.CamelContext; -import org.apache.camel.CamelContextAware; -import org.apache.camel.Exchange; -import org.apache.camel.Processor; -import org.apache.camel.RoutesBuilder; -import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.opentelemetry2.CamelOpenTelemetryExtension.OtelTrace; -import org.junit.jupiter.api.Test; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class BaggageInjectionInternalTest extends OpenTelemetryTracerTestSupport { - - Tracer tracer = otelExtension.getOpenTelemetry().getTracer("spanInjection"); - - @Override - protected CamelContext createCamelContext() throws Exception { - OpenTelemetryTracer tst = new OpenTelemetryTracer(); - tst.setTracer(tracer); - tst.setContextPropagators(otelExtension.getOpenTelemetry().getPropagators()); - tst.setTraceProcessors(true); - CamelContext context = super.createCamelContext(); - CamelContextAware.trySetCamelContext(tst, context); - tst.init(context); - return context; - } - - @Test - void testRouteExternalBaggage() throws IOException { - template.sendBody("direct:start", "my-body"); - Map traces = otelExtension.getTraces(); - assertEquals(1, traces.size()); - } - - @Override - protected RoutesBuilder createRouteBuilder() { - return new RouteBuilder() { - - Scope baggageScope; - - @Override - public void configure() { - from("direct:start") - .process(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - // Open the scope to propagate some baggage info - baggageScope = Baggage.current().toBuilder().put("my.id", "9876").build().makeCurrent(); - } - }) - .routeId("start") - .log("A message") - .process(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - assertEquals("9876", Baggage.current().getEntryValue("my.id")); - } - }) - .to("log:info") - .process(new Processor() { - @Override - public void process(Exchange exchange) throws Exception { - // Close the scope previously opened - baggageScope.close(); - } - }); - } - }; - } -} diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java index a66f8995583cc..23a20c2f12a70 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageInjectionTest.java @@ -66,6 +66,8 @@ void testRouteExternalBaggage() throws IOException { Map traces = otelExtension.getTraces(); assertEquals(1, traces.size()); checkTrace(traces.values().iterator().next()); + } finally { + span.end(); } } } diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/OpenTelemetryTracerTestSupport.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/OpenTelemetryTracerTestSupport.java index 7976bf8fe3619..c8cf5ec572668 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/OpenTelemetryTracerTestSupport.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/OpenTelemetryTracerTestSupport.java @@ -19,10 +19,14 @@ import java.util.List; import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.context.Context; import io.opentelemetry.sdk.trace.data.SpanData; import org.apache.camel.telemetry.Op; import org.apache.camel.telemetry.TagConstants; import org.apache.camel.test.junit6.ExchangeTestSupport; +import org.junit.jupiter.api.AfterEach; + +import static org.junit.jupiter.api.Assertions.assertEquals; public class OpenTelemetryTracerTestSupport extends ExchangeTestSupport { @@ -41,4 +45,10 @@ protected static SpanData getSpan(List trace, String uri, Op op) { throw new IllegalArgumentException("Trying to get a non existing span!"); } + @AfterEach + public void assertCurrentIsRoot() { + // We must guarantee no context leaking + assertEquals(Context.root(), Context.current()); + } + } diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc index 517cc570bfc85..8029964694ed0 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_21.adoc @@ -158,4 +158,13 @@ extend from: - `AbstractSpanDecorator` (returns `INTERNAL` for all operations) - `AbstractHttpSpanDecorator` (returns `CLIENT` for `EVENT_SENT`, `SERVER` for `EVENT_RECEIVED`) -- `AbstractMessagingSpanDecorator` (returns `PRODUCER` for `EVENT_SENT`, `CONSUMER` for `EVENT_RECEIVED`) \ No newline at end of file +- `AbstractMessagingSpanDecorator` (returns `PRODUCER` for `EVENT_SENT`, `CONSUMER` for `EVENT_RECEIVED`) + +=== camel-opentelemetry2 + +In order to prevent a potential leak when running asynchronous components we need to rethink the implementation details of `camel-opentelemetry2` and remove the `Scope` wrapping that, when asynchronous, was opening the `Scope` in a thread and closing in another (what we had called "dirty" context). We are now removing this wrapping and moving this part exclusively in the custom Camel `Processors`. Here Camel will take care to open the Opentelemetry scope and close it within the same thread. + +What it means is that, from now on, the final user or any third party dependency can only "control" the Opentelemetry context within the boundary of a Processor execution. + +This is just an informative note, there is not action expected by the final user. + diff --git a/proposals/tracing.adoc b/proposals/tracing.adoc index b0cdd1f53799b..9d91978df7e21 100644 --- a/proposals/tracing.adoc +++ b/proposals/tracing.adoc @@ -9,7 +9,7 @@ approvers: - "@zbendhiba" - "@davsclaus" creation-date: 2025-01-08 -last-updated: 2026-03-10 +last-updated: 2026-05-04 status: implemented see-also: [] replaces: [] @@ -98,7 +98,9 @@ The **Context propagation** is a way to correlate distributed traces between eac The Exchange stack storage already exists and it may suffice to this proposal goals. Again, we need to remove the implementation specific details from the abstraction and make sure that we don't slip any implementation detail in the future by design. Some concern we may have would be about the correct handling of opening and closure of spans which may be different according the each implementation specific. However, if the lifecycle we have in place takes care of consistency, this should not be a problem at all: each implementation should be in charge to do the needful when each lifecycle method is called. The Exchange stack storage can be used to store a span wrapper and maintain a state for it: this is something already available. -In order to clarify this aspect, let's take `camel-opentelemetry` as an example. When we call the *activation* method, then, we must make sure that the span passed is correctly activated, calling therefore the `span.makeCurrent()` method. The generated scope has therefore to be kept in the same span wrapper in order to be later closed when the *closure* method is called via `scope.close()`. As each span wrapper is stored in the Exchange, then we can use this approach to maintain the state of each wrapper regardless how its specific implementation works. +=== Thread local scope management + +Certain implementations (i.e., Opentelemetry) may leverage the `ThreadLocal` Java API. This is an implementation details that we don't want to manage directly in the abstraction and has to be managed in each concrete implementation. In the specific case of Opentelemetry, since this is materialized by the concept of a `Scope` and that the `Scope` must be opened and closed within the same Thread to maintain consistency and avoid leakage, we will implement it exclusively in the process via some `InterceptStrategy`. This means that the access to the given Opentelemetry context can be only available within a custom Processor execution (for example to provide any additional custom Span to your trace execution). === Context propagation @@ -156,4 +158,12 @@ Added a chapter to clarify the adoption and compliancy of W3C trace context of C === Deprecation of older `camel-tracing` components (2026-03-10) -Added a deprecation notice for `camel-tracing` and related components. Also, identified the custom logic previously required by these components into core dependencies. Also deprecated that part for future removals. \ No newline at end of file +Added a deprecation notice for `camel-tracing` and related components. Also, identified the custom logic previously required by these components into core dependencies. Also deprecated that part for future removals. + +=== Removal of explicit `camel-opentelemetry2` Scope wrapping to avoid leakage (2026-05-04) + +As described in https://issues.apache.org/jira/browse/CAMEL-23380 any version before 4.21 was suffering from potential leaks. This was due to the assumption that it was fine to deal with "dirty" context (context that could be reused by asynchronous threads). Although the final solution was consistent, we realized that, in the long run, the leak could provoke disruptions. + +In order to prevent this problem we need to rethink the implementation details of `camel-opentelemetry2` and remove the explicit `Scope` management that, when asynchronous, was opening the `Scope` in a thread and closing in another (what we had called "dirty" context). We are now removing this explicit management and moving this part exclusively in the custom Camel `Processors`. Here Camel will take care to open the Opentelemetry scope and close it within the same thread. + +What it means is that, from now on we get rid of the leak but the final user or any third party dependency can only access the Opentelemetry context within the boundary of a Processor execution.