diff --git a/sentry-kafka/README.md b/sentry-kafka/README.md index ef4b531985..1b1b69238e 100644 --- a/sentry-kafka/README.md +++ b/sentry-kafka/README.md @@ -2,4 +2,4 @@ This module provides Kafka-native queue instrumentation for applications using `kafka-clients` directly. -Spring users should use `sentry-spring-boot-jakarta` / `sentry-spring-jakarta`, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks. +Spring users should use the Sentry Spring (Boot) SDKs, which provide higher-fidelity consumer instrumentation via Spring Kafka hooks. diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java index 1231cae15e..dbce760de9 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -143,8 +143,8 @@ private boolean isIgnored() { } final @NotNull TransactionContext txContext = - continued != null ? continued : new TransactionContext("queue.process", "queue.process"); - txContext.setName("queue.process"); + continued != null ? continued : new TransactionContext(record.topic(), "queue.process"); + txContext.setName(record.topic()); txContext.setOperation("queue.process"); final @NotNull TransactionOptions txOptions = new TransactionOptions(); @@ -204,7 +204,6 @@ private void finishTransaction( } transaction.finish(); } catch (Throwable t) { - // Instrumentation must never break customer processing. scopes .getOptions() .getLogger() diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java index 7400e5ba2c..1b682edf15 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java @@ -29,13 +29,8 @@ import org.jetbrains.annotations.Nullable; /** - * Wraps a Kafka {@link Producer} via {@link Proxy} to record a {@code queue.publish} span around - * each {@code send} and to inject Sentry trace propagation headers into the produced record. - * - *

Only the two {@code send} overloads are intercepted; every other {@link Producer} method is - * forwarded directly to the delegate. Because the wrapper is a dynamic proxy, it is compatible with - * any Kafka client version — new methods added to the {@link Producer} interface in future Kafka - * releases are forwarded automatically without recompilation. + * Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send} + * and to inject Sentry trace propagation headers into the produced record. * *

For raw Kafka usage: * @@ -44,9 +39,8 @@ * SentryKafkaProducer.wrap(new KafkaProducer<>(props)); * } * - *

For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code - * sentry-spring-jakarta} installs this wrapper automatically via {@code - * ProducerFactory.addPostProcessor(...)}. + *

