Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -63,6 +64,11 @@ public class JaegerSpanNormalizer {
private final JaegerResourceNormalizer resourceNormalizer = new JaegerResourceNormalizer();
private final TenantIdHandler tenantIdHandler;

// measure the difference between span's start time and its processing time
private static final String SPAN_ARRIVAL_DELAY = "span.arrival.delay";
private final ConcurrentMap<String, Timer> tenantToSpanArrivalDelayTimer =
new ConcurrentHashMap<>();

public static JaegerSpanNormalizer get(Config config) {
if (INSTANCE == null) {
synchronized (JaegerSpanNormalizer.class) {
Expand All @@ -82,6 +88,10 @@ public Timer getSpanNormalizationTimer(String tenantId) {
return tenantToSpanNormalizationTimer.get(tenantId);
}

public Timer getSpanArrivalDelayTimer(String tenantId) {
return tenantToSpanArrivalDelayTimer.get(tenantId);
}

@Nullable
public RawSpan convert(String tenantId, Span jaegerSpan) throws Exception {
Map<String, KeyValue> tags =
Expand All @@ -102,6 +112,9 @@ public RawSpan convert(String tenantId, Span jaegerSpan) throws Exception {
private Callable<RawSpan> getRawSpanNormalizerCallable(
Span jaegerSpan, Map<String, KeyValue> spanTags, String tenantId) {
return () -> {
long spanProcessedTime = System.currentTimeMillis();
long spanStartTime = Timestamps.toMillis(jaegerSpan.getStartTime());

Builder rawSpanBuilder = fastNewBuilder(RawSpan.Builder.class);
rawSpanBuilder.setCustomerId(tenantId);
rawSpanBuilder.setTraceId(jaegerSpan.getTraceId().asReadOnlyByteBuffer());
Expand All @@ -113,7 +126,7 @@ private Callable<RawSpan> getRawSpanNormalizerCallable(
spanTags,
tenantIdHandler.getTenantIdProvider().getTenantIdTagKey());
rawSpanBuilder.setEvent(event);
rawSpanBuilder.setReceivedTimeMillis(System.currentTimeMillis());
rawSpanBuilder.setReceivedTimeMillis(spanProcessedTime);
resourceNormalizer
.normalize(jaegerSpan, tenantIdHandler.getTenantIdProvider().getTenantIdTagKey())
.ifPresent(rawSpanBuilder::setResource);
Expand All @@ -123,6 +136,16 @@ private Callable<RawSpan> getRawSpanNormalizerCallable(
if (LOG.isDebugEnabled()) {
logSpanConversion(jaegerSpan, rawSpan);
}

// register and update timer per tenant
// it uses absolute value to take care of any clock skewness too.
tenantToSpanArrivalDelayTimer
.computeIfAbsent(
tenantId,
tenant ->
PlatformMetricsRegistry.registerTimer(
SPAN_ARRIVAL_DELAY, Map.of("tenantId", tenant)))
.record(Math.abs(spanProcessedTime - spanStartTime), TimeUnit.MILLISECONDS);
return rawSpan;
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ PreProcessedSpan preProcessSpan(Span span) {
return null;
}

// should drop late span

return new PreProcessedSpan(tenantId, span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ public void testServiceNameNotAddedToEvent() throws Exception {

// Assert that metrics are collected.
Assertions.assertEquals(1, timer.count());

Timer timer1 = normalizer.getSpanArrivalDelayTimer(tenantId);
Assertions.assertNotNull(timer1);

// Assert that metrics are collected.
Assertions.assertEquals(1, timer1.count());
}

@Test
Expand Down