diff --git a/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc b/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc index 59b4f3957ce54..81cf62cf40f31 100644 --- a/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc +++ b/components/camel-micrometer-observability/src/main/docs/micrometer-observability.adoc @@ -40,7 +40,7 @@ include::spring-boot:partial$starter.adoc[] === Spring Boot context propagation -The starter is in charge to autoconfigure the component. Additionally you will need to specify the concrete Propagation implementation by adding the dependency you wish to use (for example, `io.micrometer:micrometer-tracing-bridge-otel`, 'io.micrometer:micrometer-tracing-bridge-brave' or any other technology you wish to use). If none is provided, a "no-op" implementation will be defined as default. +The starter is in charge to autoconfigure the component. Additionally you will need to specify the concrete Propagation implementation by adding the dependency you wish to use (for example, `io.micrometer:micrometer-tracing-bridge-otel`, `io.micrometer:micrometer-tracing-bridge-brave` or any other technology you wish to use). If none is provided, a "no-op" implementation will be defined as default. == Using with standalone Camel @@ -121,7 +121,7 @@ import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator; event -> { }, new OtelBaggageManager( - currentTraceContext, List.of(MicrometerObservabilitySpanAdapter.BAGGAGE_CAMEL_FLAG), List.of())); + currentTraceContext, List.of(), List.of())); OtelPropagator otelPropagator = new OtelPropagator(propagators, otelTracer); getContext().getRegistry().bind("MicrometerObservabilityTracer", micrometerTracer); getContext().getRegistry().bind("OpentelemetryPropagators", otelPropagator); @@ -135,8 +135,6 @@ NOTE: this is an example that can be used as a reference. It may not work exactl You can see that the configuration of this component may get a bit difficult, unless you are already familiar with the tracing technology you're going to implement. -NOTE: an important thing to do is to include a `BaggageManager` specifying the baggage properties to include during context propagation. The field provided in the example, `MicrometerObservabilitySpanAdapter.BAGGAGE_CAMEL_FLAG`, is necessary to handle consistency across asynchronous threads. - === How to trace Once the application is instrumented and configured, you can observe the traces produced with the tooling compatible to the concrete implementation you have in place. You are invited to follow the specific documentation of each technology. @@ -173,3 +171,30 @@ public void process(Exchange exchange) throws Exception { mySpan.end(); } ---- + +=== Baggage customization + +`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of telemetry technologies, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via Exchange property settings. Whenever the component finds a property defined as `CamelBaggage_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL: + +[source,java] +---- + from("direct:start") + .setProperty("CamelBaggage_myValue", constant("1234")) + .routeId("start") + .log("A message") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + // Baggage is available via the Micrometer Observability API + String val = tracer.getBaggage("myValue").get(); + } + }) + .to("log:info"); +---- + +Any span executed after the `setProperty` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result as well. + +NOTE: an important thing to do is to include a `BaggageManager` specifying the baggage properties to include during context propagation. You may require to configure those baggages fields you want to propagate across the boards. + +Any baggage setting defined externally (i.e., calling the Camel process into another external process where you're setting some baggage) is normally going to be propagated in Camel logic. + diff --git a/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/TraceProcessorsMicrometerObsInterceptStrategy.java b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/TraceProcessorsMicrometerObsInterceptStrategy.java index 347547cbc5a4a..a1c6baa0a9b4a 100644 --- a/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/TraceProcessorsMicrometerObsInterceptStrategy.java +++ b/components/camel-micrometer-observability/src/main/java/org/apache/camel/micrometer/observability/TraceProcessorsMicrometerObsInterceptStrategy.java @@ -16,8 +16,11 @@ */ package org.apache.camel.micrometer.observability; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; +import io.micrometer.tracing.BaggageInScope; import io.micrometer.tracing.Tracer; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -67,8 +70,9 @@ public TraceProcessor(Processor target) { public void process(Exchange exchange) throws Exception { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { - MicrometerObservabilitySpanAdapter otelSpan = (MicrometerObservabilitySpanAdapter) activeSpan; - try (Tracer.SpanInScope scope = tracer.withSpan(otelSpan.getSpan())) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) activeSpan; + try (Tracer.SpanInScope scope = tracer.withSpan(microObsSpan.getSpan()); + ScopedBaggages scopedBaggages = new ScopedBaggages(getBaggageFromProperties(exchange))) { processor.process(exchange); } } else { @@ -80,8 +84,9 @@ public void process(Exchange exchange) throws Exception { public boolean process(Exchange exchange, AsyncCallback callback) { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { - MicrometerObservabilitySpanAdapter otelSpan = (MicrometerObservabilitySpanAdapter) activeSpan; - try (Tracer.SpanInScope scope = tracer.withSpan(otelSpan.getSpan())) { + MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) activeSpan; + try (Tracer.SpanInScope scope = tracer.withSpan(microObsSpan.getSpan()); + ScopedBaggages scopedBaggages = new ScopedBaggages(getBaggageFromProperties(exchange))) { return processor.process(exchange, doneSync -> { callback.done(doneSync); }); @@ -100,6 +105,59 @@ public CompletableFuture processAsync(Exchange exchange) { process(exchange, callback); return callback.getFuture(); } + + // We inspect the exchange in order to find any baggage variable + private List getBaggageFromProperties(Exchange exchange) { + List baggages = new ArrayList<>(); + + for (String propertyKey : exchange.getProperties().keySet()) { + String key = getBaggageVar(propertyKey); + if (key != null) { + String value = exchange.getProperty(propertyKey) == null + ? null : exchange.getProperty(propertyKey).toString(); + baggages.add(tracer.createBaggageInScope(key, value)); + } + } + + return baggages; + } + + private String getBaggageVar(String key) { + if (key == null || !key.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) { + return null; + } + + return key.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length()); + } + } +} + +class ScopedBaggages implements AutoCloseable { + + private final List baggages; + + public ScopedBaggages(List baggages) { + this.baggages = new ArrayList<>(baggages); } + @Override + public void close() { + RuntimeException failure = null; + for (int i = baggages.size() - 1; i >= 0; i--) { + try { + baggages.get(i).close(); + } catch (Exception e) { + if (failure == null) { + failure = new RuntimeException( + "Failed to close baggage scopes", e); + } else { + failure.addSuppressed(e); + } + } + } + + if (failure != null) { + throw failure; + } + } } diff --git a/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/BaggageSettingTest.java b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/BaggageSettingTest.java new file mode 100644 index 0000000000000..a4042f1b340ea --- /dev/null +++ b/components/camel-micrometer-observability/src/test/java/org/apache/camel/micrometer/observability/BaggageSettingTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.micrometer.observability; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import io.opentelemetry.sdk.trace.data.SpanData; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.RoutesBuilder; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.micrometer.observability.CamelOpenTelemetryExtension.OtelTrace; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class BaggageSettingTest extends MicrometerObservabilityTracerPropagationTestSupport { + + @Override + protected CamelContext createCamelContext() throws Exception { + CamelContext ctx = super.createCamelContext(); + tst.setTraceProcessors(true); + tst.setDisableCoreProcessors(false); + + return ctx; + } + + @Test + void testRouteProgrammaticBaggage() throws IOException { + template.sendBody("direct:start", "my-body"); + Map traces = otelExtension.getTraces(); + assertEquals(1, traces.size()); + checkTrace(traces.values().iterator().next()); + } + + private void checkTrace(OtelTrace trace) { + List spans = trace.getSpans(); + assertEquals(7, spans.size()); + SpanData testProducer = spans.get(0); + SpanData direct = spans.get(1); + SpanData setHeaders = spans.get(2); + SpanData innerLog = spans.get(3); + SpanData innerProcessor = spans.get(4); + SpanData log = spans.get(5); + SpanData innerToLog = spans.get(6); + + // Validate span completion + assertTrue(testProducer.hasEnded()); + assertTrue(direct.hasEnded()); + assertTrue(setHeaders.hasEnded()); + assertTrue(innerLog.hasEnded()); + assertTrue(innerProcessor.hasEnded()); + assertTrue(log.hasEnded()); + assertTrue(innerToLog.hasEnded()); + } + + @Override + protected RoutesBuilder createRouteBuilder() { + return new RouteBuilder() { + + @Override + public void configure() { + from("direct:start") + .setProperty("CamelBaggage_tenant.id", constant("1234")) + .routeId("start") + .log("A message") + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + assertEquals("1234", tracer.getBaggage("tenant.id").get()); + } + }) + .to("log:info"); + } + }; + } +} diff --git a/components/camel-opentelemetry2/src/main/docs/opentelemetry2.adoc b/components/camel-opentelemetry2/src/main/docs/opentelemetry2.adoc index ef42bad2eaa49..5d66034680c46 100644 --- a/components/camel-opentelemetry2/src/main/docs/opentelemetry2.adoc +++ b/components/camel-opentelemetry2/src/main/docs/opentelemetry2.adoc @@ -241,18 +241,17 @@ For this reason, whenever you need to provide custom telemetry information, it i === Baggage customization -`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of OpenTelemetry, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via header settings. Whenever the component finds an header defined as `OTEL_BAGGAGE_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL: +`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of OpenTelemetry, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via Exchange property settings. Whenever the component finds a property defined as `CamelBaggage_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL: [source,java] ---- from("direct:start") - .setHeader("OTEL_BAGGAGE_myValue", constant("1234")) + .setProperty("CamelBaggage_myValue", constant("1234")) .routeId("start") .log("A message") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader("operation", "fake"); // Baggage is available via the OpenTelemetry API String val = Baggage.current().getEntryValue("myValue"); } @@ -260,6 +259,6 @@ For this reason, whenever you need to provide custom telemetry information, it i .to("log:info"); ---- -Any span executed after the `setHeader` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result. +Any span executed after the `setProperty` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result. NOTE: any baggage setting defined externally (i.e., calling the Camel process with a context propagation) is normally going to be propagated in Camel logic. diff --git a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java index 813ce637d16f7..23e0165cdb199 100644 --- a/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java +++ b/components/camel-opentelemetry2/src/main/java/org/apache/camel/opentelemetry2/TraceProcessorsOtelInterceptStrategy.java @@ -18,6 +18,7 @@ import java.util.concurrent.CompletableFuture; +import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.context.Scope; import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; @@ -62,8 +63,9 @@ public void process(Exchange exchange) throws Exception { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; + Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange); try (Scope scope = otelSpan.getSpan().makeCurrent(); - Scope baggageScope = otelSpan.getBaggage().makeCurrent()) { + Scope baggageScope = baggage.makeCurrent()) { processor.process(exchange); } } else { @@ -76,8 +78,9 @@ public boolean process(Exchange exchange, AsyncCallback callback) { Span activeSpan = spanStorage.peek(exchange); if (activeSpan != null) { OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan; + Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange); try (Scope scope = otelSpan.getSpan().makeCurrent(); - Scope baggageScope = otelSpan.getBaggage().makeCurrent()) { + Scope baggageScope = baggage.makeCurrent()) { return processor.process(exchange, doneSync -> { callback.done(doneSync); }); @@ -98,4 +101,26 @@ public CompletableFuture processAsync(Exchange exchange) { } } + // We inspect the exchange in order to find any baggage variable + private Baggage getBaggageFromProperties(Baggage baggage, Exchange exchange) { + for (String propertyKey : exchange.getProperties().keySet()) { + String key = getBaggageVar(propertyKey); + if (key != null) { + String value = exchange.getProperty(propertyKey) == null + ? null : exchange.getProperty(propertyKey).toString(); + baggage = baggage.toBuilder().put(key, value).build(); + } + } + + return baggage; + } + + private String getBaggageVar(String key) { + if (key == null || !key.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) { + return null; + } + + return key.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length()); + } + } diff --git a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java index dbbc1fd0710fe..f953f00985643 100644 --- a/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java +++ b/components/camel-opentelemetry2/src/test/java/org/apache/camel/opentelemetry2/BaggageSettingTest.java @@ -87,14 +87,12 @@ protected RoutesBuilder createRouteBuilder() { @Override public void configure() { from("direct:start") - .setHeader("OTEL_BAGGAGE_tenant.id", constant("1234")) + .setProperty("CamelBaggage_tenant.id", constant("1234")) .routeId("start") .log("A message") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { - exchange.getIn().setHeader("operation", "fake"); - assertEquals("1234", Baggage.current().getEntryValue("tenant.id")); } }) diff --git a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/Tracer.java b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/Tracer.java index 940b9878befc3..f3bf066d0bcb3 100644 --- a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/Tracer.java +++ b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/Tracer.java @@ -47,6 +47,7 @@ public abstract class Tracer extends ServiceSupport implements CamelTracingServi public static final String TRACE_HEADER = "CAMEL_TRACE_ID"; public static final String SPAN_HEADER = "CAMEL_SPAN_ID"; + public static final String BAGGAGE_PROPERTY = "CamelBaggage_"; private static final Logger LOG = LoggerFactory.getLogger(Tracer.class);