diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java index e3b490544..cb8a8909e 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizer.java @@ -21,6 +21,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; @@ -63,6 +64,11 @@ public class JaegerSpanNormalizer { private final JaegerResourceNormalizer resourceNormalizer = new JaegerResourceNormalizer(); private final TenantIdHandler tenantIdHandler; + // measure the difference between span's start time and its processing time + private static final String SPAN_ARRIVAL_DELAY = "span.arrival.delay"; + private final ConcurrentMap tenantToSpanArrivalDelayTimer = + new ConcurrentHashMap<>(); + public static JaegerSpanNormalizer get(Config config) { if (INSTANCE == null) { synchronized (JaegerSpanNormalizer.class) { @@ -82,6 +88,10 @@ public Timer getSpanNormalizationTimer(String tenantId) { return tenantToSpanNormalizationTimer.get(tenantId); } + public Timer getSpanArrivalDelayTimer(String tenantId) { + return tenantToSpanArrivalDelayTimer.get(tenantId); + } + @Nullable public RawSpan convert(String tenantId, Span jaegerSpan) throws Exception { Map tags = @@ -102,6 +112,9 @@ public RawSpan convert(String tenantId, Span jaegerSpan) throws Exception { private Callable getRawSpanNormalizerCallable( Span jaegerSpan, Map spanTags, String tenantId) { return () -> { + long spanProcessedTime = System.currentTimeMillis(); + long spanStartTime = Timestamps.toMillis(jaegerSpan.getStartTime()); + Builder rawSpanBuilder = fastNewBuilder(RawSpan.Builder.class); rawSpanBuilder.setCustomerId(tenantId); rawSpanBuilder.setTraceId(jaegerSpan.getTraceId().asReadOnlyByteBuffer()); @@ -113,7 +126,7 @@ private Callable getRawSpanNormalizerCallable( spanTags, tenantIdHandler.getTenantIdProvider().getTenantIdTagKey()); rawSpanBuilder.setEvent(event); - rawSpanBuilder.setReceivedTimeMillis(System.currentTimeMillis()); + rawSpanBuilder.setReceivedTimeMillis(spanProcessedTime); resourceNormalizer .normalize(jaegerSpan, tenantIdHandler.getTenantIdProvider().getTenantIdTagKey()) .ifPresent(rawSpanBuilder::setResource); @@ -123,6 +136,16 @@ private Callable getRawSpanNormalizerCallable( if (LOG.isDebugEnabled()) { logSpanConversion(jaegerSpan, rawSpan); } + + // register and update timer per tenant + // it uses absolute value to take care of any clock skewness too. + tenantToSpanArrivalDelayTimer + .computeIfAbsent( + tenantId, + tenant -> + PlatformMetricsRegistry.registerTimer( + SPAN_ARRIVAL_DELAY, Map.of("tenantId", tenant))) + .record(Math.abs(spanProcessedTime - spanStartTime), TimeUnit.MILLISECONDS); return rawSpan; }; } diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java index 4a1d1b500..56d46ad5a 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java @@ -112,6 +112,8 @@ PreProcessedSpan preProcessSpan(Span span) { return null; } + // should drop late span + return new PreProcessedSpan(tenantId, span); } diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java index 4020d1af3..7d170015e 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java +++ b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanNormalizerTest.java @@ -219,6 +219,12 @@ public void testServiceNameNotAddedToEvent() throws Exception { // Assert that metrics are collected. Assertions.assertEquals(1, timer.count()); + + Timer timer1 = normalizer.getSpanArrivalDelayTimer(tenantId); + Assertions.assertNotNull(timer1); + + // Assert that metrics are collected. + Assertions.assertEquals(1, timer1.count()); } @Test