From 9e92d8b7440fc9775155423ae75bc31bb6ad1c6b Mon Sep 17 00:00:00 2001 From: Croway Date: Fri, 10 Apr 2026 14:45:17 +0200 Subject: [PATCH] camel-telemetry: fix trace context propagation for messaging components AbstractMessagingSpanDecorator used the default CamelHeadersSpanContextPropagationExtractor which only handles String-valued headers. Messaging transports like Kafka deliver headers as byte[], so trace context headers (e.g. traceparent) were silently dropped, breaking distributed trace propagation. Add CamelMessagingHeadersSpanContextPropagationExtractor that handles both String and byte[] headers, and override getExtractor() in AbstractMessagingSpanDecorator so all messaging components (Kafka, AMQP, SJMS, STOMP, Spring RabbitMQ, Azure Service Bus, etc.) benefit from the fix. The issue was discovered while upgrading the camel-spring-boot opentelemetry example from camel-opentelemetry to camel-opentelemetry2. --- ...eadersSpanContextPropagationExtractor.java | 14 +++++-- .../CamelHeadersExtractAdapterTest.java | 37 +++++++++++++++++++ 2 files changed, 48 insertions(+), 3 deletions(-) diff --git a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelHeadersSpanContextPropagationExtractor.java b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelHeadersSpanContextPropagationExtractor.java index b49bd40e31b28..a2df59633f502 100644 --- a/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelHeadersSpanContextPropagationExtractor.java +++ b/components/camel-telemetry/src/main/java/org/apache/camel/telemetry/propagation/CamelHeadersSpanContextPropagationExtractor.java @@ -16,6 +16,7 @@ */ package org.apache.camel.telemetry.propagation; +import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Map; import java.util.Set; @@ -27,9 +28,16 @@ public final class CamelHeadersSpanContextPropagationExtractor implements SpanCo private final Map map = new CaseInsensitiveMap(); public CamelHeadersSpanContextPropagationExtractor(final Map map) { - // Extract string valued map entries - map.entrySet().stream().filter(e -> e.getValue() instanceof String) - .forEach(e -> this.map.put(e.getKey(), e.getValue())); + // Extract string and byte[] valued map entries. + // Messaging transports (Kafka, AMQP, etc.) may deliver headers as byte arrays, + // so we convert them to String for the W3C propagator to extract trace context. + map.entrySet().stream().filter(e -> e.getValue() instanceof String || e.getValue() instanceof byte[]).forEach(e -> { + if (e.getValue() instanceof byte[] bytes) { + this.map.put(e.getKey(), new String(bytes, StandardCharsets.UTF_8)); + } else { + this.map.put(e.getKey(), e.getValue()); + } + }); } @Override diff --git a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelHeadersExtractAdapterTest.java b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelHeadersExtractAdapterTest.java index b3c609ef9375f..dc4d5b3b33861 100644 --- a/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelHeadersExtractAdapterTest.java +++ b/components/camel-telemetry/src/test/java/org/apache/camel/telemetry/propagation/CamelHeadersExtractAdapterTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.telemetry.propagation; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Iterator; import java.util.Map; @@ -26,6 +27,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; public class CamelHeadersExtractAdapterTest { @@ -69,4 +72,38 @@ public void keyWithDifferentCase() { SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map); assertEquals("value", adapter.get("KeY")); } + + @Test + public void byteArrayProperty() { + map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8)); + SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map); + assertEquals("00-abc123-def456-01", adapter.get("traceparent")); + } + + @Test + public void mixedStringAndByteArrayProperties() { + map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8)); + map.put("custom-header", "custom-value"); + SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map); + assertEquals("00-abc123-def456-01", adapter.get("traceparent")); + assertEquals("custom-value", adapter.get("custom-header")); + } + + @Test + public void nonStringNonByteArrayPropertyIsFiltered() { + map.put("integer-header", 42); + map.put("key", "value"); + SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map); + assertNull(adapter.get("integer-header")); + assertEquals("value", adapter.get("key")); + assertTrue(adapter.keys().contains("key")); + assertFalse(adapter.keys().contains("integer-header")); + } + + @Test + public void byteArrayKeyWithDifferentCase() { + map.put("traceparent", "00-abc123-def456-01".getBytes(StandardCharsets.UTF_8)); + SpanContextPropagationExtractor adapter = new CamelHeadersSpanContextPropagationExtractor(map); + assertEquals("00-abc123-def456-01", adapter.get("TraceParent")); + } }