For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} installs this wrapper + * automatically. */ @ApiStatus.Experimental public final class SentryKafkaProducer { @@ -57,7 +51,7 @@ public final class SentryKafkaProducer { private SentryKafkaProducer() {} /** - * Wraps the given producer with Sentry instrumentation using the global scopes. + * Wraps the given producer with Sentry instrumentation. * * @param delegate the Kafka producer to wrap * @return an instrumented producer that records {@code queue.publish} spans diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt index 3bd992e8c8..5529e42c71 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -91,7 +91,7 @@ class SentryKafkaConsumerTracingTest { verify(forkedScopes).continueTrace(eq(sentryTraceValue), eq(listOf(baggageValue))) verify(forkedScopes).startTransaction(txContextCaptor.capture(), txOptionsCaptor.capture()) - assertEquals("queue.process", txContextCaptor.firstValue.name) + assertEquals("my-topic", txContextCaptor.firstValue.name) assertEquals("queue.process", txContextCaptor.firstValue.operation) assertEquals(SentryKafkaConsumerTracing.TRACE_ORIGIN, txOptionsCaptor.firstValue.origin) assertTrue(txOptionsCaptor.firstValue.isBindToScope) diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt index 15ea2d104e..a662039768 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt @@ -73,12 +73,6 @@ class SentryKafkaProducerTest { Sentry.close() } - private fun createTransaction(): SentryTracer { - val tx = SentryTracer(TransactionContext("tx", "op"), scopes) - whenever(scopes.span).thenReturn(tx) - return tx - } - @Test fun `creates queue publish span and injects headers`() { val tx = createTransaction() @@ -358,4 +352,10 @@ class SentryKafkaProducerTest { val producer = SentryKafkaProducer.wrap(delegate, scopes) assertTrue(producer.toString().startsWith("SentryKafkaProducer[delegate=")) } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } } diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java index e7fc873908..2583f4a046 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SentrySpanExporter.java @@ -362,10 +362,6 @@ private void transferSpanDetails( maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_ID); maybeTransferOtelAttribute(span, sentryTransaction, ThreadIncubatingAttributes.THREAD_NAME); - // Root transactions don't bulk-copy OTel attributes into span data (unlike child spans). - // The Sentry Queues product reads `trace.data.messaging.*`, so messaging attributes must - // be explicitly transferred for consumer root transactions to show up correctly. These are - // operational metadata (no payload contents) and are safe to transfer unconditionally. maybeTransferOtelAttribute( span, sentryTransaction, MessagingIncubatingAttributes.MESSAGING_SYSTEM); maybeTransferOtelAttribute( diff --git a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java index 90db227505..3af3d8f96f 100644 --- a/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java +++ b/sentry-opentelemetry/sentry-opentelemetry-core/src/main/java/io/sentry/opentelemetry/SpanDescriptionExtractor.java @@ -116,9 +116,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS @SuppressWarnings("deprecation") private @NotNull String opForMessaging(final @NotNull SpanData otelSpan) { final @NotNull Attributes attributes = otelSpan.getAttributes(); - // Prefer `messaging.operation.type` (current OTel semconv), fall back to legacy - // `messaging.operation`. OTel's SpanKind.CONSUMER is overloaded for both `receive` and - // `process`, so attribute-first mapping is required. SpanKind is used only as a last resort. @Nullable String operationType = attributes.get(MessagingIncubatingAttributes.MESSAGING_OPERATION_TYPE); if (operationType == null) { @@ -139,7 +136,6 @@ private OtelSpanInfo descriptionForMessagingSystem(final @NotNull SpanData otelS case "settle": return "queue.settle"; default: - // fall through to SpanKind mapping break; } } diff --git a/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java index a49e8473c4..b2b4d20b94 100644 --- a/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-7/src/main/java/io/sentry/spring7/kafka/SentryKafkaRecordInterceptor.java @@ -159,8 +159,8 @@ private boolean isIgnored() { final @NotNull TransactionContext txContext = transactionContext != null ? transactionContext - : new TransactionContext("queue.process", "queue.process"); - txContext.setName("queue.process"); + : new TransactionContext(record.topic(), "queue.process"); + txContext.setName(record.topic()); txContext.setOperation("queue.process"); final @NotNull TransactionOptions txOptions = new TransactionOptions(); diff --git a/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt index 82174fa80e..2738f99f4d 100644 --- a/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-7/src/test/kotlin/io/sentry/spring7/kafka/SentryKafkaRecordInterceptorTest.kt @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest { verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") verify(forkedScopes).makeCurrent() + verify(forkedScopes) + .startTransaction( + org.mockito.kotlin.check { + assertEquals("my-topic", it.name) + assertEquals("queue.process", it.operation) + }, + any(), + ) } @Test diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 3f5da4947d..7253571269 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -159,8 +159,8 @@ private boolean isIgnored() { final @NotNull TransactionContext txContext = transactionContext != null ? transactionContext - : new TransactionContext("queue.process", "queue.process"); - txContext.setName("queue.process"); + : new TransactionContext(record.topic(), "queue.process"); + txContext.setName(record.topic()); txContext.setOperation("queue.process"); final @NotNull TransactionOptions txOptions = new TransactionOptions(); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 16373a8be0..b09d4f5e14 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest { verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") verify(forkedScopes).makeCurrent() + verify(forkedScopes) + .startTransaction( + org.mockito.kotlin.check { + assertEquals("my-topic", it.name) + assertEquals("queue.process", it.operation) + }, + any(), + ) } @Test diff --git a/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java index 8d848e4073..d1ad308609 100644 --- a/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring/src/main/java/io/sentry/spring/kafka/SentryKafkaRecordInterceptor.java @@ -165,8 +165,8 @@ private boolean isIgnored() { final @NotNull TransactionContext txContext = transactionContext != null ? transactionContext - : new TransactionContext("queue.process", "queue.process"); - txContext.setName("queue.process"); + : new TransactionContext(record.topic(), "queue.process"); + txContext.setName(record.topic()); txContext.setOperation("queue.process"); final @NotNull TransactionOptions txOptions = new TransactionOptions(); diff --git a/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt index 6c50b9e675..17df004d40 100644 --- a/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring/src/test/kotlin/io/sentry/spring/kafka/SentryKafkaRecordInterceptorTest.kt @@ -136,6 +136,14 @@ class SentryKafkaRecordInterceptorTest { verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") verify(forkedScopes).makeCurrent() + verify(forkedScopes) + .startTransaction( + org.mockito.kotlin.check { + assertEquals("my-topic", it.name) + assertEquals("queue.process", it.operation) + }, + any(), + ) } @Test