From 1f848a7d6f0e17657a39f9faed6b94bcf2989ff1 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 20 Apr 2026 13:06:26 +0200 Subject: [PATCH 1/5] feat(kafka): [Queue Instrumentation 17] Add manual consumer tracing helper Add an experimental helper for wrapping raw Kafka consumer record processing in queue.process transactions. This exposes Kafka consumer tracing outside interceptor-based integrations. Capture messaging metadata and distributed tracing context in the helper so future queue instrumentation can reuse the same behavior. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 6 + .../kafka/SentryKafkaConsumerTracing.java | 255 ++++++++++++++++++ .../kafka/SentryKafkaConsumerTracingTest.kt | 235 ++++++++++++++++ 3 files changed, 496 insertions(+) create mode 100644 sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java create mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 6fe7f41222..20189ae897 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -13,6 +13,12 @@ public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/k public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords; } +public final class io/sentry/kafka/SentryKafkaConsumerTracing { + public static final field TRACE_ORIGIN Ljava/lang/String; + public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Runnable;)V + public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object; +} + public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { public static final field SENTRY_ENQUEUED_TIME_HEADER Ljava/lang/String; public static final field TRACE_ORIGIN Ljava/lang/String; diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java new file mode 100644 index 0000000000..1c85634b10 --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -0,0 +1,255 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.ScopesAdapter; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** Helper methods for instrumenting raw Kafka consumer record processing. */ +@ApiStatus.Experimental +public final class SentryKafkaConsumerTracing { + + public static final @NotNull String TRACE_ORIGIN = "manual.queue.kafka.consumer"; + + private static final @NotNull String CREATOR = "SentryKafkaConsumerTracing"; + private static final @NotNull String DELIVERY_ATTEMPT_HEADER = "kafka_deliveryAttempt"; + private static final @NotNull String MESSAGE_ID_HEADER = "messaging.message.id"; + + private final @NotNull IScopes scopes; + + SentryKafkaConsumerTracing(final @NotNull IScopes scopes) { + this.scopes = scopes; + } + + /** + * Runs the provided {@link Callable} with a Kafka consumer processing transaction for the given + * record. + * + * @param record the Kafka record being processed + * @param callable the processing callback + * @return the return value of the callback + * @param the Kafka record key type + * @param the Kafka record value type + * @param the callback return type + */ + public static U withTracing( + final @NotNull ConsumerRecord record, final @NotNull Callable callable) + throws Exception { + return new SentryKafkaConsumerTracing(ScopesAdapter.getInstance()) + .withTracingImpl(record, callable); + } + + /** + * Runs the provided {@link Runnable} with a Kafka consumer processing transaction for the given + * record. + * + * @param record the Kafka record being processed + * @param runnable the processing callback + * @param the Kafka record key type + * @param the Kafka record value type + */ + public static void withTracing( + final @NotNull ConsumerRecord record, final @NotNull Runnable runnable) { + new SentryKafkaConsumerTracing(ScopesAdapter.getInstance()).withTracingImpl(record, runnable); + } + + U withTracingImpl( + final @NotNull ConsumerRecord record, final @NotNull Callable callable) + throws Exception { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return callable.call(); + } + + final @NotNull IScopes forkedScopes; + final @NotNull ISentryLifecycleToken lifecycleToken; + try { + forkedScopes = scopes.forkedRootScopes(CREATOR); + lifecycleToken = forkedScopes.makeCurrent(); + } catch (Throwable ignored) { + return callable.call(); + } + + try (final @NotNull ISentryLifecycleToken ignored = lifecycleToken) { + final @Nullable ITransaction transaction = startTransaction(forkedScopes, record); + boolean didError = false; + @Nullable Throwable callbackThrowable = null; + + try { + return callable.call(); + } catch (Throwable t) { + didError = true; + callbackThrowable = t; + throw t; + } finally { + finishTransaction( + transaction, didError ? SpanStatus.INTERNAL_ERROR : SpanStatus.OK, callbackThrowable); + } + } + } + + void withTracingImpl( + final @NotNull ConsumerRecord record, final @NotNull Runnable runnable) { + try { + withTracingImpl( + record, + () -> { + runnable.run(); + return null; + }); + } catch (Throwable t) { + throwUnchecked(t); + } + } + + @SuppressWarnings("unchecked") + private static void throwUnchecked(final @NotNull Throwable throwable) + throws T { + throw (T) throwable; + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + try { + final @Nullable TransactionContext continued = continueTrace(forkedScopes, record); + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionContext txContext = + continued != null ? continued : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, MESSAGE_ID_HEADER); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + + final @Nullable Long receiveLatency = receiveLatency(record); + if (receiveLatency != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, receiveLatency); + } + + return transaction; + } catch (Throwable ignored) { + return null; + } + } + + private void finishTransaction( + final @Nullable ITransaction transaction, + final @NotNull SpanStatus status, + final @Nullable Throwable throwable) { + if (transaction == null || transaction.isNoOp()) { + return; + } + + try { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } catch (Throwable ignored) { + // Instrumentation must never break customer processing. + } + } + + private @Nullable TransactionContext continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(DELIVERY_ATTEMPT_HEADER); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + + private @Nullable Long receiveLatency(final @NotNull ConsumerRecord record) { + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr == null) { + return null; + } + + try { + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); + return latencyMs >= 0 ? latencyMs : null; + } catch (NumberFormatException ignored) { + return null; + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } +} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt new file mode 100644 index 0000000000..29283102fa --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -0,0 +1,235 @@ +package io.sentry.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.ITransaction +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SpanDataConvention +import io.sentry.SpanStatus +import io.sentry.TransactionContext +import io.sentry.TransactionOptions +import java.io.IOException +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.Optional +import java.util.concurrent.Callable +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.check +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaConsumerTracingTest { + + private lateinit var scopes: IScopes + private lateinit var forkedScopes: IScopes + private lateinit var options: SentryOptions + private lateinit var lifecycleToken: ISentryLifecycleToken + private lateinit var transaction: ITransaction + private lateinit var tracing: SentryKafkaConsumerTracing + + @BeforeTest + fun setup() { + scopes = mock() + forkedScopes = mock() + lifecycleToken = mock() + transaction = mock() + tracing = SentryKafkaConsumerTracing(scopes) + + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + + whenever(scopes.options).thenReturn(options) + whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes) + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) + whenever(transaction.isNoOp).thenReturn(false) + } + + @Test + fun `withTracing creates queue process transaction with record metadata`() { + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val baggageValue = "sentry-sample_rate=1" + val record = + createRecord( + sentryTrace = sentryTraceValue, + baggage = baggageValue, + messageId = "message-123", + deliveryAttempt = 3, + enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString(), + serializedValueSize = 5, + ) + + val txContextCaptor = argumentCaptor() + val txOptionsCaptor = argumentCaptor() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes).forkedRootScopes("SentryKafkaConsumerTracing") + verify(forkedScopes).makeCurrent() + verify(forkedScopes).continueTrace(eq(sentryTraceValue), eq(listOf(baggageValue))) + verify(forkedScopes).startTransaction(txContextCaptor.capture(), txOptionsCaptor.capture()) + + assertEquals("queue.process", txContextCaptor.firstValue.name) + assertEquals("queue.process", txContextCaptor.firstValue.operation) + assertEquals(SentryKafkaConsumerTracing.TRACE_ORIGIN, txOptionsCaptor.firstValue.origin) + assertTrue(txOptionsCaptor.firstValue.isBindToScope) + + verify(transaction).setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka") + verify(transaction).setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, "my-topic") + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_ID, "message-123") + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, 5) + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, 2) + verify(transaction) + .setData( + eq(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY), + check { assertTrue(it >= 0) }, + ) + verify(transaction).setStatus(SpanStatus.OK) + verify(transaction).finish() + verify(lifecycleToken).close() + } + + @Test + fun `withTracing skips scope forking when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes, never()).forkedRootScopes(any()) + } + + @Test + fun `withTracing skips scope forking when origin is ignored`() { + options.setIgnoredSpanOrigins(listOf(SentryKafkaConsumerTracing.TRACE_ORIGIN)) + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes, never()).forkedRootScopes(any()) + } + + @Test + fun `withTracing marks transaction as error when callback throws`() { + val record = createRecord() + val exception = RuntimeException("boom") + + val thrown = + assertFailsWith { + tracing.withTracingImpl(record, Callable { throw exception }) + } + + assertEquals(exception, thrown) + verify(transaction).setStatus(SpanStatus.INTERNAL_ERROR) + verify(transaction).setThrowable(exception) + verify(transaction).finish() + verify(lifecycleToken).close() + } + + @Test + fun `withTracing falls back to direct callback execution when instrumentation setup fails`() { + whenever(scopes.forkedRootScopes(any())) + .thenThrow(RuntimeException("broken instrumentation")) + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(forkedScopes, never()).makeCurrent() + verify(transaction, never()).finish() + } + + @Test + fun `withTracing runnable overload executes callback`() { + val record = createRecord() + val didRun = AtomicBoolean(false) + + tracing.withTracingImpl(record, Runnable { didRun.set(true) }) + + assertTrue(didRun.get()) + verify(transaction).setStatus(SpanStatus.OK) + verify(transaction).finish() + } + + @Test + fun `withTracing runnable overload preserves original throwable`() { + val record = createRecord() + val exception = IOException("boom") + + val thrown = + assertFailsWith { tracing.withTracingImpl(record, Runnable { throw exception }) } + + assertEquals(exception, thrown) + verify(transaction).setStatus(SpanStatus.INTERNAL_ERROR) + verify(transaction).setThrowable(exception) + verify(transaction).finish() + } + + private fun createRecord( + topic: String = "my-topic", + sentryTrace: String? = null, + baggage: String? = null, + messageId: String? = null, + deliveryAttempt: Int? = null, + enqueuedTime: String? = null, + serializedValueSize: Int = -1, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + messageId?.let { + headers.add(SpanDataConvention.MESSAGING_MESSAGE_ID, it.toByteArray(StandardCharsets.UTF_8)) + } + deliveryAttempt?.let { + headers.add("kafka_deliveryAttempt", ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array()) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + it.toByteArray(StandardCharsets.UTF_8), + ) + } + + return ConsumerRecord( + topic, + 0, + 0L, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + 3, + serializedValueSize, + "key", + "value", + headers, + Optional.empty(), + ) + } +} From c52b8ad9b2ebda11433df0e69e457ad3b0441205 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 20 Apr 2026 14:29:20 +0200 Subject: [PATCH 2/5] ref(kafka): Remove raw consumer interceptor Remove the raw Kafka consumer interceptor from sentry-kafka and update the console sample to use the manual consumer tracing helper instead. Keep producer tracing on the interceptor path and move consumer tracing to explicit record processing. Co-Authored-By: Claude --- sentry-kafka/api/sentry-kafka.api | 10 -- .../kafka/SentryKafkaConsumerInterceptor.java | 100 ------------------ .../SentryKafkaConsumerInterceptorTest.kt | 100 ------------------ .../java/io/sentry/samples/console/Main.java | 4 +- .../samples/console/kafka/KafkaShowcase.java | 27 +++-- 5 files changed, 14 insertions(+), 227 deletions(-) delete mode 100644 sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java delete mode 100644 sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 20189ae897..ce5b0efb66 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -3,16 +3,6 @@ public final class io/sentry/kafka/BuildConfig { public static final field VERSION_NAME Ljava/lang/String; } -public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { - public static final field TRACE_ORIGIN Ljava/lang/String; - public fun ()V - public fun (Lio/sentry/IScopes;)V - public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onCommit (Ljava/util/Map;)V - public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords; -} - public final class io/sentry/kafka/SentryKafkaConsumerTracing { public static final field TRACE_ORIGIN Ljava/lang/String; public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Runnable;)V diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java deleted file mode 100644 index a37d01cd90..0000000000 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.sentry.kafka; - -import io.sentry.BaggageHeader; -import io.sentry.IScopes; -import io.sentry.ITransaction; -import io.sentry.ScopesAdapter; -import io.sentry.SentryTraceHeader; -import io.sentry.SpanDataConvention; -import io.sentry.SpanStatus; -import io.sentry.TransactionContext; -import io.sentry.TransactionOptions; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerInterceptor; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; -import org.jetbrains.annotations.ApiStatus; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -@ApiStatus.Internal -public final class SentryKafkaConsumerInterceptor implements ConsumerInterceptor { - - public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer"; - - private final @NotNull IScopes scopes; - - public SentryKafkaConsumerInterceptor() { - this(ScopesAdapter.getInstance()); - } - - public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { - this.scopes = scopes; - } - - @Override - public @NotNull ConsumerRecords onConsume(final @NotNull ConsumerRecords records) { - if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) { - return records; - } - - final @NotNull ConsumerRecord firstRecord = records.iterator().next(); - - try { - final @Nullable TransactionContext continued = continueTrace(firstRecord); - final @NotNull TransactionContext txContext = - continued != null ? continued : new TransactionContext("queue.receive", "queue.receive"); - txContext.setName("queue.receive"); - txContext.setOperation("queue.receive"); - - final @NotNull TransactionOptions txOptions = new TransactionOptions(); - txOptions.setOrigin(TRACE_ORIGIN); - txOptions.setBindToScope(false); - - final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions); - if (!transaction.isNoOp()) { - transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic()); - transaction.setData("messaging.batch.message.count", records.count()); - transaction.setStatus(SpanStatus.OK); - transaction.finish(); - } - } catch (Throwable ignored) { - // Instrumentation must never break the customer's Kafka poll loop. - } - - return records; - } - - @Override - public void onCommit(final @NotNull Map offsets) {} - - @Override - public void close() {} - - @Override - public void configure(final @Nullable Map configs) {} - - private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord record) { - final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); - final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); - final @Nullable List baggageHeaders = - baggage != null ? Collections.singletonList(baggage) : null; - return scopes.continueTrace(sentryTrace, baggageHeaders); - } - - private @Nullable String headerValue( - final @NotNull ConsumerRecord record, final @NotNull String headerName) { - final @Nullable Header header = record.headers().lastHeader(headerName); - if (header == null || header.value() == null) { - return null; - } - return new String(header.value(), StandardCharsets.UTF_8); - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt deleted file mode 100644 index f6786bc8f5..0000000000 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt +++ /dev/null @@ -1,100 +0,0 @@ -package io.sentry.kafka - -import io.sentry.IScopes -import io.sentry.ITransaction -import io.sentry.Sentry -import io.sentry.SentryOptions -import io.sentry.TransactionContext -import io.sentry.TransactionOptions -import io.sentry.test.initForTest -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertSame -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.ConsumerRecords -import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.TopicPartition -import org.mockito.kotlin.any -import org.mockito.kotlin.mock -import org.mockito.kotlin.never -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever - -class SentryKafkaConsumerInterceptorTest { - - @BeforeTest - fun setup() { - initForTest { - it.dsn = "https://key@sentry.io/proj" - it.isEnableQueueTracing = true - it.tracesSampleRate = 1.0 - } - } - - @AfterTest - fun teardown() { - Sentry.close() - } - - @Test - fun `does nothing when queue tracing is disabled`() { - val scopes = mock() - val options = SentryOptions().apply { isEnableQueueTracing = false } - whenever(scopes.options).thenReturn(options) - - val interceptor = SentryKafkaConsumerInterceptor(scopes) - val records = singleRecordBatch() - - val result = interceptor.onConsume(records) - - assertSame(records, result) - verify(scopes, never()).startTransaction(any(), any()) - } - - @Test - fun `starts and finishes queue receive transaction for consumed batch`() { - val scopes = mock() - val options = SentryOptions().apply { isEnableQueueTracing = true } - val transaction = mock() - - whenever(scopes.options).thenReturn(options) - whenever(scopes.continueTrace(any(), any())).thenReturn(null) - whenever(scopes.startTransaction(any(), any())) - .thenReturn(transaction) - whenever(transaction.isNoOp).thenReturn(false) - - val interceptor = SentryKafkaConsumerInterceptor(scopes) - - interceptor.onConsume(singleRecordBatch()) - - verify(scopes).startTransaction(any(), any()) - verify(transaction).setData("messaging.system", "kafka") - verify(transaction).setData("messaging.destination.name", "my-topic") - verify(transaction).setData("messaging.batch.message.count", 1) - verify(transaction).finish() - } - - @Test - fun `commit callback is no-op`() { - val interceptor = SentryKafkaConsumerInterceptor(mock()) - - interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) - } - - @Test - fun `no-arg constructor uses current scopes`() { - val interceptor = SentryKafkaConsumerInterceptor() - val records = singleRecordBatch() - - val result = interceptor.onConsume(records) - - assertSame(records, result) - } - - private fun singleRecordBatch(): ConsumerRecords { - val partition = TopicPartition("my-topic", 0) - val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") - return ConsumerRecords(mapOf(partition to listOf(record))) - } -} diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 4fee0a8374..2a45ef6902 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -184,11 +184,11 @@ public static void main(String[] args) throws InterruptedException { // cache.remove, and cache.flush spans as children of the active transaction. demonstrateCacheTracing(); - // Kafka queue tracing with kafka-clients interceptors. + // Kafka queue tracing with the kafka-clients producer interceptor and manual consumer tracing. // // Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 if (kafkaEnabled) { - KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers); + KafkaShowcase.runKafkaWithSentryTracing(kafkaBootstrapServers); } // Performance feature diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 9c84b6f004..b00b6e83d7 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -3,7 +3,7 @@ import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; import io.sentry.Sentry; -import io.sentry.kafka.SentryKafkaConsumerInterceptor; +import io.sentry.kafka.SentryKafkaConsumerTracing; import io.sentry.kafka.SentryKafkaProducerInterceptor; import java.time.Duration; import java.util.Collections; @@ -12,6 +12,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -26,10 +27,9 @@ public final class KafkaShowcase { private KafkaShowcase() {} - public static void runKafkaWithSentryInterceptors(final String bootstrapServers) { + public static void runKafkaWithSentryTracing(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = - startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch); + final Thread consumerThread = startConsumerWithSentryTracing(bootstrapServers, consumedLatch); final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); @@ -79,7 +79,7 @@ public static Properties createProducerPropertiesWithSentry(final String bootstr return producerProperties; } - public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) { + public static Properties createConsumerProperties(final String bootstrapServers) { final Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProperties.put( @@ -90,10 +90,6 @@ public static Properties createConsumerPropertiesWithSentry(final String bootstr consumerProperties.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - // Required for Sentry queue tracing in kafka-clients consumer setup. - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); - // Optional tuning for sample stability in CI/local runs. consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); @@ -101,13 +97,12 @@ public static Properties createConsumerPropertiesWithSentry(final String bootstr return consumerProperties; } - private static Thread startConsumerWithSentryInterceptor( + private static Thread startConsumerWithSentryTracing( final String bootstrapServers, final CountDownLatch consumedLatch) { final Thread consumerThread = new Thread( () -> { - final Properties consumerProperties = - createConsumerPropertiesWithSentry(bootstrapServers); + final Properties consumerProperties = createConsumerProperties(bootstrapServers); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { @@ -116,9 +111,11 @@ private static Thread startConsumerWithSentryInterceptor( while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); - if (!records.isEmpty()) { - consumedLatch.countDown(); - break; + for (final ConsumerRecord record : records) { + SentryKafkaConsumerTracing.withTracing(record, consumedLatch::countDown); + if (consumedLatch.getCount() == 0) { + break; + } } } } catch (Exception ignored) { From 74430c089937dd5562752df73547f72cc30b2805 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 20 Apr 2026 14:31:43 +0200 Subject: [PATCH 3/5] ref(samples): Clarify Kafka consumer tracing sample Print the consumed Kafka record inside the manual consumer tracing callback so the sample shows where application processing happens. Update the console system test to assert the manual queue.process transaction and its manual consumer origin. Co-Authored-By: Claude --- .../sentry/samples/console/kafka/KafkaShowcase.java | 11 ++++++++++- .../sentry/systemtest/ConsoleApplicationSystemTest.kt | 3 ++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index b00b6e83d7..da89145cfe 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -112,7 +112,16 @@ private static Thread startConsumerWithSentryTracing( final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); for (final ConsumerRecord record : records) { - SentryKafkaConsumerTracing.withTracing(record, consumedLatch::countDown); + SentryKafkaConsumerTracing.withTracing( + record, + () -> { + System.out.println( + "Consumed Kafka message from " + + record.topic() + + ": " + + record.value()); + consumedLatch.countDown(); + }); if (consumedLatch.getCount() == 0) { break; } diff --git a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt index 1b512fdc48..db6f54a616 100644 --- a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt +++ b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt @@ -42,7 +42,8 @@ class ConsoleApplicationSystemTest { } testHelper.ensureTransactionReceived { transaction, _ -> - testHelper.doesTransactionHaveOp(transaction, "queue.receive") && + testHelper.doesTransactionHaveOp(transaction, "queue.process") && + transaction.contexts.trace?.origin == "manual.queue.kafka.consumer" && transaction.contexts.trace?.data?.get("messaging.system") == "kafka" } } From d2f4d8cb21a223328d6b400404fbfbf134995fb6 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 20 Apr 2026 14:46:14 +0200 Subject: [PATCH 4/5] fix(kafka): Honor ignored producer span origins Short-circuit the raw Kafka producer interceptor when its trace origin is configured in ignoredSpanOrigins. This lets customers disable the integration quickly without relying on the later no-op span path, and keeps the interceptor from injecting tracing headers when the origin is ignored. Co-Authored-By: Claude --- .../kafka/SentryKafkaProducerInterceptor.java | 7 ++++++- .../kafka/SentryKafkaProducerInterceptorTest.kt | 17 +++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index 923104427e..89e621a3a3 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -9,6 +9,7 @@ import io.sentry.SpanDataConvention; import io.sentry.SpanOptions; import io.sentry.SpanStatus; +import io.sentry.util.SpanUtils; import io.sentry.util.TracingUtils; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -45,7 +46,7 @@ public SentryKafkaProducerInterceptor( @Override public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { - if (!scopes.getOptions().isEnableQueueTracing()) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { return record; } @@ -81,6 +82,10 @@ public SentryKafkaProducerInterceptor( public void onAcknowledgement( final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); + } + @Override public void close() {} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index 61ac1ab20e..b9787aba09 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -90,6 +90,23 @@ class SentryKafkaProducerInterceptorTest { assertEquals(0, tx.spans.size) } + @Test + fun `does not create span when trace origin is ignored`() { + val tx = createTransaction() + options.setIgnoredSpanOrigins(listOf(SentryKafkaProducerInterceptor.TRACE_ORIGIN)) + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(0, tx.spans.size) + assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertEquals( + null, + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER), + ) + } + @Test fun `returns original record when no active span`() { whenever(scopes.span).thenReturn(null) From 97d82f37ae3be57ec4af023a6e18c5033dd8fc91 Mon Sep 17 00:00:00 2001 From: Alexander Dinauer Date: Mon, 20 Apr 2026 15:05:25 +0200 Subject: [PATCH 5/5] ref(spring): Use injected scopes in Kafka interceptor Stop the Spring Kafka record interceptor from reaching through the static Sentry API when forking root scopes. This keeps the raw Kafka and Spring Kafka paths aligned and makes the interceptor easier to test. Co-Authored-By: Claude --- .../kafka/SentryKafkaRecordInterceptor.java | 3 +- .../kafka/SentryKafkaRecordInterceptorTest.kt | 53 ++++++++----------- 2 files changed, 23 insertions(+), 33 deletions(-) diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 9cfda3c237..70a115bf7d 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -5,7 +5,6 @@ import io.sentry.IScopes; import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; -import io.sentry.Sentry; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanStatus; @@ -60,7 +59,7 @@ public SentryKafkaRecordInterceptor( finishStaleContext(); - final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor"); + final @NotNull IScopes forkedScopes = scopes.forkedRootScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 1239b4007e..6191654012 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -22,7 +22,6 @@ import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders -import org.mockito.Mockito import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never @@ -56,6 +55,7 @@ class SentryKafkaRecordInterceptorTest { whenever(scopes.isEnabled).thenReturn(true) forkedScopes = mock() + whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes) whenever(forkedScopes.options).thenReturn(options) whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) @@ -69,13 +69,6 @@ class SentryKafkaRecordInterceptorTest { Sentry.close() } - private fun withMockSentry(closure: () -> T): T = - Mockito.mockStatic(Sentry::class.java).use { - it.`when` { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes) - it.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) - closure.invoke() - } - private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(), @@ -120,8 +113,9 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) + verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") verify(forkedScopes).makeCurrent() } @@ -131,7 +125,7 @@ class SentryKafkaRecordInterceptorTest { val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(forkedScopes) .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) @@ -142,7 +136,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } @@ -152,7 +146,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecordWithHeaders(deliveryAttempt = 3) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) } @@ -162,7 +156,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) } @@ -173,7 +167,7 @@ class SentryKafkaRecordInterceptorTest { val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString() val record = createRecordWithHeaders(enqueuedTime = enqueuedTime) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY) assertTrue(latency is Long && latency >= 0) @@ -187,6 +181,7 @@ class SentryKafkaRecordInterceptorTest { val result = interceptor.intercept(record, consumer) + verify(scopes, never()).forkedRootScopes(any()) verify(forkedScopes, never()).makeCurrent() assertEquals(record, result) } @@ -199,6 +194,7 @@ class SentryKafkaRecordInterceptorTest { val result = interceptor.intercept(record, consumer) + verify(scopes, never()).forkedRootScopes(any()) verify(forkedScopes, never()).makeCurrent() assertEquals(record, result) } @@ -210,7 +206,7 @@ class SentryKafkaRecordInterceptorTest { whenever(delegate.intercept(record, consumer)).thenReturn(record) val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(delegate).intercept(record, consumer) } @@ -221,7 +217,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.success(record, consumer) verify(delegate).success(record, consumer) @@ -234,7 +230,7 @@ class SentryKafkaRecordInterceptorTest { val record = createRecord() val exception = RuntimeException("processing failed") - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.failure(record, exception, consumer) verify(delegate).failure(record, exception, consumer) @@ -264,7 +260,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.clearThreadState(consumer) @@ -293,21 +289,16 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - Mockito.mockStatic(Sentry::class.java).use { mockSentry -> - mockSentry.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) - mockSentry - .`when` { Sentry.forkedRootScopes(any()) } - .thenAnswer { - callCount++ - if (callCount == 1) forkedScopes else forkedScopes2 - } + whenever(scopes.forkedRootScopes(any())).thenAnswer { + callCount++ + if (callCount == 1) forkedScopes else forkedScopes2 + } - // First intercept sets up context - interceptor.intercept(record, consumer) + // First intercept sets up context + interceptor.intercept(record, consumer) - // Second intercept without success/failure — should clean up stale context first - interceptor.intercept(record, consumer) - } + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) // First lifecycle token should have been closed by the defensive cleanup verify(lifecycleToken).close()