Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ include::spring-boot:partial$starter.adoc[]

=== Spring Boot context propagation

The starter is in charge to autoconfigure the component. Additionally you will need to specify the concrete Propagation implementation by adding the dependency you wish to use (for example, `io.micrometer:micrometer-tracing-bridge-otel`, 'io.micrometer:micrometer-tracing-bridge-brave' or any other technology you wish to use). If none is provided, a "no-op" implementation will be defined as default.
The starter is in charge to autoconfigure the component. Additionally you will need to specify the concrete Propagation implementation by adding the dependency you wish to use (for example, `io.micrometer:micrometer-tracing-bridge-otel`, `io.micrometer:micrometer-tracing-bridge-brave` or any other technology you wish to use). If none is provided, a "no-op" implementation will be defined as default.

== Using with standalone Camel

Expand Down Expand Up @@ -121,7 +121,7 @@ import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
event -> {
},
new OtelBaggageManager(
currentTraceContext, List.of(MicrometerObservabilitySpanAdapter.BAGGAGE_CAMEL_FLAG), List.of()));
currentTraceContext, List.of(), List.of()));
OtelPropagator otelPropagator = new OtelPropagator(propagators, otelTracer);
getContext().getRegistry().bind("MicrometerObservabilityTracer", micrometerTracer);
getContext().getRegistry().bind("OpentelemetryPropagators", otelPropagator);
Expand All @@ -135,8 +135,6 @@ NOTE: this is an example that can be used as a reference. It may not work exactl

You can see that the configuration of this component may get a bit difficult, unless you are already familiar with the tracing technology you're going to implement.

NOTE: an important thing to do is to include a `BaggageManager` specifying the baggage properties to include during context propagation. The field provided in the example, `MicrometerObservabilitySpanAdapter.BAGGAGE_CAMEL_FLAG`, is necessary to handle consistency across asynchronous threads.

=== How to trace

Once the application is instrumented and configured, you can observe the traces produced with the tooling compatible to the concrete implementation you have in place. You are invited to follow the specific documentation of each technology.
Expand Down Expand Up @@ -173,3 +171,30 @@ public void process(Exchange exchange) throws Exception {
mySpan.end();
}
----

=== Baggage customization

`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of telemetry technologies, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via Exchange property settings. Whenever the component finds a property defined as `CamelBaggage_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL:

[source,java]
----
from("direct:start")
.setProperty("CamelBaggage_myValue", constant("1234"))
.routeId("start")
.log("A message")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
// Baggage is available via the Micrometer Observability API
String val = tracer.getBaggage("myValue").get();
}
})
.to("log:info");
----

Any span executed after the `setProperty` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result as well.

NOTE: an important thing to do is to include a `BaggageManager` specifying the baggage properties to include during context propagation. You may require to configure those baggages fields you want to propagate across the boards.

