Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.sentry.spring.jakarta.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
import io.sentry.IScopes;
import io.sentry.ISentryLifecycleToken;
import io.sentry.ITransaction;
Expand Down Expand Up @@ -172,8 +173,9 @@ private boolean isIgnored() {
headerValue(record, SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER);
if (enqueuedTimeStr != null) {
try {
final long enqueuedTime = Long.parseLong(enqueuedTimeStr);
final long latencyMs = System.currentTimeMillis() - enqueuedTime;
final double enqueuedTimeSeconds = Double.parseDouble(enqueuedTimeStr);
final double nowSeconds = DateUtils.millisToSeconds(System.currentTimeMillis());
final long latencyMs = (long) ((nowSeconds - enqueuedTimeSeconds) * 1000);
if (latencyMs >= 0) {
transaction.setData(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY, latencyMs);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.sentry.spring.jakarta.kafka;

import io.sentry.BaggageHeader;
import io.sentry.DateUtils;
import io.sentry.IScopes;
import io.sentry.ISpan;
import io.sentry.SentryTraceHeader;
Expand Down Expand Up @@ -107,6 +108,7 @@ private void injectHeaders(final @NotNull Headers headers, final @NotNull ISpan
headers.remove(SENTRY_ENQUEUED_TIME_HEADER);
headers.add(
SENTRY_ENQUEUED_TIME_HEADER,
String.valueOf(System.currentTimeMillis()).getBytes(StandardCharsets.UTF_8));
String.valueOf(DateUtils.millisToSeconds(System.currentTimeMillis()))
.getBytes(StandardCharsets.UTF_8));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import kotlin.test.BeforeTest
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.header.internals.RecordHeaders
Expand Down Expand Up @@ -86,7 +87,7 @@ class SentryKafkaRecordInterceptorTest {
private fun createRecordWithHeaders(
sentryTrace: String? = null,
baggage: String? = null,
enqueuedTime: Long? = null,
enqueuedTime: String? = null,
deliveryAttempt: Int? = null,
): ConsumerRecord<String, String> {
val headers = RecordHeaders()
Expand All @@ -99,7 +100,7 @@ class SentryKafkaRecordInterceptorTest {
enqueuedTime?.let {
headers.add(
SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER,
it.toString().toByteArray(StandardCharsets.UTF_8),
it.toByteArray(StandardCharsets.UTF_8),
)
}
deliveryAttempt?.let {
Expand Down Expand Up @@ -165,6 +166,18 @@ class SentryKafkaRecordInterceptorTest {
assertNull(transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RETRY_COUNT))
}

@Test
fun `sets receive latency from enqueued time in epoch seconds`() {
val interceptor = SentryKafkaRecordInterceptor<String, String>(scopes)
val enqueuedTime = (System.currentTimeMillis() / 1000.0 - 1.0).toString()
val record = createRecordWithHeaders(enqueuedTime = enqueuedTime)

withMockSentry { interceptor.intercept(record, consumer) }

val latency = transaction.data?.get(SpanDataConvention.MESSAGING_MESSAGE_RECEIVE_LATENCY)
assertTrue(latency is Long && latency >= 0)
}

@Test
fun `does not create span when queue tracing is disabled`() {
options.isEnableQueueTracing = false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class SentryProducerInterceptorTest {
val enqueuedTimeHeader =
resultHeaders.lastHeader(SentryProducerInterceptor.SENTRY_ENQUEUED_TIME_HEADER)
assertNotNull(enqueuedTimeHeader, "sentry-task-enqueued-time header should be injected")
val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toLong()
assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch millis value")
val enqueuedTime = String(enqueuedTimeHeader.value(), StandardCharsets.UTF_8).toDouble()
assertTrue(enqueuedTime > 0, "enqueued time should be a positive epoch seconds value")
}

@Test
Expand Down
Loading