Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sentry-spring-jakarta/api/sentry-spring-jakarta.api
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ public final class io/sentry/spring/jakarta/kafka/SentryKafkaRecordInterceptor :
public fun clearThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun failure (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Ljava/lang/Exception;Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun intercept (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)Lorg/apache/kafka/clients/consumer/ConsumerRecord;
public fun setupThreadState (Lorg/apache/kafka/clients/consumer/Consumer;)V
public fun success (Lorg/apache/kafka/clients/consumer/ConsumerRecord;Lorg/apache/kafka/clients/consumer/Consumer;)V
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,22 @@ public void afterRecord(
}
}

@Override
public void setupThreadState(final @NotNull Consumer<?, ?> consumer) {
if (delegate != null) {
delegate.setupThreadState(consumer);
}
}

@Override
public void clearThreadState(final @NotNull Consumer<?, ?> consumer) {
finishStaleContext();
try {
finishStaleContext();
} finally {
if (delegate != null) {
delegate.clearThreadState(consumer);
}
}
}

private boolean isIgnored() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,52 @@ class SentryKafkaRecordInterceptorTest {
interceptor.clearThreadState(consumer)
}

@Test
fun `setupThreadState delegates to existing interceptor`() {
val delegate = mock<RecordInterceptor<String, String>>()
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)

interceptor.setupThreadState(consumer)

verify(delegate).setupThreadState(consumer)
}

@Test
fun `setupThreadState is no-op without delegate`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)

// should not throw
interceptor.setupThreadState(consumer)
}

@Test
fun `clearThreadState delegates to existing interceptor`() {
val delegate = mock<RecordInterceptor<String, String>>()
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)

interceptor.clearThreadState(consumer)

verify(delegate).clearThreadState(consumer)
}

@Test
fun `clearThreadState delegates to existing interceptor even when sentry cleanup throws`() {
val delegate = mock<RecordInterceptor<String, String>>()
whenever(lifecycleToken.close()).thenThrow(RuntimeException("boom"))
val interceptor = SentryKafkaRecordInterceptor(scopes, delegate)
val record = createRecord()

interceptor.intercept(record, consumer)

try {
interceptor.clearThreadState(consumer)
} catch (ignored: RuntimeException) {
// expected
}

verify(delegate).clearThreadState(consumer)
}

@Test
fun `intercept cleans up stale context from previous record`() {
val lifecycleToken2 = mock<ISentryLifecycleToken>()
Expand Down
Loading