diff --git a/CHANGELOG.md b/CHANGELOG.md index 68dd4433f7..be35e99df6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ ### Fixes +- Inject Kafka trace headers even without an active span so distributed tracing works for background workers and `@Scheduled` jobs ([#5338](https://github.com/getsentry/sentry-java/pull/5338)) - Write the `sentry-task-enqueued-time` Kafka header as a plain decimal so cross-SDK consumers (e.g. sentry-python) can parse it ([#5328](https://github.com/getsentry/sentry-java/pull/5328)) ## 8.37.1 diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index ce5b0efb66..64bb34a229 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -9,15 +9,27 @@ public final class io/sentry/kafka/SentryKafkaConsumerTracing { public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object; } -public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { +public final class io/sentry/kafka/SentryKafkaProducer : org/apache/kafka/clients/producer/Producer { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; - public fun ()V - public fun (Lio/sentry/IScopes;)V - public fun (Lio/sentry/IScopes;Ljava/lang/String;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;)V + public fun (Lorg/apache/kafka/clients/producer/Producer;Lio/sentry/IScopes;Ljava/lang/String;)V + public fun abortTransaction ()V + public fun beginTransaction ()V + public fun clientInstanceId (Ljava/time/Duration;)Lorg/apache/kafka/common/Uuid; public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onAcknowledgement (Lorg/apache/kafka/clients/producer/RecordMetadata;Ljava/lang/Exception;)V - public fun onSend 
(Lorg/apache/kafka/clients/producer/ProducerRecord;)Lorg/apache/kafka/clients/producer/ProducerRecord; + public fun close (Ljava/time/Duration;)V + public fun commitTransaction ()V + public fun flush ()V + public fun getDelegate ()Lorg/apache/kafka/clients/producer/Producer; + public fun initTransactions ()V + public fun metrics ()Ljava/util/Map; + public fun partitionsFor (Ljava/lang/String;)Ljava/util/List; + public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;)Ljava/util/concurrent/Future; + public fun send (Lorg/apache/kafka/clients/producer/ProducerRecord;Lorg/apache/kafka/clients/producer/Callback;)Ljava/util/concurrent/Future; + public fun sendOffsetsToTransaction (Ljava/util/Map;Ljava/lang/String;)V + public fun sendOffsetsToTransaction (Ljava/util/Map;Lorg/apache/kafka/clients/consumer/ConsumerGroupMetadata;)V + public fun toString ()Ljava/lang/String; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java index 37c7073038..1231cae15e 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -241,7 +241,7 @@ private void finishTransaction( private @Nullable Long receiveLatency(final @NotNull ConsumerRecord record) { final @Nullable String enqueuedTimeStr = - headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr == null) { return null; } diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java new file mode 100644 index 0000000000..500e2bc90e --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducer.java @@ -0,0 +1,281 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; 
+import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISpan; +import io.sentry.ScopesAdapter; +import io.sentry.SentryLevel; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanOptions; +import io.sentry.SpanStatus; +import io.sentry.util.SpanUtils; +import io.sentry.util.TracingUtils; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; +import org.apache.kafka.clients.consumer.ConsumerGroupMetadata; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.producer.Callback; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.Metric; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** + * Wraps a Kafka {@link Producer} to record a {@code queue.publish} span around each {@code send} + * and to inject Sentry trace propagation headers into the produced record. + * + *
+ * <p>Unlike a {@link org.apache.kafka.clients.producer.ProducerInterceptor}, the wrapper keeps the
+ * span open until the send callback fires, so the span reflects the actual broker-ack lifecycle.
+ *
+ * <p>For raw Kafka usage:
+ *
+ * <pre>{@code
+ * Producer<String, String> producer =
+ *     new SentryKafkaProducer<>(new KafkaProducer<>(props));
+ * }</pre>
+ * + *

For Spring Kafka, the {@code SentryKafkaProducerBeanPostProcessor} in {@code + * sentry-spring-jakarta} installs this wrapper automatically via {@code + * ProducerFactory.addPostProcessor(...)}. + */ +@ApiStatus.Experimental +public final class SentryKafkaProducer implements Producer { + + public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; + public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; + + private final @NotNull Producer delegate; + private final @NotNull IScopes scopes; + private final @NotNull String traceOrigin; + + public SentryKafkaProducer(final @NotNull Producer delegate) { + this(delegate, ScopesAdapter.getInstance(), TRACE_ORIGIN); + } + + public SentryKafkaProducer( + final @NotNull Producer delegate, final @NotNull IScopes scopes) { + this(delegate, scopes, TRACE_ORIGIN); + } + + public SentryKafkaProducer( + final @NotNull Producer delegate, + final @NotNull IScopes scopes, + final @NotNull String traceOrigin) { + this.delegate = delegate; + this.scopes = scopes; + this.traceOrigin = traceOrigin; + } + + /** Returns the wrapped producer. 
*/ + public @NotNull Producer getDelegate() { + return delegate; + } + + @Override + public @NotNull Future send(final @NotNull ProducerRecord record) { + return send(record, null); + } + + @Override + public @NotNull Future send( + final @NotNull ProducerRecord record, final @Nullable Callback callback) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return delegate.send(record, callback); + } + + final @Nullable ISpan activeSpan = scopes.getSpan(); + if (activeSpan == null || activeSpan.isNoOp()) { + maybeInjectHeaders(record.headers(), null); + return delegate.send(record, callback); + } + + final @NotNull SpanOptions spanOptions = new SpanOptions(); + spanOptions.setOrigin(traceOrigin); + final @NotNull ISpan span = activeSpan.startChild("queue.publish", record.topic(), spanOptions); + + span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + maybeInjectHeaders(record.headers(), span); + + try { + return delegate.send(record, wrapCallback(callback, span)); + } catch (Throwable t) { + finishWithError(span, t); + throw t; + } + } + + private @NotNull Callback wrapCallback( + final @Nullable Callback userCallback, final @NotNull ISpan span) { + return (metadata, exception) -> { + try { + if (exception != null) { + span.setThrowable(exception); + span.setStatus(SpanStatus.INTERNAL_ERROR); + } else { + span.setStatus(SpanStatus.OK); + } + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to set status on Kafka producer span.", t); + } finally { + span.finish(); + if (userCallback != null) { + userCallback.onCompletion(metadata, exception); + } + } + }; + } + + private void finishWithError(final @NotNull ISpan span, final @NotNull Throwable t) { + span.setThrowable(t); + span.setStatus(SpanStatus.INTERNAL_ERROR); + span.finish(); + } + + private boolean isIgnored() { + return 
SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); + } + + private void maybeInjectHeaders(final @NotNull Headers headers, final @Nullable ISpan span) { + try { + final @Nullable List existingBaggageHeaders = + readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); + final @Nullable TracingUtils.TracingHeaders tracingHeaders = + TracingUtils.trace(scopes, existingBaggageHeaders, span); + if (tracingHeaders != null) { + final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); + headers.remove(sentryTraceHeader.getName()); + headers.add( + sentryTraceHeader.getName(), + sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); + + final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); + if (baggageHeader != null) { + headers.remove(baggageHeader.getName()); + headers.add( + baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); + } + } + + headers.remove(SENTRY_ENQUEUED_TIME_HEADER); + headers.add( + SENTRY_ENQUEUED_TIME_HEADER, + DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) + .toString() + .getBytes(StandardCharsets.UTF_8)); + } catch (Throwable t) { + scopes + .getOptions() + .getLogger() + .log(SentryLevel.ERROR, "Failed to inject Sentry headers into Kafka record.", t); + } + } + + private static @Nullable List readHeaderValues( + final @NotNull Headers headers, final @NotNull String name) { + @Nullable List values = null; + for (final @NotNull Header header : headers.headers(name)) { + final byte @Nullable [] value = header.value(); + if (value != null) { + if (values == null) { + values = new ArrayList<>(); + } + values.add(new String(value, StandardCharsets.UTF_8)); + } + } + return values; + } + + // --- Pure delegation for everything else --- + + @Override + public void initTransactions() { + delegate.initTransactions(); + } + + @Override + public void beginTransaction() throws 
ProducerFencedException { + delegate.beginTransaction(); + } + + @Override + @SuppressWarnings("deprecation") + public void sendOffsetsToTransaction( + final @NotNull Map offsets, + final @NotNull String consumerGroupId) + throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, consumerGroupId); + } + + @Override + public void sendOffsetsToTransaction( + final @NotNull Map offsets, + final @NotNull ConsumerGroupMetadata groupMetadata) + throws ProducerFencedException { + delegate.sendOffsetsToTransaction(offsets, groupMetadata); + } + + @Override + public void commitTransaction() throws ProducerFencedException { + delegate.commitTransaction(); + } + + @Override + public void abortTransaction() throws ProducerFencedException { + delegate.abortTransaction(); + } + + @Override + public void flush() { + delegate.flush(); + } + + @Override + public @NotNull List partitionsFor(final @NotNull String topic) { + return delegate.partitionsFor(topic); + } + + @Override + public @NotNull Map metrics() { + return delegate.metrics(); + } + + @Override + public @NotNull Uuid clientInstanceId(final @NotNull Duration timeout) { + return delegate.clientInstanceId(timeout); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public void close(final @NotNull Duration timeout) { + delegate.close(timeout); + } + + @Override + public @NotNull String toString() { + return "SentryKafkaProducer[delegate=" + delegate + "]"; + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java deleted file mode 100644 index 6bcb424397..0000000000 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ /dev/null @@ -1,150 +0,0 @@ -package io.sentry.kafka; - -import io.sentry.BaggageHeader; -import io.sentry.DateUtils; -import io.sentry.IScopes; -import io.sentry.ISpan; -import io.sentry.ScopesAdapter; 
-import io.sentry.SentryLevel; -import io.sentry.SentryTraceHeader; -import io.sentry.SpanDataConvention; -import io.sentry.SpanOptions; -import io.sentry.SpanStatus; -import io.sentry.util.SpanUtils; -import io.sentry.util.TracingUtils; -import java.nio.charset.StandardCharsets; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.producer.ProducerInterceptor; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.apache.kafka.clients.producer.RecordMetadata; -import org.apache.kafka.common.header.Header; -import org.apache.kafka.common.header.Headers; -import org.jetbrains.annotations.ApiStatus; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -@ApiStatus.Experimental -public final class SentryKafkaProducerInterceptor implements ProducerInterceptor { - - public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.producer"; - public static final @NotNull String SENTRY_ENQUEUED_TIME_HEADER = "sentry-task-enqueued-time"; - - private final @NotNull IScopes scopes; - private final @NotNull String traceOrigin; - - public SentryKafkaProducerInterceptor() { - this(ScopesAdapter.getInstance(), TRACE_ORIGIN); - } - - public SentryKafkaProducerInterceptor(final @NotNull IScopes scopes) { - this(scopes, TRACE_ORIGIN); - } - - public SentryKafkaProducerInterceptor( - final @NotNull IScopes scopes, final @NotNull String traceOrigin) { - this.scopes = scopes; - this.traceOrigin = traceOrigin; - } - - @Override - public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { - if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { - return record; - } - - final @Nullable ISpan activeSpan = scopes.getSpan(); - if (activeSpan == null || activeSpan.isNoOp()) { - return record; - } - - @Nullable ISpan span = null; - try { - final @NotNull SpanOptions spanOptions = new SpanOptions(); - spanOptions.setOrigin(traceOrigin); - span = 
activeSpan.startChild("queue.publish", record.topic(), spanOptions); - if (span.isNoOp()) { - return record; - } - - span.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - span.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); - - injectHeaders(record.headers(), span); - span.setStatus(SpanStatus.OK); - } catch (Throwable t) { - if (span != null) { - span.setThrowable(t); - span.setStatus(SpanStatus.INTERNAL_ERROR); - } - scopes - .getOptions() - .getLogger() - .log(SentryLevel.ERROR, "Failed to instrument Kafka producer record.", t); - } finally { - if (span != null && !span.isFinished()) { - span.finish(); - } - } - - return record; - } - - @Override - public void onAcknowledgement( - final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} - - private boolean isIgnored() { - return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); - } - - @Override - public void close() {} - - @Override - public void configure(final @Nullable Map configs) {} - - private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan span) { - final @Nullable List existingBaggageHeaders = - readHeaderValues(headers, BaggageHeader.BAGGAGE_HEADER); - final @Nullable TracingUtils.TracingHeaders tracingHeaders = - TracingUtils.trace(scopes, existingBaggageHeaders, span); - if (tracingHeaders != null) { - final @NotNull SentryTraceHeader sentryTraceHeader = tracingHeaders.getSentryTraceHeader(); - headers.remove(sentryTraceHeader.getName()); - headers.add( - sentryTraceHeader.getName(), - sentryTraceHeader.getValue().getBytes(StandardCharsets.UTF_8)); - - final @Nullable BaggageHeader baggageHeader = tracingHeaders.getBaggageHeader(); - if (baggageHeader != null) { - headers.remove(baggageHeader.getName()); - headers.add( - baggageHeader.getName(), baggageHeader.getValue().getBytes(StandardCharsets.UTF_8)); - } - } - - headers.remove(SENTRY_ENQUEUED_TIME_HEADER); - headers.add( - 
SENTRY_ENQUEUED_TIME_HEADER, - DateUtils.doubleToBigDecimal(DateUtils.millisToSeconds(System.currentTimeMillis())) - .toString() - .getBytes(StandardCharsets.UTF_8)); - } - - private static @Nullable List readHeaderValues( - final @NotNull Headers headers, final @NotNull String name) { - @Nullable List values = null; - for (final @NotNull Header header : headers.headers(name)) { - final byte @Nullable [] value = header.value(); - if (value != null) { - if (values == null) { - values = new ArrayList<>(); - } - values.add(new String(value, StandardCharsets.UTF_8)); - } - } - return values; - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt index 38c0bf3198..3bd992e8c8 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -232,7 +232,7 @@ class SentryKafkaConsumerTracingTest { } enqueuedTime?.let { headers.add( - SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER, it.toByteArray(StandardCharsets.UTF_8), ) } diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt deleted file mode 100644 index 758deed094..0000000000 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ /dev/null @@ -1,225 +0,0 @@ -package io.sentry.kafka - -import io.sentry.BaggageHeader -import io.sentry.IScopes -import io.sentry.ISentryLifecycleToken -import io.sentry.ISpan -import io.sentry.Sentry -import io.sentry.SentryOptions -import io.sentry.SentryTraceHeader -import io.sentry.SentryTracer -import io.sentry.SpanOptions -import io.sentry.SpanStatus -import io.sentry.TransactionContext -import io.sentry.test.initForTest -import 
java.nio.charset.StandardCharsets -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertEquals -import kotlin.test.assertFalse -import kotlin.test.assertNotNull -import kotlin.test.assertSame -import kotlin.test.assertTrue -import org.apache.kafka.clients.producer.ProducerRecord -import org.apache.kafka.common.header.Header -import org.apache.kafka.common.header.Headers -import org.mockito.kotlin.any -import org.mockito.kotlin.eq -import org.mockito.kotlin.mock -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever - -class SentryKafkaProducerInterceptorTest { - - private lateinit var scopes: IScopes - private lateinit var options: SentryOptions - - @BeforeTest - fun setup() { - initForTest { - it.dsn = "https://key@sentry.io/proj" - it.isEnableQueueTracing = true - it.tracesSampleRate = 1.0 - } - scopes = mock() - options = - SentryOptions().apply { - dsn = "https://key@sentry.io/proj" - isEnableQueueTracing = true - } - whenever(scopes.options).thenReturn(options) - } - - @AfterTest - fun teardown() { - Sentry.close() - } - - private fun createTransaction(): SentryTracer { - val tx = SentryTracer(TransactionContext("tx", "op"), scopes) - whenever(scopes.span).thenReturn(tx) - return tx - } - - @Test - fun `creates queue publish span and injects headers`() { - val tx = createTransaction() - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(1, tx.spans.size) - val span = tx.spans.first() - assertEquals("queue.publish", span.operation) - assertEquals("my-topic", span.description) - assertEquals("kafka", span.data["messaging.system"]) - assertEquals("my-topic", span.data["messaging.destination.name"]) - assertEquals(SentryKafkaProducerInterceptor.TRACE_ORIGIN, span.spanContext.origin) - assertTrue(span.isFinished) - - val sentryTraceHeader = 
record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) - assertNotNull(sentryTraceHeader) - - val enqueuedTimeHeader = - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) - assertNotNull(enqueuedTimeHeader) - val enqueuedTimeRaw = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8) - // Must be written as a plain decimal so cross-SDK consumers (e.g. sentry-python) can - // parse it. String.valueOf(double) would emit scientific notation (e.g. 1.77E9) for - // epoch seconds. - assertFalse( - enqueuedTimeRaw.contains('E') || enqueuedTimeRaw.contains('e'), - "enqueued-time header must not use scientific notation, got: $enqueuedTimeRaw", - ) - assertTrue( - enqueuedTimeRaw.matches(Regex("""^\d+\.\d{6}$""")), - "enqueued-time header must be plain epoch seconds with 6 decimals, got: $enqueuedTimeRaw", - ) - val enqueuedTime = enqueuedTimeRaw.toDouble() - assertTrue(enqueuedTime > 0) - } - - @Test - fun `preserves pre-existing third-party baggage header entries`() { - val tx = createTransaction() - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - record - .headers() - .add( - BaggageHeader.BAGGAGE_HEADER, - "othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8), - ) - - interceptor.onSend(record) - - val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList() - assertEquals(1, baggageHeaders.size) - val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8) - assertTrue( - baggageValue.contains("othervendor=someValue"), - "expected third-party baggage entry preserved, got: $baggageValue", - ) - assertTrue( - baggageValue.contains("another=thing"), - "expected third-party baggage entry preserved, got: $baggageValue", - ) - assertTrue( - baggageValue.contains("sentry-"), - "expected Sentry baggage entries appended, got: $baggageValue", - ) - } - - @Test - fun `finishes span with error 
when header injection fails`() { - val activeSpan = mock() - val span = mock() - val headers = mock() - val record = mock>() - val exception = RuntimeException("boom") - whenever(scopes.span).thenReturn(activeSpan) - whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) - .thenReturn(span) - whenever(span.isNoOp).thenReturn(false) - whenever(span.isFinished).thenReturn(false) - whenever(span.toSentryTrace()) - .thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1")) - whenever(span.toBaggageHeader(null)).thenReturn(null) - whenever(record.topic()).thenReturn("my-topic") - whenever(record.headers()).thenReturn(headers) - whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList

()) - whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)).thenThrow(exception) - - val interceptor = SentryKafkaProducerInterceptor(scopes) - - interceptor.onSend(record) - - verify(span).setStatus(SpanStatus.INTERNAL_ERROR) - verify(span).setThrowable(exception) - verify(span).finish() - } - - @Test - fun `does not create span when queue tracing is disabled`() { - val tx = createTransaction() - options.isEnableQueueTracing = false - val interceptor = SentryKafkaProducerInterceptor(scopes) - - interceptor.onSend(ProducerRecord("my-topic", "key", "value")) - - assertEquals(0, tx.spans.size) - } - - @Test - fun `does not create span when trace origin is ignored`() { - val tx = createTransaction() - options.setIgnoredSpanOrigins(listOf(SentryKafkaProducerInterceptor.TRACE_ORIGIN)) - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - interceptor.onSend(record) - - assertEquals(0, tx.spans.size) - assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) - assertEquals( - null, - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER), - ) - } - - @Test - fun `returns original record when no active span`() { - whenever(scopes.span).thenReturn(null) - val interceptor = SentryKafkaProducerInterceptor(scopes) - val record = ProducerRecord("my-topic", "key", "value") - - val result = interceptor.onSend(record) - - assertSame(record, result) - } - - @Test - fun `no-arg constructor uses current scopes`() { - val transaction = Sentry.startTransaction("tx", "op") - val record = ProducerRecord("my-topic", "key", "value") - - try { - val token: ISentryLifecycleToken = transaction.makeCurrent() - try { - val interceptor = SentryKafkaProducerInterceptor() - interceptor.onSend(record) - } finally { - token.close() - } - } finally { - transaction.finish() - } - - assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) - 
assertNotNull( - record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER) - ) - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt new file mode 100644 index 0000000000..90a6bb259b --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerTest.kt @@ -0,0 +1,345 @@ +package io.sentry.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.ISpan +import io.sentry.Scope +import io.sentry.ScopeCallback +import io.sentry.Sentry +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SentryTracer +import io.sentry.SpanOptions +import io.sentry.SpanStatus +import io.sentry.TransactionContext +import io.sentry.test.initForTest +import java.nio.charset.StandardCharsets +import java.util.concurrent.CompletableFuture +import kotlin.test.AfterTest +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFalse +import kotlin.test.assertNotNull +import kotlin.test.assertSame +import kotlin.test.assertTrue +import org.apache.kafka.clients.producer.Callback +import org.apache.kafka.clients.producer.Producer +import org.apache.kafka.clients.producer.ProducerRecord +import org.apache.kafka.clients.producer.RecordMetadata +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.Headers +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.doAnswer +import org.mockito.kotlin.eq +import org.mockito.kotlin.isNull +import org.mockito.kotlin.mock +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaProducerTest { + + private lateinit var scopes: IScopes + private lateinit var options: SentryOptions + private 
lateinit var delegate: Producer + + @BeforeTest + fun setup() { + initForTest { + it.dsn = "https://key@sentry.io/proj" + it.isEnableQueueTracing = true + it.tracesSampleRate = 1.0 + } + scopes = mock() + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + } + whenever(scopes.options).thenReturn(options) + doAnswer { (it.arguments[0] as ScopeCallback).run(Scope(options)) } + .whenever(scopes) + .configureScope(any()) + delegate = mock() + whenever(delegate.send(any(), any())).thenReturn(CompletableFuture.completedFuture(null)) + } + + @AfterTest + fun teardown() { + Sentry.close() + } + + private fun createTransaction(): SentryTracer { + val tx = SentryTracer(TransactionContext("tx", "op"), scopes) + whenever(scopes.span).thenReturn(tx) + return tx + } + + @Test + fun `creates queue publish span and injects headers`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + assertEquals(1, tx.spans.size) + val span = tx.spans.first() + assertEquals("queue.publish", span.operation) + assertEquals("my-topic", span.description) + assertEquals("kafka", span.data["messaging.system"]) + assertEquals("my-topic", span.data["messaging.destination.name"]) + assertEquals(SentryKafkaProducer.TRACE_ORIGIN, span.spanContext.origin) + + val sentryTraceHeader = record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER) + assertNotNull(sentryTraceHeader) + + val enqueuedTimeHeader = + record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER) + assertNotNull(enqueuedTimeHeader) + val enqueuedTimeRaw = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8) + // Cross-SDK consumers (e.g. sentry-python) parse this as a plain decimal — must not use + // scientific notation. 
+ assertFalse(enqueuedTimeRaw.contains('E') || enqueuedTimeRaw.contains('e')) + assertTrue(enqueuedTimeRaw.matches(Regex("""^\d+\.\d{6}$"""))) + } + + @Test + fun `delegates send and does not finish span synchronously`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), any()) + val span = tx.spans.first() + assertFalse(span.isFinished, "span should be open until callback fires") + } + + @Test + fun `finishes span as OK when broker ack callback succeeds`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + val captor = argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0L, 0, 0L, 0, 0) + captor.firstValue.onCompletion(metadata, null) + + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.OK, span.status) + } + + @Test + fun `finishes span as INTERNAL_ERROR when broker ack callback fails`() { + val tx = createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + val exception = RuntimeException("boom") + + producer.send(record) + + val captor = argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + captor.firstValue.onCompletion(null, exception) + + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.INTERNAL_ERROR, span.status) + assertSame(exception, span.throwable) + } + + @Test + fun `forwards user callback after finishing span`() { + createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + val userCallback = mock() + + producer.send(record, userCallback) + + val captor 
= argumentCaptor() + verify(delegate).send(eq(record), captor.capture()) + val metadata = RecordMetadata(TopicPartition("my-topic", 0), 0L, 0, 0L, 0, 0) + captor.firstValue.onCompletion(metadata, null) + + verify(userCallback).onCompletion(metadata, null) + } + + @Test + fun `finishes span with error when delegate send throws synchronously`() { + val tx = createTransaction() + val exception = RuntimeException("kaboom") + whenever(delegate.send(any(), any())).thenThrow(exception) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + val thrown = runCatching { producer.send(record) }.exceptionOrNull() + + assertSame(exception, thrown) + val span = tx.spans.first() + assertTrue(span.isFinished) + assertEquals(SpanStatus.INTERNAL_ERROR, span.status) + assertSame(exception, span.throwable) + } + + @Test + fun `delegates send without span when queue tracing is disabled`() { + createTransaction() + options.isEnableQueueTracing = false + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), isNull()) + } + + @Test + fun `delegates send without span when trace origin is ignored`() { + val tx = createTransaction() + options.setIgnoredSpanOrigins(listOf(SentryKafkaProducer.TRACE_ORIGIN)) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + assertEquals(0, tx.spans.size) + verify(delegate).send(eq(record), isNull()) + assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + } + + @Test + fun `injects headers but creates no span when no active span`() { + whenever(scopes.span).thenReturn(null) + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + verify(delegate).send(eq(record), isNull()) 
+ // Headers should still be injected from PropagationContext + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) + } + + @Test + fun `preserves pre-existing third-party baggage header entries`() { + createTransaction() + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + record + .headers() + .add( + BaggageHeader.BAGGAGE_HEADER, + "othervendor=someValue,another=thing".toByteArray(StandardCharsets.UTF_8), + ) + + producer.send(record) + + val baggageHeaders = record.headers().headers(BaggageHeader.BAGGAGE_HEADER).toList() + assertEquals(1, baggageHeaders.size) + val baggageValue = String(baggageHeaders.first().value(), StandardCharsets.UTF_8) + assertTrue(baggageValue.contains("othervendor=someValue")) + assertTrue(baggageValue.contains("another=thing")) + assertTrue(baggageValue.contains("sentry-")) + } + + @Test + fun `header injection failure does not prevent send`() { + val activeSpan = mock() + val span = mock() + val headers = mock() + val record = mock>() + whenever(scopes.span).thenReturn(activeSpan) + whenever(activeSpan.startChild(eq("queue.publish"), eq("my-topic"), any())) + .thenReturn(span) + whenever(span.isNoOp).thenReturn(false) + whenever(span.isFinished).thenReturn(false) + whenever(span.toSentryTrace()) + .thenReturn(SentryTraceHeader("2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1")) + whenever(span.toBaggageHeader(null)).thenReturn(null) + whenever(record.topic()).thenReturn("my-topic") + whenever(record.headers()).thenReturn(headers) + whenever(headers.headers(BaggageHeader.BAGGAGE_HEADER)).thenReturn(emptyList
()) + whenever(headers.remove(SentryTraceHeader.SENTRY_TRACE_HEADER)) + .thenThrow(RuntimeException("boom")) + + val producer = SentryKafkaProducer(delegate, scopes) + producer.send(record) + + // Header injection failed silently; send still proceeds with wrapped callback for span + // lifecycle. + verify(delegate).send(eq(record), any()) + } + + @Test + fun `delegates non-send methods to underlying producer`() { + val producer = SentryKafkaProducer(delegate, scopes) + + producer.flush() + producer.partitionsFor("my-topic") + producer.metrics() + producer.close() + + verify(delegate).flush() + verify(delegate).partitionsFor("my-topic") + verify(delegate).metrics() + verify(delegate).close() + } + + @Test + fun `no-arg constructor uses current scopes`() { + val transaction = Sentry.startTransaction("tx", "op") + val record = ProducerRecord("my-topic", "key", "value") + + try { + val token: ISentryLifecycleToken = transaction.makeCurrent() + try { + val producer = SentryKafkaProducer(delegate) + producer.send(record) + } finally { + token.close() + } + } finally { + transaction.finish() + } + + assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) + verify(delegate).send(eq(record), any()) + } + + @Test + fun `getDelegate exposes wrapped producer`() { + val producer = SentryKafkaProducer(delegate, scopes) + assertSame(delegate, producer.delegate) + } + + @Test + fun `wraps callback even when child span is no-op`() { + val tx = createTransaction() + // Set max spans to 0 so the child span is no-op (over limit) + options.maxSpans = 0 + val producer = SentryKafkaProducer(delegate, scopes) + val record = ProducerRecord("my-topic", "key", "value") + + producer.send(record) + + // Callback is still wrapped (no-op span finish is harmless) + verify(delegate).send(eq(record), any()) + // Headers should still be injected from PropagationContext + 
assertNotNull(record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertNotNull(record.headers().lastHeader(BaggageHeader.BAGGAGE_HEADER)) + assertNotNull(record.headers().lastHeader(SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER)) + } +} diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index da89145cfe..cc819ac0db 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -4,7 +4,7 @@ import io.sentry.ITransaction; import io.sentry.Sentry; import io.sentry.kafka.SentryKafkaConsumerTracing; -import io.sentry.kafka.SentryKafkaProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; import java.time.Duration; import java.util.Collections; import java.util.Properties; @@ -16,6 +16,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; @@ -30,11 +31,19 @@ private KafkaShowcase() {} public static void runKafkaWithSentryTracing(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); final Thread consumerThread = startConsumerWithSentryTracing(bootstrapServers, consumedLatch); - final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); + final Properties producerProperties = createProducerProperties(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); 
try (ISentryLifecycleToken ignored = transaction.makeCurrent()) { - try (KafkaProducer producer = new KafkaProducer<>(producerProperties)) { + // 1. Create the raw Kafka producer as you normally would. + final KafkaProducer rawProducer = new KafkaProducer<>(producerProperties); + + // 2. >>> Sentry instrumentation <<< + // Wrap it in SentryKafkaProducer so every send is captured as a + // `queue.publish` span that closes when the broker ack callback fires. + final Producer producer = new SentryKafkaProducer<>(rawProducer); + + try (producer) { Thread.sleep(500); producer.send(new ProducerRecord<>(TOPIC, "sentry-kafka sample message")).get(); } catch (InterruptedException e) { @@ -59,7 +68,7 @@ public static void runKafkaWithSentryTracing(final String bootstrapServers) { } } - public static Properties createProducerPropertiesWithSentry(final String bootstrapServers) { + public static Properties createProducerProperties(final String bootstrapServers) { final Properties producerProperties = new Properties(); producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); producerProperties.put( @@ -67,10 +76,6 @@ public static Properties createProducerPropertiesWithSentry(final String bootstr producerProperties.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - // Required for Sentry queue tracing in kafka-clients producer setup. - producerProperties.put( - ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaProducerInterceptor.class.getName()); - // Optional tuning for sample stability in CI/local runs. 
producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 2000); producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt index 61c298f86c..6ede83510e 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry-noagent/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -10,7 +10,7 @@ import org.junit.Before * * The Sentry Kafka auto-configuration (`SentryKafkaQueueConfiguration`) is intentionally suppressed * when `io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider` is on the classpath, so - * the Sentry `SentryKafkaProducerInterceptor` and `SentryKafkaRecordInterceptor` must not be wired. + * the Sentry `SentryKafkaProducer` and `SentryKafkaRecordInterceptor` must not be wired. * * These tests produce a Kafka message end-to-end and assert that Sentry-style `queue.publish` / * `queue.process` spans/transactions are *not* emitted. 
Any Kafka telemetry in OTel mode must come diff --git a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt index f55303541b..d150fe70cd 100644 --- a/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt +++ b/sentry-samples/sentry-samples-spring-boot-jakarta-opentelemetry/src/test/kotlin/io/sentry/systemtest/KafkaOtelCoexistenceSystemTest.kt @@ -10,7 +10,7 @@ import org.junit.Before * * The Sentry Kafka auto-configuration (`SentryKafkaQueueConfiguration`) is intentionally suppressed * when `io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider` is on the classpath, so - * the Sentry `SentryKafkaProducerInterceptor` and `SentryKafkaRecordInterceptor` must not be wired. + * the Sentry `SentryKafkaProducer` and `SentryKafkaRecordInterceptor` must not be wired. * * These tests produce a Kafka message end-to-end and assert that Sentry-style `queue.publish` / * `queue.process` spans/transactions are *not* emitted. 
Any Kafka telemetry in OTel mode must come diff --git a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java index 688153046f..b678abc716 100644 --- a/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java +++ b/sentry-spring-boot-jakarta/src/main/java/io/sentry/spring/boot/jakarta/SentryAutoConfiguration.java @@ -252,7 +252,7 @@ static class SentryCacheConfiguration { @ConditionalOnClass( name = { "org.springframework.kafka.core.KafkaTemplate", - "io.sentry.kafka.SentryKafkaProducerInterceptor" + "io.sentry.kafka.SentryKafkaProducer" }) @ConditionalOnProperty(name = "sentry.enable-queue-tracing", havingValue = "true") @ConditionalOnMissingClass("io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider") diff --git a/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt index c3a4c12e35..5b010891a1 100644 --- a/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt +++ b/sentry-spring-boot-jakarta/src/test/kotlin/io/sentry/spring/boot/jakarta/SentryKafkaAutoConfigurationTest.kt @@ -1,6 +1,6 @@ package io.sentry.spring.boot.jakarta -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import io.sentry.opentelemetry.SentryAutoConfigurationCustomizerProvider import io.sentry.spring.jakarta.kafka.SentryKafkaConsumerBeanPostProcessor import io.sentry.spring.jakarta.kafka.SentryKafkaProducerBeanPostProcessor @@ -34,7 +34,7 @@ class SentryKafkaAutoConfigurationTest { private val noSentryKafkaClassLoader = FilteredClassLoader( - SentryKafkaProducerInterceptor::class.java, + SentryKafkaProducer::class.java, 
SentryAutoConfigurationCustomizerProvider::class.java, ) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java index 4ce6a7c5ed..ed3faba853 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessor.java @@ -1,84 +1,71 @@ package io.sentry.spring.jakarta.kafka; import io.sentry.ScopesAdapter; -import io.sentry.SentryLevel; -import io.sentry.kafka.SentryKafkaProducerInterceptor; -import java.lang.reflect.Field; -import org.apache.kafka.clients.producer.ProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; +import org.apache.kafka.clients.producer.Producer; import org.jetbrains.annotations.ApiStatus; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.springframework.beans.BeansException; import org.springframework.beans.factory.config.BeanPostProcessor; import org.springframework.core.Ordered; import org.springframework.core.PriorityOrdered; -import org.springframework.kafka.core.KafkaTemplate; -import org.springframework.kafka.support.CompositeProducerInterceptor; +import org.springframework.kafka.core.ProducerFactory; +import org.springframework.kafka.core.ProducerPostProcessor; /** - * Sets a {@link SentryKafkaProducerInterceptor} on {@link KafkaTemplate} beans via {@link - * KafkaTemplate#setProducerInterceptor(ProducerInterceptor)}. The original bean is not replaced. + * Installs a {@link ProducerPostProcessor} on every {@link ProducerFactory} bean so that each + * {@link Producer} created by Spring Kafka is wrapped in a {@link SentryKafkaProducer}. * - *

If the template already has a {@link ProducerInterceptor}, both are composed using {@link - * CompositeProducerInterceptor}. Reading the existing interceptor requires reflection (no public - * getter in Spring Kafka 3.x); if reflection fails, a warning is logged and only the Sentry - * interceptor is set. + *

The wrapper records a {@code queue.publish} span around each {@code send(...)} that finishes + * when the broker ack callback fires, giving a real producer-send lifecycle span. {@code + * KafkaTemplate} beans are left untouched, so all customer-configured listeners, interceptors and + * observation settings are preserved. + * + *

Idempotent: re-running on the same factory does not register the post-processor twice. + * + *

Note: {@link ProducerFactory#addPostProcessor(ProducerPostProcessor)} is a default method on + * the interface. Custom factories that do not extend {@code DefaultKafkaProducerFactory} and do not + * implement {@code addPostProcessor} will silently no-op. */ @ApiStatus.Internal public final class SentryKafkaProducerBeanPostProcessor implements BeanPostProcessor, PriorityOrdered { @Override - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "rawtypes"}) public @NotNull Object postProcessAfterInitialization( final @NotNull Object bean, final @NotNull String beanName) throws BeansException { - if (bean instanceof KafkaTemplate) { - final @NotNull KafkaTemplate template = (KafkaTemplate) bean; - final @Nullable ProducerInterceptor existing = getExistingInterceptor(template); + if (bean instanceof ProducerFactory) { + final @NotNull ProducerFactory factory = (ProducerFactory) bean; - if (existing instanceof SentryKafkaProducerInterceptor) { - return bean; + for (final Object existing : factory.getPostProcessors()) { + if (existing instanceof SentryProducerPostProcessor) { + return bean; + } } - @SuppressWarnings("rawtypes") - final SentryKafkaProducerInterceptor sentryInterceptor = - new SentryKafkaProducerInterceptor<>( - ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); - - if (existing != null) { - @SuppressWarnings("rawtypes") - final CompositeProducerInterceptor composite = - new CompositeProducerInterceptor(sentryInterceptor, existing); - template.setProducerInterceptor(composite); - } else { - template.setProducerInterceptor(sentryInterceptor); - } + factory.addPostProcessor(new SentryProducerPostProcessor<>()); } return bean; } - @SuppressWarnings("unchecked") - private @Nullable ProducerInterceptor getExistingInterceptor( - final @NotNull KafkaTemplate template) { - try { - final @NotNull Field field = KafkaTemplate.class.getDeclaredField("producerInterceptor"); - field.setAccessible(true); - return (ProducerInterceptor) 
field.get(template); - } catch (NoSuchFieldException | IllegalAccessException e) { - ScopesAdapter.getInstance() - .getOptions() - .getLogger() - .log( - SentryLevel.WARNING, - "Unable to read existing producerInterceptor from KafkaTemplate via reflection. " - + "If you had a custom ProducerInterceptor, it may be overwritten by Sentry's interceptor.", - e); - return null; - } - } - @Override public int getOrder() { return Ordered.LOWEST_PRECEDENCE; } + + /** + * Marker {@link ProducerPostProcessor} that wraps the freshly created Kafka {@link Producer} in a + * {@link SentryKafkaProducer}, unless it is already wrapped. + */ + static final class SentryProducerPostProcessor implements ProducerPostProcessor { + @Override + public @NotNull Producer apply(final @NotNull Producer producer) { + if (producer instanceof SentryKafkaProducer) { + return producer; + } + return new SentryKafkaProducer<>( + producer, ScopesAdapter.getInstance(), "auto.queue.spring_jakarta.kafka.producer"); + } + } } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index d2302dca57..a6b5247fe7 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -10,7 +10,7 @@ import io.sentry.SpanStatus; import io.sentry.TransactionContext; import io.sentry.TransactionOptions; -import io.sentry.kafka.SentryKafkaProducerInterceptor; +import io.sentry.kafka.SentryKafkaProducer; import io.sentry.util.SpanUtils; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; @@ -188,7 +188,7 @@ private boolean isIgnored() { } final @Nullable String enqueuedTimeStr = - headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + headerValue(record, 
SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER); if (enqueuedTimeStr != null) { try { final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt index f0247178f2..9d36e9274c 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaProducerBeanPostProcessorTest.kt @@ -1,50 +1,54 @@ package io.sentry.spring.jakarta.kafka -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertSame import kotlin.test.assertTrue -import org.apache.kafka.clients.producer.ProducerInterceptor +import org.apache.kafka.clients.producer.Producer +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor import org.mockito.kotlin.mock -import org.springframework.kafka.core.KafkaTemplate +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever +import org.springframework.kafka.core.DefaultKafkaProducerFactory import org.springframework.kafka.core.ProducerFactory -import org.springframework.kafka.support.CompositeProducerInterceptor +import org.springframework.kafka.core.ProducerPostProcessor class SentryKafkaProducerBeanPostProcessorTest { - private fun readInterceptor(template: KafkaTemplate<*, *>): Any? 
{ - val field = KafkaTemplate::class.java.getDeclaredField("producerInterceptor") - field.isAccessible = true - return field.get(template) - } - @Test - fun `sets SentryKafkaProducerInterceptor on KafkaTemplate`() { - val template = KafkaTemplate(mock>()) + fun `registers Sentry post-processor on ProducerFactory`() { + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - assertTrue(readInterceptor(template) is SentryKafkaProducerInterceptor<*, *>) + val captor = argumentCaptor>() + verify(factory).addPostProcessor(captor.capture()) + assertTrue( + captor.firstValue is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> + ) } @Test - fun `does not double-wrap when SentryKafkaProducerInterceptor already set`() { - val template = KafkaTemplate(mock>()) + fun `is idempotent when Sentry post-processor is already registered`() { + val factory = mock>() + val existing = + SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + whenever(factory.postProcessors).thenReturn(listOf(existing)) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") - val firstInterceptor = readInterceptor(template) + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - processor.postProcessAfterInitialization(template, "kafkaTemplate") - val secondInterceptor = readInterceptor(template) - - assertSame(firstInterceptor, secondInterceptor) + verify(factory, never()).addPostProcessor(any()) } @Test - fun `does not modify non-KafkaTemplate beans`() { - val someBean = "not a kafka template" + fun `does not modify non-ProducerFactory beans`() { + val someBean = "not a producer factory" val processor = SentryKafkaProducerBeanPostProcessor() val result = 
processor.postProcessAfterInitialization(someBean, "someBean") @@ -54,26 +58,50 @@ class SentryKafkaProducerBeanPostProcessorTest { @Test fun `returns the same bean instance`() { - val template = KafkaTemplate(mock>()) + val factory = mock>() + whenever(factory.postProcessors).thenReturn(emptyList()) val processor = SentryKafkaProducerBeanPostProcessor() - val result = processor.postProcessAfterInitialization(template, "kafkaTemplate") + val result = processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") - assertSame(template, result, "BPP should return the same bean, not a replacement") + assertSame(factory, result, "BPP must return the same bean, not a replacement") } @Test - fun `composes with existing customer interceptor using CompositeProducerInterceptor`() { - val template = KafkaTemplate(mock>()) - val customerInterceptor = mock>() - template.setProducerInterceptor(customerInterceptor) + fun `registered post-processor wraps producers in SentryKafkaProducer`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + + val wrapped = pp.apply(raw) + + assertTrue(wrapped is SentryKafkaProducer<*, *>) + assertSame(raw, (wrapped as SentryKafkaProducer).delegate) + } + @Test + fun `registered post-processor does not double-wrap`() { + val pp = SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor() + val raw = mock>() + val alreadyWrapped = SentryKafkaProducer(raw) + + val result = pp.apply(alreadyWrapped) + + assertSame(alreadyWrapped, result) + } + + @Test + fun `integrates with DefaultKafkaProducerFactory addPostProcessor contract`() { + // Sanity check against the real Spring Kafka API surface — DefaultKafkaProducerFactory + // honors addPostProcessor and exposes it via getPostProcessors(). 
+ val factory = DefaultKafkaProducerFactory(emptyMap()) val processor = SentryKafkaProducerBeanPostProcessor() - processor.postProcessAfterInitialization(template, "kafkaTemplate") + processor.postProcessAfterInitialization(factory, "kafkaProducerFactory") + + assertEquals(1, factory.postProcessors.size) assertTrue( - readInterceptor(template) is CompositeProducerInterceptor<*, *>, - "Should use CompositeProducerInterceptor when existing interceptor is present", + factory.postProcessors.first() + is SentryKafkaProducerBeanPostProcessor.SentryProducerPostProcessor<*, *> ) } } diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index c17025285c..c08756da69 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -9,7 +9,7 @@ import io.sentry.SentryTraceHeader import io.sentry.SentryTracer import io.sentry.SpanDataConvention import io.sentry.TransactionContext -import io.sentry.kafka.SentryKafkaProducerInterceptor +import io.sentry.kafka.SentryKafkaProducer import io.sentry.test.initForTest import java.nio.ByteBuffer import java.nio.charset.StandardCharsets @@ -112,7 +112,7 @@ class SentryKafkaRecordInterceptorTest { } enqueuedTime?.let { headers.add( - SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + SentryKafkaProducer.SENTRY_ENQUEUED_TIME_HEADER, it.toByteArray(StandardCharsets.UTF_8), ) }