Any baggage setting defined externally (i.e., calling the Camel process into another external process where you're setting some baggage) is normally going to be propagated in Camel logic.

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@
*/
package org.apache.camel.micrometer.observability;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

import io.micrometer.tracing.BaggageInScope;
import io.micrometer.tracing.Tracer;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -67,8 +70,9 @@ public TraceProcessor(Processor target) {
public void process(Exchange exchange) throws Exception {
Span activeSpan = spanStorage.peek(exchange);
if (activeSpan != null) {
MicrometerObservabilitySpanAdapter otelSpan = (MicrometerObservabilitySpanAdapter) activeSpan;
try (Tracer.SpanInScope scope = tracer.withSpan(otelSpan.getSpan())) {
MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) activeSpan;
try (Tracer.SpanInScope scope = tracer.withSpan(microObsSpan.getSpan());
ScopedBaggages scopedBaggages = new ScopedBaggages(getBaggageFromProperties(exchange))) {
processor.process(exchange);
}
} else {
Expand All @@ -80,8 +84,9 @@ public void process(Exchange exchange) throws Exception {
public boolean process(Exchange exchange, AsyncCallback callback) {
Span activeSpan = spanStorage.peek(exchange);
if (activeSpan != null) {
MicrometerObservabilitySpanAdapter otelSpan = (MicrometerObservabilitySpanAdapter) activeSpan;
try (Tracer.SpanInScope scope = tracer.withSpan(otelSpan.getSpan())) {
MicrometerObservabilitySpanAdapter microObsSpan = (MicrometerObservabilitySpanAdapter) activeSpan;
try (Tracer.SpanInScope scope = tracer.withSpan(microObsSpan.getSpan());
ScopedBaggages scopedBaggages = new ScopedBaggages(getBaggageFromProperties(exchange))) {
return processor.process(exchange, doneSync -> {
callback.done(doneSync);
});
Expand All @@ -100,6 +105,59 @@ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
process(exchange, callback);
return callback.getFuture();
}

// We inspect the exchange in order to find any baggage variable
private List<BaggageInScope> getBaggageFromProperties(Exchange exchange) {
List<BaggageInScope> baggages = new ArrayList<>();

for (String propertyKey : exchange.getProperties().keySet()) {
String key = getBaggageVar(propertyKey);
if (key != null) {
String value = exchange.getProperty(propertyKey) == null
? null : exchange.getProperty(propertyKey).toString();
baggages.add(tracer.createBaggageInScope(key, value));
}
}

return baggages;
}

private String getBaggageVar(String key) {
if (key == null || !key.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) {
return null;
}

return key.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length());
}
}
}

class ScopedBaggages implements AutoCloseable {

private final List<BaggageInScope> baggages;

public ScopedBaggages(List<BaggageInScope> baggages) {
this.baggages = new ArrayList<>(baggages);
}

@Override
public void close() {
RuntimeException failure = null;
for (int i = baggages.size() - 1; i >= 0; i--) {
try {
baggages.get(i).close();
} catch (Exception e) {
if (failure == null) {
failure = new RuntimeException(
"Failed to close baggage scopes", e);
} else {
failure.addSuppressed(e);
}
}
}

if (failure != null) {
throw failure;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.micrometer.observability;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import io.opentelemetry.sdk.trace.data.SpanData;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.micrometer.observability.CamelOpenTelemetryExtension.OtelTrace;
import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class BaggageSettingTest extends MicrometerObservabilityTracerPropagationTestSupport {

@Override
protected CamelContext createCamelContext() throws Exception {
CamelContext ctx = super.createCamelContext();
tst.setTraceProcessors(true);
tst.setDisableCoreProcessors(false);

return ctx;
}

@Test
void testRouteProgrammaticBaggage() throws IOException {
template.sendBody("direct:start", "my-body");
Map<String, OtelTrace> traces = otelExtension.getTraces();
assertEquals(1, traces.size());
checkTrace(traces.values().iterator().next());
}

private void checkTrace(OtelTrace trace) {
List<SpanData> spans = trace.getSpans();
assertEquals(7, spans.size());
SpanData testProducer = spans.get(0);
SpanData direct = spans.get(1);
SpanData setHeaders = spans.get(2);
SpanData innerLog = spans.get(3);
SpanData innerProcessor = spans.get(4);
SpanData log = spans.get(5);
SpanData innerToLog = spans.get(6);

// Validate span completion
assertTrue(testProducer.hasEnded());
assertTrue(direct.hasEnded());
assertTrue(setHeaders.hasEnded());
assertTrue(innerLog.hasEnded());
assertTrue(innerProcessor.hasEnded());
assertTrue(log.hasEnded());
assertTrue(innerToLog.hasEnded());
}

@Override
protected RoutesBuilder createRouteBuilder() {
return new RouteBuilder() {

@Override
public void configure() {
from("direct:start")
.setProperty("CamelBaggage_tenant.id", constant("1234"))
.routeId("start")
.log("A message")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
assertEquals("1234", tracer.getBaggage("tenant.id").get());
}
})
.to("log:info");
}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -241,25 +241,24 @@ For this reason, whenever you need to provide custom telemetry information, it i

=== Baggage customization

`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of OpenTelemetry, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via header settings. Whenever the component finds an header defined as `OTEL_BAGGAGE_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL:
`Baggage` is a way to attach key-value metadata to a request and carry it across service boundaries. In the context of OpenTelemetry, baggage travels along with the context (like trace/span), but it's meant for custom data you define, not telemetry internals. Camel allows you to programmatically provide any `Baggage` information via Exchange property settings. Whenever the component finds a property defined as `CamelBaggage_xyz` it will consider it as a baggage variable named `xyz`. For example, in Java DSL:

[source,java]
----
from("direct:start")
.setHeader("OTEL_BAGGAGE_myValue", constant("1234"))
.setProperty("CamelBaggage_myValue", constant("1234"))
.routeId("start")
.log("A message")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("operation", "fake");
// Baggage is available via the OpenTelemetry API
String val = Baggage.current().getEntryValue("myValue");
}
})
.to("log:info");
----

Any span executed after the `setHeader` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result.
Any span executed after the `setProperty` will include a baggage variable named `myValue` with value `1234` which will be reflected in your telemetry result.

NOTE: any baggage setting defined externally (i.e., calling the Camel process with a context propagation) is normally going to be propagated in Camel logic.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.concurrent.CompletableFuture;

import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.context.Scope;
import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelContext;
Expand Down Expand Up @@ -62,8 +63,9 @@ public void process(Exchange exchange) throws Exception {
Span activeSpan = spanStorage.peek(exchange);
if (activeSpan != null) {
OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan;
Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange);
try (Scope scope = otelSpan.getSpan().makeCurrent();
Scope baggageScope = otelSpan.getBaggage().makeCurrent()) {
Scope baggageScope = baggage.makeCurrent()) {
processor.process(exchange);
}
} else {
Expand All @@ -76,8 +78,9 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
Span activeSpan = spanStorage.peek(exchange);
if (activeSpan != null) {
OpenTelemetrySpanAdapter otelSpan = (OpenTelemetrySpanAdapter) activeSpan;
Baggage baggage = getBaggageFromProperties(otelSpan.getBaggage(), exchange);
try (Scope scope = otelSpan.getSpan().makeCurrent();
Scope baggageScope = otelSpan.getBaggage().makeCurrent()) {
Scope baggageScope = baggage.makeCurrent()) {
return processor.process(exchange, doneSync -> {
callback.done(doneSync);
});
Expand All @@ -98,4 +101,26 @@ public CompletableFuture<Exchange> processAsync(Exchange exchange) {
}
}

// We inspect the exchange in order to find any baggage variable
private Baggage getBaggageFromProperties(Baggage baggage, Exchange exchange) {
for (String propertyKey : exchange.getProperties().keySet()) {
String key = getBaggageVar(propertyKey);
if (key != null) {
String value = exchange.getProperty(propertyKey) == null
? null : exchange.getProperty(propertyKey).toString();
baggage = baggage.toBuilder().put(key, value).build();
}
}

return baggage;
}

private String getBaggageVar(String key) {
if (key == null || !key.startsWith(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY)) {
return null;
}

return key.substring(org.apache.camel.telemetry.Tracer.BAGGAGE_PROPERTY.length());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ protected RoutesBuilder createRouteBuilder() {
@Override
public void configure() {
from("direct:start")
.setHeader("OTEL_BAGGAGE_tenant.id", constant("1234"))
.setProperty("CamelBaggage_tenant.id", constant("1234"))
.routeId("start")
.log("A message")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
exchange.getIn().setHeader("operation", "fake");

assertEquals("1234", Baggage.current().getEntryValue("tenant.id"));
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class Tracer extends ServiceSupport implements CamelTracingServi

public static final String TRACE_HEADER = "CAMEL_TRACE_ID";
public static final String SPAN_HEADER = "CAMEL_SPAN_ID";
public static final String BAGGAGE_PROPERTY = "CamelBaggage_";

private static final Logger LOG = LoggerFactory.getLogger(Tracer.class);

Expand Down