diff --git a/sentry-kafka/api/sentry-kafka.api b/sentry-kafka/api/sentry-kafka.api index 6fe7f41222..ce5b0efb66 100644 --- a/sentry-kafka/api/sentry-kafka.api +++ b/sentry-kafka/api/sentry-kafka.api @@ -3,14 +3,10 @@ public final class io/sentry/kafka/BuildConfig { public static final field VERSION_NAME Ljava/lang/String; } -public final class io/sentry/kafka/SentryKafkaConsumerInterceptor : org/apache/kafka/clients/consumer/ConsumerInterceptor { +public final class io/sentry/kafka/SentryKafkaConsumerTracing { public static final field TRACE_ORIGIN Ljava/lang/String; - public fun ()V - public fun (Lio/sentry/IScopes;)V - public fun close ()V - public fun configure (Ljava/util/Map;)V - public fun onCommit (Ljava/util/Map;)V - public fun onConsume (Lorg/apache/kafka/clients/consumer/ConsumerRecords;)Lorg/apache/kafka/clients/consumer/ConsumerRecords; + public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Runnable;)V + public static fun withTracing (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/util/concurrent/Callable;)Ljava/lang/Object; } public final class io/sentry/kafka/SentryKafkaProducerInterceptor : org/apache/kafka/clients/producer/ProducerInterceptor { diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java deleted file mode 100644 index a37d01cd90..0000000000 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerInterceptor.java +++ /dev/null @@ -1,100 +0,0 @@ -package io.sentry.kafka; - -import io.sentry.BaggageHeader; -import io.sentry.IScopes; -import io.sentry.ITransaction; -import io.sentry.ScopesAdapter; -import io.sentry.SentryTraceHeader; -import io.sentry.SpanDataConvention; -import io.sentry.SpanStatus; -import io.sentry.TransactionContext; -import io.sentry.TransactionOptions; -import java.nio.charset.StandardCharsets; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import org.apache.kafka.clients.consumer.ConsumerInterceptor; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.header.Header; -import org.jetbrains.annotations.ApiStatus; -import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; - -@ApiStatus.Internal -public final class SentryKafkaConsumerInterceptor implements ConsumerInterceptor { - - public static final @NotNull String TRACE_ORIGIN = "auto.queue.kafka.consumer"; - - private final @NotNull IScopes scopes; - - public SentryKafkaConsumerInterceptor() { - this(ScopesAdapter.getInstance()); - } - - public SentryKafkaConsumerInterceptor(final @NotNull IScopes scopes) { - this.scopes = scopes; - } - - @Override - public @NotNull ConsumerRecords onConsume(final @NotNull ConsumerRecords records) { - if (!scopes.getOptions().isEnableQueueTracing() || records.isEmpty()) { - return records; - } - - final @NotNull ConsumerRecord firstRecord = records.iterator().next(); - - try { - final @Nullable TransactionContext continued = continueTrace(firstRecord); - final @NotNull TransactionContext txContext = - continued != null ? continued : new TransactionContext("queue.receive", "queue.receive"); - txContext.setName("queue.receive"); - txContext.setOperation("queue.receive"); - - final @NotNull TransactionOptions txOptions = new TransactionOptions(); - txOptions.setOrigin(TRACE_ORIGIN); - txOptions.setBindToScope(false); - - final @NotNull ITransaction transaction = scopes.startTransaction(txContext, txOptions); - if (!transaction.isNoOp()) { - transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); - transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, firstRecord.topic()); - transaction.setData("messaging.batch.message.count", records.count()); - transaction.setStatus(SpanStatus.OK); - transaction.finish(); - } - } catch (Throwable ignored) { - // Instrumentation must never break the customer's Kafka poll loop. - } - - return records; - } - - @Override - public void onCommit(final @NotNull Map offsets) {} - - @Override - public void close() {} - - @Override - public void configure(final @Nullable Map configs) {} - - private @Nullable TransactionContext continueTrace(final @NotNull ConsumerRecord record) { - final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); - final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); - final @Nullable List baggageHeaders = - baggage != null ? Collections.singletonList(baggage) : null; - return scopes.continueTrace(sentryTrace, baggageHeaders); - } - - private @Nullable String headerValue( - final @NotNull ConsumerRecord record, final @NotNull String headerName) { - final @Nullable Header header = record.headers().lastHeader(headerName); - if (header == null || header.value() == null) { - return null; - } - return new String(header.value(), StandardCharsets.UTF_8); - } -} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java new file mode 100644 index 0000000000..1c85634b10 --- /dev/null +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaConsumerTracing.java @@ -0,0 +1,255 @@ +package io.sentry.kafka; + +import io.sentry.BaggageHeader; +import io.sentry.DateUtils; +import io.sentry.IScopes; +import io.sentry.ISentryLifecycleToken; +import io.sentry.ITransaction; +import io.sentry.ScopesAdapter; +import io.sentry.SentryTraceHeader; +import io.sentry.SpanDataConvention; +import io.sentry.SpanStatus; +import io.sentry.TransactionContext; +import io.sentry.TransactionOptions; +import io.sentry.util.SpanUtils; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.jetbrains.annotations.ApiStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +/** Helper methods for instrumenting raw Kafka consumer record processing. */ +@ApiStatus.Experimental +public final class SentryKafkaConsumerTracing { + + public static final @NotNull String TRACE_ORIGIN = "manual.queue.kafka.consumer"; + + private static final @NotNull String CREATOR = "SentryKafkaConsumerTracing"; + private static final @NotNull String DELIVERY_ATTEMPT_HEADER = "kafka_deliveryAttempt"; + private static final @NotNull String MESSAGE_ID_HEADER = "messaging.message.id"; + + private final @NotNull IScopes scopes; + + SentryKafkaConsumerTracing(final @NotNull IScopes scopes) { + this.scopes = scopes; + } + + /** + * Runs the provided {@link Callable} with a Kafka consumer processing transaction for the given + * record. + * + * @param record the Kafka record being processed + * @param callable the processing callback + * @return the return value of the callback + * @param the Kafka record key type + * @param the Kafka record value type + * @param the callback return type + */ + public static U withTracing( + final @NotNull ConsumerRecord record, final @NotNull Callable callable) + throws Exception { + return new SentryKafkaConsumerTracing(ScopesAdapter.getInstance()) + .withTracingImpl(record, callable); + } + + /** + * Runs the provided {@link Runnable} with a Kafka consumer processing transaction for the given + * record. + * + * @param record the Kafka record being processed + * @param runnable the processing callback + * @param the Kafka record key type + * @param the Kafka record value type + */ + public static void withTracing( + final @NotNull ConsumerRecord record, final @NotNull Runnable runnable) { + new SentryKafkaConsumerTracing(ScopesAdapter.getInstance()).withTracingImpl(record, runnable); + } + + U withTracingImpl( + final @NotNull ConsumerRecord record, final @NotNull Callable callable) + throws Exception { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { + return callable.call(); + } + + final @NotNull IScopes forkedScopes; + final @NotNull ISentryLifecycleToken lifecycleToken; + try { + forkedScopes = scopes.forkedRootScopes(CREATOR); + lifecycleToken = forkedScopes.makeCurrent(); + } catch (Throwable ignored) { + return callable.call(); + } + + try (final @NotNull ISentryLifecycleToken ignored = lifecycleToken) { + final @Nullable ITransaction transaction = startTransaction(forkedScopes, record); + boolean didError = false; + @Nullable Throwable callbackThrowable = null; + + try { + return callable.call(); + } catch (Throwable t) { + didError = true; + callbackThrowable = t; + throw t; + } finally { + finishTransaction( + transaction, didError ? SpanStatus.INTERNAL_ERROR : SpanStatus.OK, callbackThrowable); + } + } + } + + void withTracingImpl( + final @NotNull ConsumerRecord record, final @NotNull Runnable runnable) { + try { + withTracingImpl( + record, + () -> { + runnable.run(); + return null; + }); + } catch (Throwable t) { + throwUnchecked(t); + } + } + + @SuppressWarnings("unchecked") + private static void throwUnchecked(final @NotNull Throwable throwable) + throws T { + throw (T) throwable; + } + + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), TRACE_ORIGIN); + } + + private @Nullable ITransaction startTransaction( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + try { + final @Nullable TransactionContext continued = continueTrace(forkedScopes, record); + if (!forkedScopes.getOptions().isTracingEnabled()) { + return null; + } + + final @NotNull TransactionContext txContext = + continued != null ? continued : new TransactionContext("queue.process", "queue.process"); + txContext.setName("queue.process"); + txContext.setOperation("queue.process"); + + final @NotNull TransactionOptions txOptions = new TransactionOptions(); + txOptions.setOrigin(TRACE_ORIGIN); + txOptions.setBindToScope(true); + + final @NotNull ITransaction transaction = forkedScopes.startTransaction(txContext, txOptions); + if (transaction.isNoOp()) { + return null; + } + + transaction.setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka"); + transaction.setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, record.topic()); + + final @Nullable String messageId = headerValue(record, MESSAGE_ID_HEADER); + if (messageId != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_ID, messageId); + } + + final int bodySize = record.serializedValueSize(); + if (bodySize >= 0) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, bodySize); + } + + final @Nullable Integer retryCount = retryCount(record); + if (retryCount != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, retryCount); + } + + final @Nullable Long receiveLatency = receiveLatency(record); + if (receiveLatency != null) { + transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, receiveLatency); + } + + return transaction; + } catch (Throwable ignored) { + return null; + } + } + + private void finishTransaction( + final @Nullable ITransaction transaction, + final @NotNull SpanStatus status, + final @Nullable Throwable throwable) { + if (transaction == null || transaction.isNoOp()) { + return; + } + + try { + transaction.setStatus(status); + if (throwable != null) { + transaction.setThrowable(throwable); + } + transaction.finish(); + } catch (Throwable ignored) { + // Instrumentation must never break customer processing. + } + } + + private @Nullable TransactionContext continueTrace( + final @NotNull IScopes forkedScopes, final @NotNull ConsumerRecord record) { + final @Nullable String sentryTrace = headerValue(record, SentryTraceHeader.SENTRY_TRACE_HEADER); + final @Nullable String baggage = headerValue(record, BaggageHeader.BAGGAGE_HEADER); + final @Nullable List baggageHeaders = + baggage != null ? Collections.singletonList(baggage) : null; + return forkedScopes.continueTrace(sentryTrace, baggageHeaders); + } + + private @Nullable Integer retryCount(final @NotNull ConsumerRecord record) { + final @Nullable Header header = record.headers().lastHeader(DELIVERY_ATTEMPT_HEADER); + if (header == null) { + return null; + } + + final byte[] value = header.value(); + if (value == null || value.length != Integer.BYTES) { + return null; + } + + final int attempt = ByteBuffer.wrap(value).getInt(); + if (attempt <= 0) { + return null; + } + + return attempt - 1; + } + + private @Nullable Long receiveLatency(final @NotNull ConsumerRecord record) { + final @Nullable String enqueuedTimeStr = + headerValue(record, SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER); + if (enqueuedTimeStr == null) { + return null; + } + + try { + final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr); + final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis()); + final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000); + return latencyMs >= 0 ? latencyMs : null; + } catch (NumberFormatException ignored) { + return null; + } + } + + private @Nullable String headerValue( + final @NotNull ConsumerRecord record, final @NotNull String headerName) { + final @Nullable Header header = record.headers().lastHeader(headerName); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } +} diff --git a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java index 923104427e..89e621a3a3 100644 --- a/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java +++ b/sentry-kafka/src/main/java/io/sentry/kafka/SentryKafkaProducerInterceptor.java @@ -9,6 +9,7 @@ import io.sentry.SpanDataConvention; import io.sentry.SpanOptions; import io.sentry.SpanStatus; +import io.sentry.util.SpanUtils; import io.sentry.util.TracingUtils; import java.nio.charset.StandardCharsets; import java.util.Map; @@ -45,7 +46,7 @@ public SentryKafkaProducerInterceptor( @Override public @NotNull ProducerRecord onSend(final @NotNull ProducerRecord record) { - if (!scopes.getOptions().isEnableQueueTracing()) { + if (!scopes.getOptions().isEnableQueueTracing() || isIgnored()) { return record; } @@ -81,6 +82,10 @@ public SentryKafkaProducerInterceptor( public void onAcknowledgement( final @Nullable RecordMetadata metadata, final @Nullable Exception exception) {} + private boolean isIgnored() { + return SpanUtils.isIgnored(scopes.getOptions().getIgnoredSpanOrigins(), traceOrigin); + } + @Override public void close() {} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt deleted file mode 100644 index f6786bc8f5..0000000000 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerInterceptorTest.kt +++ /dev/null @@ -1,100 +0,0 @@ -package io.sentry.kafka - -import io.sentry.IScopes -import io.sentry.ITransaction -import io.sentry.Sentry -import io.sentry.SentryOptions -import io.sentry.TransactionContext -import io.sentry.TransactionOptions -import io.sentry.test.initForTest -import kotlin.test.AfterTest -import kotlin.test.BeforeTest -import kotlin.test.Test -import kotlin.test.assertSame -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.clients.consumer.ConsumerRecords -import org.apache.kafka.clients.consumer.OffsetAndMetadata -import org.apache.kafka.common.TopicPartition -import org.mockito.kotlin.any -import org.mockito.kotlin.mock -import org.mockito.kotlin.never -import org.mockito.kotlin.verify -import org.mockito.kotlin.whenever - -class SentryKafkaConsumerInterceptorTest { - - @BeforeTest - fun setup() { - initForTest { - it.dsn = "https://key@sentry.io/proj" - it.isEnableQueueTracing = true - it.tracesSampleRate = 1.0 - } - } - - @AfterTest - fun teardown() { - Sentry.close() - } - - @Test - fun `does nothing when queue tracing is disabled`() { - val scopes = mock() - val options = SentryOptions().apply { isEnableQueueTracing = false } - whenever(scopes.options).thenReturn(options) - - val interceptor = SentryKafkaConsumerInterceptor(scopes) - val records = singleRecordBatch() - - val result = interceptor.onConsume(records) - - assertSame(records, result) - verify(scopes, never()).startTransaction(any(), any()) - } - - @Test - fun `starts and finishes queue receive transaction for consumed batch`() { - val scopes = mock() - val options = SentryOptions().apply { isEnableQueueTracing = true } - val transaction = mock() - - whenever(scopes.options).thenReturn(options) - whenever(scopes.continueTrace(any(), any())).thenReturn(null) - whenever(scopes.startTransaction(any(), any())) - .thenReturn(transaction) - whenever(transaction.isNoOp).thenReturn(false) - - val interceptor = SentryKafkaConsumerInterceptor(scopes) - - interceptor.onConsume(singleRecordBatch()) - - verify(scopes).startTransaction(any(), any()) - verify(transaction).setData("messaging.system", "kafka") - verify(transaction).setData("messaging.destination.name", "my-topic") - verify(transaction).setData("messaging.batch.message.count", 1) - verify(transaction).finish() - } - - @Test - fun `commit callback is no-op`() { - val interceptor = SentryKafkaConsumerInterceptor(mock()) - - interceptor.onCommit(mapOf(TopicPartition("my-topic", 0) to OffsetAndMetadata(1))) - } - - @Test - fun `no-arg constructor uses current scopes`() { - val interceptor = SentryKafkaConsumerInterceptor() - val records = singleRecordBatch() - - val result = interceptor.onConsume(records) - - assertSame(records, result) - } - - private fun singleRecordBatch(): ConsumerRecords { - val partition = TopicPartition("my-topic", 0) - val record = ConsumerRecord("my-topic", 0, 0L, "key", "value") - return ConsumerRecords(mapOf(partition to listOf(record))) - } -} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt new file mode 100644 index 0000000000..29283102fa --- /dev/null +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaConsumerTracingTest.kt @@ -0,0 +1,235 @@ +package io.sentry.kafka + +import io.sentry.BaggageHeader +import io.sentry.IScopes +import io.sentry.ISentryLifecycleToken +import io.sentry.ITransaction +import io.sentry.SentryOptions +import io.sentry.SentryTraceHeader +import io.sentry.SpanDataConvention +import io.sentry.SpanStatus +import io.sentry.TransactionContext +import io.sentry.TransactionOptions +import java.io.IOException +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.Optional +import java.util.concurrent.Callable +import java.util.concurrent.atomic.AtomicBoolean +import kotlin.test.BeforeTest +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith +import kotlin.test.assertTrue +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.internals.RecordHeaders +import org.apache.kafka.common.record.TimestampType +import org.mockito.kotlin.any +import org.mockito.kotlin.argumentCaptor +import org.mockito.kotlin.check +import org.mockito.kotlin.eq +import org.mockito.kotlin.mock +import org.mockito.kotlin.never +import org.mockito.kotlin.verify +import org.mockito.kotlin.whenever + +class SentryKafkaConsumerTracingTest { + + private lateinit var scopes: IScopes + private lateinit var forkedScopes: IScopes + private lateinit var options: SentryOptions + private lateinit var lifecycleToken: ISentryLifecycleToken + private lateinit var transaction: ITransaction + private lateinit var tracing: SentryKafkaConsumerTracing + + @BeforeTest + fun setup() { + scopes = mock() + forkedScopes = mock() + lifecycleToken = mock() + transaction = mock() + tracing = SentryKafkaConsumerTracing(scopes) + + options = + SentryOptions().apply { + dsn = "https://key@sentry.io/proj" + isEnableQueueTracing = true + tracesSampleRate = 1.0 + } + + whenever(scopes.options).thenReturn(options) + whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes) + whenever(forkedScopes.options).thenReturn(options) + whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) + whenever(forkedScopes.startTransaction(any(), any())) + .thenReturn(transaction) + whenever(transaction.isNoOp).thenReturn(false) + } + + @Test + fun `withTracing creates queue process transaction with record metadata`() { + val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" + val baggageValue = "sentry-sample_rate=1" + val record = + createRecord( + sentryTrace = sentryTraceValue, + baggage = baggageValue, + messageId = "message-123", + deliveryAttempt = 3, + enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString(), + serializedValueSize = 5, + ) + + val txContextCaptor = argumentCaptor() + val txOptionsCaptor = argumentCaptor() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes).forkedRootScopes("SentryKafkaConsumerTracing") + verify(forkedScopes).makeCurrent() + verify(forkedScopes).continueTrace(eq(sentryTraceValue), eq(listOf(baggageValue))) + verify(forkedScopes).startTransaction(txContextCaptor.capture(), txOptionsCaptor.capture()) + + assertEquals("queue.process", txContextCaptor.firstValue.name) + assertEquals("queue.process", txContextCaptor.firstValue.operation) + assertEquals(SentryKafkaConsumerTracing.TRACE_ORIGIN, txOptionsCaptor.firstValue.origin) + assertTrue(txOptionsCaptor.firstValue.isBindToScope) + + verify(transaction).setData(SpanDataConvention.MESSAGING_SYSTEM, "kafka") + verify(transaction).setData(SpanDataConvention.MESSAGING_DESTINATION_NAME, "my-topic") + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_ID, "message-123") + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_BODY_SIZE, 5) + verify(transaction).setData(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT, 2) + verify(transaction) + .setData( + eq(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY), + check { assertTrue(it >= 0) }, + ) + verify(transaction).setStatus(SpanStatus.OK) + verify(transaction).finish() + verify(lifecycleToken).close() + } + + @Test + fun `withTracing skips scope forking when queue tracing is disabled`() { + options.isEnableQueueTracing = false + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes, never()).forkedRootScopes(any()) + } + + @Test + fun `withTracing skips scope forking when origin is ignored`() { + options.setIgnoredSpanOrigins(listOf(SentryKafkaConsumerTracing.TRACE_ORIGIN)) + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(scopes, never()).forkedRootScopes(any()) + } + + @Test + fun `withTracing marks transaction as error when callback throws`() { + val record = createRecord() + val exception = RuntimeException("boom") + + val thrown = + assertFailsWith { + tracing.withTracingImpl(record, Callable { throw exception }) + } + + assertEquals(exception, thrown) + verify(transaction).setStatus(SpanStatus.INTERNAL_ERROR) + verify(transaction).setThrowable(exception) + verify(transaction).finish() + verify(lifecycleToken).close() + } + + @Test + fun `withTracing falls back to direct callback execution when instrumentation setup fails`() { + whenever(scopes.forkedRootScopes(any())) + .thenThrow(RuntimeException("broken instrumentation")) + val record = createRecord() + + val result = tracing.withTracingImpl(record, Callable { "done" }) + + assertEquals("done", result) + verify(forkedScopes, never()).makeCurrent() + verify(transaction, never()).finish() + } + + @Test + fun `withTracing runnable overload executes callback`() { + val record = createRecord() + val didRun = AtomicBoolean(false) + + tracing.withTracingImpl(record, Runnable { didRun.set(true) }) + + assertTrue(didRun.get()) + verify(transaction).setStatus(SpanStatus.OK) + verify(transaction).finish() + } + + @Test + fun `withTracing runnable overload preserves original throwable`() { + val record = createRecord() + val exception = IOException("boom") + + val thrown = + assertFailsWith { tracing.withTracingImpl(record, Runnable { throw exception }) } + + assertEquals(exception, thrown) + verify(transaction).setStatus(SpanStatus.INTERNAL_ERROR) + verify(transaction).setThrowable(exception) + verify(transaction).finish() + } + + private fun createRecord( + topic: String = "my-topic", + sentryTrace: String? = null, + baggage: String? = null, + messageId: String? = null, + deliveryAttempt: Int? = null, + enqueuedTime: String? = null, + serializedValueSize: Int = -1, + ): ConsumerRecord { + val headers = RecordHeaders() + sentryTrace?.let { + headers.add(SentryTraceHeader.SENTRY_TRACE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + baggage?.let { + headers.add(BaggageHeader.BAGGAGE_HEADER, it.toByteArray(StandardCharsets.UTF_8)) + } + messageId?.let { + headers.add(SpanDataConvention.MESSAGING_MESSAGE_ID, it.toByteArray(StandardCharsets.UTF_8)) + } + deliveryAttempt?.let { + headers.add("kafka_deliveryAttempt", ByteBuffer.allocate(Int.SIZE_BYTES).putInt(it).array()) + } + enqueuedTime?.let { + headers.add( + SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER, + it.toByteArray(StandardCharsets.UTF_8), + ) + } + + return ConsumerRecord( + topic, + 0, + 0L, + System.currentTimeMillis(), + TimestampType.CREATE_TIME, + 3, + serializedValueSize, + "key", + "value", + headers, + Optional.empty(), + ) + } +} diff --git a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt index 61ac1ab20e..b9787aba09 100644 --- a/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt +++ b/sentry-kafka/src/test/kotlin/io/sentry/kafka/SentryKafkaProducerInterceptorTest.kt @@ -90,6 +90,23 @@ class SentryKafkaProducerInterceptorTest { assertEquals(0, tx.spans.size) } + @Test + fun `does not create span when trace origin is ignored`() { + val tx = createTransaction() + options.setIgnoredSpanOrigins(listOf(SentryKafkaProducerInterceptor.TRACE_ORIGIN)) + val interceptor = SentryKafkaProducerInterceptor(scopes) + val record = ProducerRecord("my-topic", "key", "value") + + interceptor.onSend(record) + + assertEquals(0, tx.spans.size) + assertEquals(null, record.headers().lastHeader(SentryTraceHeader.SENTRY_TRACE_HEADER)) + assertEquals( + null, + record.headers().lastHeader(SentryKafkaProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER), + ) + } + @Test fun `returns original record when no active span`() { whenever(scopes.span).thenReturn(null) diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java index 4fee0a8374..2a45ef6902 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/Main.java @@ -184,11 +184,11 @@ public static void main(String[] args) throws InterruptedException { // cache.remove, and cache.flush spans as children of the active transaction. demonstrateCacheTracing(); - // Kafka queue tracing with kafka-clients interceptors. + // Kafka queue tracing with the kafka-clients producer interceptor and manual consumer tracing. // // Enable with: SENTRY_SAMPLE_KAFKA_BOOTSTRAP_SERVERS=localhost:9092 if (kafkaEnabled) { - KafkaShowcase.runKafkaWithSentryInterceptors(kafkaBootstrapServers); + KafkaShowcase.runKafkaWithSentryTracing(kafkaBootstrapServers); } // Performance feature diff --git a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java index 9c84b6f004..da89145cfe 100644 --- a/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java +++ b/sentry-samples/sentry-samples-console/src/main/java/io/sentry/samples/console/kafka/KafkaShowcase.java @@ -3,7 +3,7 @@ import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; import io.sentry.Sentry; -import io.sentry.kafka.SentryKafkaConsumerInterceptor; +import io.sentry.kafka.SentryKafkaConsumerTracing; import io.sentry.kafka.SentryKafkaProducerInterceptor; import java.time.Duration; import java.util.Collections; @@ -12,6 +12,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; @@ -26,10 +27,9 @@ public final class KafkaShowcase { private KafkaShowcase() {} - public static void runKafkaWithSentryInterceptors(final String bootstrapServers) { + public static void runKafkaWithSentryTracing(final String bootstrapServers) { final CountDownLatch consumedLatch = new CountDownLatch(1); - final Thread consumerThread = - startConsumerWithSentryInterceptor(bootstrapServers, consumedLatch); + final Thread consumerThread = startConsumerWithSentryTracing(bootstrapServers, consumedLatch); final Properties producerProperties = createProducerPropertiesWithSentry(bootstrapServers); final ITransaction transaction = Sentry.startTransaction("kafka-demo", "demo"); @@ -79,7 +79,7 @@ public static Properties createProducerPropertiesWithSentry(final String bootstr return producerProperties; } - public static Properties createConsumerPropertiesWithSentry(final String bootstrapServers) { + public static Properties createConsumerProperties(final String bootstrapServers) { final Properties consumerProperties = new Properties(); consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); consumerProperties.put( @@ -90,10 +90,6 @@ public static Properties createConsumerPropertiesWithSentry(final String bootstr consumerProperties.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); - // Required for Sentry queue tracing in kafka-clients consumer setup. - consumerProperties.put( - ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, SentryKafkaConsumerInterceptor.class.getName()); - // Optional tuning for sample stability in CI/local runs. consumerProperties.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 2000); consumerProperties.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 2000); @@ -101,13 +97,12 @@ public static Properties createConsumerPropertiesWithSentry(final String bootstr return consumerProperties; } - private static Thread startConsumerWithSentryInterceptor( + private static Thread startConsumerWithSentryTracing( final String bootstrapServers, final CountDownLatch consumedLatch) { final Thread consumerThread = new Thread( () -> { - final Properties consumerProperties = - createConsumerPropertiesWithSentry(bootstrapServers); + final Properties consumerProperties = createConsumerProperties(bootstrapServers); try (KafkaConsumer consumer = new KafkaConsumer<>(consumerProperties)) { @@ -116,9 +111,20 @@ private static Thread startConsumerWithSentryInterceptor( while (!Thread.currentThread().isInterrupted() && consumedLatch.getCount() > 0) { final ConsumerRecords records = consumer.poll(Duration.ofMillis(500)); - if (!records.isEmpty()) { - consumedLatch.countDown(); - break; + for (final ConsumerRecord record : records) { + SentryKafkaConsumerTracing.withTracing( + record, + () -> { + System.out.println( + "Consumed Kafka message from " + + record.topic() + + ": " + + record.value()); + consumedLatch.countDown(); + }); + if (consumedLatch.getCount() == 0) { + break; + } } } } catch (Exception ignored) { diff --git a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt index 1b512fdc48..db6f54a616 100644 --- a/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt +++ b/sentry-samples/sentry-samples-console/src/test/kotlin/io/sentry/systemtest/ConsoleApplicationSystemTest.kt @@ -42,7 +42,8 @@ class ConsoleApplicationSystemTest { } testHelper.ensureTransactionReceived { transaction, _ -> - testHelper.doesTransactionHaveOp(transaction, "queue.receive") && + testHelper.doesTransactionHaveOp(transaction, "queue.process") && + transaction.contexts.trace?.origin == "manual.queue.kafka.consumer" && transaction.contexts.trace?.data?.get("messaging.system") == "kafka" } } diff --git a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java index 9cfda3c237..70a115bf7d 100644 --- a/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java +++ b/sentry-spring-jakarta/src/main/java/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor.java @@ -5,7 +5,6 @@ import io.sentry.IScopes; import io.sentry.ISentryLifecycleToken; import io.sentry.ITransaction; -import io.sentry.Sentry; import io.sentry.SentryTraceHeader; import io.sentry.SpanDataConvention; import io.sentry.SpanStatus; @@ -60,7 +59,7 @@ public SentryKafkaRecordInterceptor( finishStaleContext(); - final @NotNull IScopes forkedScopes = Sentry.forkedRootScopes("SentryKafkaRecordInterceptor"); + final @NotNull IScopes forkedScopes = scopes.forkedRootScopes("SentryKafkaRecordInterceptor"); final @NotNull ISentryLifecycleToken lifecycleToken = forkedScopes.makeCurrent(); final @Nullable TransactionContext transactionContext = continueTrace(forkedScopes, record); diff --git a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt index 1239b4007e..6191654012 100644 --- a/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt +++ b/sentry-spring-jakarta/src/test/kotlin/io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptorTest.kt @@ -22,7 +22,6 @@ import kotlin.test.assertTrue import org.apache.kafka.clients.consumer.Consumer import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.header.internals.RecordHeaders -import org.mockito.Mockito import org.mockito.kotlin.any import org.mockito.kotlin.mock import org.mockito.kotlin.never @@ -56,6 +55,7 @@ class SentryKafkaRecordInterceptorTest { whenever(scopes.isEnabled).thenReturn(true) forkedScopes = mock() + whenever(scopes.forkedRootScopes(any())).thenReturn(forkedScopes) whenever(forkedScopes.options).thenReturn(options) whenever(forkedScopes.makeCurrent()).thenReturn(lifecycleToken) @@ -69,13 +69,6 @@ class SentryKafkaRecordInterceptorTest { Sentry.close() } - private fun withMockSentry(closure: () -> T): T = - Mockito.mockStatic(Sentry::class.java).use { - it.`when` { Sentry.forkedRootScopes(any()) }.thenReturn(forkedScopes) - it.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) - closure.invoke() - } - private fun createRecord( topic: String = "my-topic", headers: RecordHeaders = RecordHeaders(), @@ -120,8 +113,9 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) + verify(scopes).forkedRootScopes("SentryKafkaRecordInterceptor") verify(forkedScopes).makeCurrent() } @@ -131,7 +125,7 @@ class SentryKafkaRecordInterceptorTest { val sentryTraceValue = "2722d9f6ec019ade60c776169d9a8904-cedf5b7571cb4972-1" val record = createRecordWithHeaders(sentryTrace = sentryTraceValue) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(forkedScopes) .continueTrace(org.mockito.kotlin.eq(sentryTraceValue), org.mockito.kotlin.isNull()) @@ -142,7 +136,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(forkedScopes).continueTrace(org.mockito.kotlin.isNull(), org.mockito.kotlin.isNull()) } @@ -152,7 +146,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecordWithHeaders(deliveryAttempt = 3) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) assertEquals(2, transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) } @@ -162,7 +156,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT)) } @@ -173,7 +167,7 @@ class SentryKafkaRecordInterceptorTest { val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString() val record = createRecordWithHeaders(enqueuedTime = enqueuedTime) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY) assertTrue(latency is Long && latency >= 0) @@ -187,6 +181,7 @@ class SentryKafkaRecordInterceptorTest { val result = interceptor.intercept(record, consumer) + verify(scopes, never()).forkedRootScopes(any()) verify(forkedScopes, never()).makeCurrent() assertEquals(record, result) } @@ -199,6 +194,7 @@ class SentryKafkaRecordInterceptorTest { val result = interceptor.intercept(record, consumer) + verify(scopes, never()).forkedRootScopes(any()) verify(forkedScopes, never()).makeCurrent() assertEquals(record, result) } @@ -210,7 +206,7 @@ class SentryKafkaRecordInterceptorTest { whenever(delegate.intercept(record, consumer)).thenReturn(record) val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) verify(delegate).intercept(record, consumer) } @@ -221,7 +217,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes, delegate) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.success(record, consumer) verify(delegate).success(record, consumer) @@ -234,7 +230,7 @@ class SentryKafkaRecordInterceptorTest { val record = createRecord() val exception = RuntimeException("processing failed") - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.failure(record, exception, consumer) verify(delegate).failure(record, exception, consumer) @@ -264,7 +260,7 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - withMockSentry { interceptor.intercept(record, consumer) } + interceptor.intercept(record, consumer) interceptor.clearThreadState(consumer) @@ -293,21 +289,16 @@ class SentryKafkaRecordInterceptorTest { val interceptor = SentryKafkaRecordInterceptor(scopes) val record = createRecord() - Mockito.mockStatic(Sentry::class.java).use { mockSentry -> - mockSentry.`when` { Sentry.getCurrentScopes() }.thenReturn(scopes) - mockSentry - .`when` { Sentry.forkedRootScopes(any()) } - .thenAnswer { - callCount++ - if (callCount == 1) forkedScopes else forkedScopes2 - } + whenever(scopes.forkedRootScopes(any())).thenAnswer { + callCount++ + if (callCount == 1) forkedScopes else forkedScopes2 + } - // First intercept sets up context - interceptor.intercept(record, consumer) + // First intercept sets up context + interceptor.intercept(record, consumer) - // Second intercept without success/failure — should clean up stale context first - interceptor.intercept(record, consumer) - } + // Second intercept without success/failure — should clean up stale context first + interceptor.intercept(record, consumer) // First lifecycle token should have been closed by the defensive cleanup verify(lifecycleToken).close()