diff --git a/span-normalizer/helm/templates/span-normalizer-config.yaml b/span-normalizer/helm/templates/span-normalizer-config.yaml index 3e6d70502..71cb84e8e 100644 --- a/span-normalizer/helm/templates/span-normalizer-config.yaml +++ b/span-normalizer/helm/templates/span-normalizer-config.yaml @@ -65,6 +65,10 @@ data: bypass.key = "{{ .Values.spanNormalizerConfig.processor.bypassKey }}" {{- end }} + {{- if hasKey .Values.spanNormalizerConfig.processor "lateArrivalThresholdDuration" }} + late.arrival.threshold.duration = "{{ .Values.spanNormalizerConfig.processor.lateArrivalThresholdDuration }}" + {{- end }} + {{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }} rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }} {{- end }} diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java index 4a1d1b500..4e1040969 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessor.java @@ -3,10 +3,13 @@ import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.util.Timestamps; import com.typesafe.config.Config; import io.jaegertracing.api_v2.JaegerSpanInternalModel; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; import io.micrometer.core.instrument.Counter; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; @@ -24,13 +27,18 @@ public class JaegerSpanPreProcessor 
static final String SPANS_COUNTER = "hypertrace.reported.spans"; private static final String DROPPED_SPANS_COUNTER = "hypertrace.reported.spans.dropped"; + private static final String IS_LATE_ARRIVAL_SPANS_TAGS = "is_late_arrival_spans"; + private static final String LATE_ARRIVAL_THRESHOLD_CONFIG_KEY = + "processor.late.arrival.threshold.duration"; private static final Logger LOG = LoggerFactory.getLogger(JaegerSpanPreProcessor.class); private static final ConcurrentMap statusToSpansCounter = new ConcurrentHashMap<>(); private static final ConcurrentMap tenantToSpansDroppedCount = new ConcurrentHashMap<>(); + private static final Duration minArrivalThreshold = Duration.of(30, ChronoUnit.SECONDS); private TenantIdHandler tenantIdHandler; private SpanFilter spanFilter; + private Duration lateArrivalThresholdDuration; public JaegerSpanPreProcessor() { // empty constructor @@ -40,6 +48,7 @@ public JaegerSpanPreProcessor() { JaegerSpanPreProcessor(Config jobConfig) { tenantIdHandler = new TenantIdHandler(jobConfig); spanFilter = new SpanFilter(jobConfig); + lateArrivalThresholdDuration = configureLateArrivalThreshold(jobConfig); } @Override @@ -47,6 +56,16 @@ public void init(ProcessorContext context) { Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG); tenantIdHandler = new TenantIdHandler(jobConfig); spanFilter = new SpanFilter(jobConfig); + lateArrivalThresholdDuration = configureLateArrivalThreshold(jobConfig); + } + + private Duration configureLateArrivalThreshold(Config jobConfig) { + Duration configuredThreshold = jobConfig.getDuration(LATE_ARRIVAL_THRESHOLD_CONFIG_KEY); + if (minArrivalThreshold.compareTo(configuredThreshold) > 0) { + throw new IllegalArgumentException( + "the value of " + "processor.late.arrival.threshold.duration should be higher than 30s"); + } + return configuredThreshold; } @Override @@ -112,6 +131,24 @@ PreProcessedSpan preProcessSpan(Span span) { return null; } + // drop the span if the arrival time of it 
is older than the configured threshold + long spanProcessedTime = System.currentTimeMillis(); + long spanStartTime = Timestamps.toMillis(span.getStartTime()); + Duration spanArrivalDelay = + Duration.of(Math.abs(spanProcessedTime - spanStartTime), ChronoUnit.MILLIS); + + if (spanStartTime > 0 && spanArrivalDelay.compareTo(lateArrivalThresholdDuration) > 0) { + tenantToSpansDroppedCount + .computeIfAbsent( + tenantId, + tenant -> + PlatformMetricsRegistry.registerCounter( + DROPPED_SPANS_COUNTER, + Map.of("tenantId", tenantId, IS_LATE_ARRIVAL_SPANS_TAGS, "true"))) + .increment(); + return null; + } + return new PreProcessedSpan(tenantId, span); } diff --git a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf b/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf index 7768874a6..1135f40e2 100644 --- a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf +++ b/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf @@ -26,6 +26,7 @@ kafka.streams.config = { processor { defaultTenantId = ${?DEFAULT_TENANT_ID} + late.arrival.threshold.duration = 365d } logger.names = ["file"] diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java index 9c7c0454c..441bb7e54 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java +++ b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/SpanNormalizerTest.java @@ -12,6 +12,8 @@ import io.jaegertracing.api_v2.JaegerSpanInternalModel; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; 
import java.util.Properties; @@ -400,4 +402,94 @@ public void whenByPassedExpectStructuredTraceToBeOutput() { logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value; Assertions.assertEquals(1, logEvents.getLogEvents().size()); } + + @Test + @SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer") + public void testLaterArrivalJaegerSpans() { + Config config = + ConfigFactory.parseURL( + getClass().getClassLoader().getResource("configs/span-normalizer/application.conf")); + + Map mergedProps = new HashMap<>(); + underTest.getBaseStreamsConfig().forEach(mergedProps::put); + underTest.getStreamsConfig(config).forEach(mergedProps::put); + mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config); + + StreamsBuilder streamsBuilder = + underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>()); + + Properties props = new Properties(); + mergedProps.forEach(props::put); + + TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props); + TestInputTopic inputTopic = + td.createInputTopic( + config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY), + Serdes.ByteArray().serializer(), + new JaegerSpanSerde().serializer()); + + Serde rawSpanSerde = new AvroSerde<>(); + rawSpanSerde.configure(Map.of(), false); + + Serde spanIdentitySerde = new AvroSerde<>(); + spanIdentitySerde.configure(Map.of(), true); + + TestOutputTopic outputTopic = + td.createOutputTopic( + config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY), + spanIdentitySerde.deserializer(), + rawSpanSerde.deserializer()); + + TestOutputTopic rawLogOutputTopic = + td.createOutputTopic( + config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY), + spanIdentitySerde.deserializer(), + new AvroSerde<>().deserializer()); + + // case 1: within threshold, expect output + Instant instant = Instant.now(); + Span span = + Span.newBuilder() + .setSpanId(ByteString.copyFrom("1".getBytes())) + 
.setTraceId(ByteString.copyFrom("trace-1".getBytes())) + .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .build(); + inputTopic.pipeInput(span); + + KeyValue kv = outputTopic.readKeyValue(); + assertEquals("__default", kv.key.getTenantId()); + assertEquals( + HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()), + HexUtils.getHex(kv.key.getTraceId().array())); + RawSpan value = kv.value; + assertEquals(HexUtils.getHex("1".getBytes()), HexUtils.getHex((value).getEvent().getEventId())); + assertEquals(SERVICE_NAME, value.getEvent().getServiceName()); + + // outside threshold, expect no output to RawSpan + Instant instant1 = Instant.now().minus(25, ChronoUnit.HOURS); + Span span2 = + Span.newBuilder() + .setSpanId(ByteString.copyFrom("2".getBytes())) + .setTraceId(ByteString.copyFrom("trace-2".getBytes())) + .setStartTime(Timestamp.newBuilder().setSeconds(instant1.getEpochSecond()).build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("jaeger.servicename") + .setVStr(SERVICE_NAME) + .build()) + .addTags( + JaegerSpanInternalModel.KeyValue.newBuilder() + .setKey("http.method") + .setVStr("GET") + .build()) + .build(); + + inputTopic.pipeInput(span2); + Assertions.assertTrue(outputTopic.isEmpty()); + } } diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java index 24ebc1464..9d0ad8d2d 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java +++ b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java @@ -2,10 +2,13 @@ import 
static org.junit.jupiter.api.Assertions.assertThrows; +import com.google.protobuf.Timestamp; import com.typesafe.config.ConfigFactory; import io.jaegertracing.api_v2.JaegerSpanInternalModel.KeyValue; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Process; import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -27,7 +30,7 @@ void testPreProcessSpan_missingTenantId() { // span dropped since tenant detail not present String tenantId = "tenant-" + random.nextLong(); Map configs = new HashMap<>(getCommonConfig()); - configs.putAll(Map.of("processor", Map.of())); + configs.putAll(Map.of("processor", Map.of("late.arrival.threshold.duration", "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -46,7 +49,10 @@ void testPreProcessSpan_validTenantId() { // default tenant id String tenantId = "tenant-" + random.nextLong(); Map configs = new HashMap<>(getCommonConfig()); - configs.putAll(Map.of("processor", Map.of("defaultTenantId", "default-tenant"))); + configs.putAll( + Map.of( + "processor", + Map.of("defaultTenantId", "default-tenant", "late.arrival.threshold.duration", "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -61,7 +67,10 @@ void testPreProcessSpan_validTenantId() { // provided tenant id in span tags configs = new HashMap<>(getCommonConfig()); - configs.putAll(Map.of("processor", Map.of("tenantIdTagKey", "tenant-key"))); + configs.putAll( + Map.of( + "processor", + Map.of("tenantIdTagKey", "tenant-key", "late.arrival.threshold.duration", "1d"))); jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); Span span2 = @@ -91,7 +100,13 @@ void testPreProcessSpan_excludeTenantId() { configs.putAll( Map.of( "processor", - Map.of("tenantIdTagKey", 
"tenant-key", "excludeTenantIds", List.of(tenantId)))); + Map.of( + "tenantIdTagKey", + "tenant-key", + "excludeTenantIds", + List.of(tenantId), + "late.arrival.threshold.duration", + "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -120,7 +135,13 @@ public void testSpanDropCriterion() { configs.putAll( Map.of( "processor", - Map.of("tenantIdTagKey", "tenant-key", "spanDropCriterion", List.of("foo:bar,k1:v1")))); + Map.of( + "tenantIdTagKey", + "tenant-key", + "spanDropCriterion", + List.of("foo:bar,k1:v1"), + "late.arrival.threshold.duration", + "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); Process process = Process.newBuilder().setServiceName("testService").build(); @@ -155,7 +176,9 @@ public void testDropSpanWithMultipleCriterion() { "tenantIdTagKey", "tenant-key", "spanDropCriterion", - List.of("foo:bar,k1:v1", "k2:v2", "http.url:https://foo.bar")))); + List.of("foo:bar,k1:v1", "k2:v2", "http.url:https://foo.bar"), + "late.arrival.threshold.duration", + "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -206,7 +229,14 @@ public void testDropSpanWithEmptyCriterion() { Map configs = new HashMap<>(getCommonConfig()); configs.putAll( Map.of( - "processor", Map.of("tenantIdTagKey", "tenant-key", "spanDropCriterion", List.of()))); + "processor", + Map.of( + "tenantIdTagKey", + "tenant-key", + "spanDropCriterion", + List.of(), + "late.arrival.threshold.duration", + "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -231,7 +261,8 @@ public void testDropSpan_RootSpan_EmptyExclusionList() { "processor", Map.of( "tenantIdTagKey", "tenant-key", - "rootExitSpanDropCriterion.alwaysDrop", "true"))); + "rootExitSpanDropCriterion.alwaysDrop", "true", + "late.arrival.threshold.duration", "1d"))); 
JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -281,7 +312,8 @@ public void testDropSpan_RootSpan_AlwaysDrop_ExclusionList() { "tenantIdTagKey", "tenant-key", "rootExitSpanDropCriterion.alwaysDrop", "true", "rootExitSpanDropCriterion.exclusionsMatchCriterion", - List.of("foo:bar,k1:v1", "k2:v2")))); + List.of("foo:bar,k1:v1", "k2:v2"), + "late.arrival.threshold.duration", "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -344,7 +376,8 @@ public void testDropSpan_RootSpan_NotAlwaysDrop_ExclusionList() { "tenantIdTagKey", "tenant-key", "rootExitSpanDropCriterion.alwaysDrop", "false", "rootExitSpanDropCriterion.exclusionsMatchCriterion", - List.of("foo:bar,k1:v1", "k2:v2")))); + List.of("foo:bar,k1:v1", "k2:v2"), + "late.arrival.threshold.duration", "1d"))); JaegerSpanPreProcessor jaegerSpanPreProcessor = new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); @@ -405,6 +438,8 @@ public void testSpanDropFilters() { Map.of( "tenantIdTagKey", "tenant-key", + "late.arrival.threshold.duration", + "1d", "spanDropFilters", List.of( List.of( @@ -506,6 +541,8 @@ public void testSpanDropFiltersWithCombinationOfProcessAndSpanTags() { Map.of( "tenantIdTagKey", "tenant-key", + "late.arrival.threshold.duration", + "1d", "spanDropFilters", List.of( List.of( @@ -720,6 +757,8 @@ public void testSpanDropFiltersBadConfig() { Map.of( "tenantIdTagKey", "tenant-key", + "late.arrival.threshold.duration", + "1d", "spanDropFilters", List.of( List.of( @@ -735,6 +774,59 @@ public void testSpanDropFiltersBadConfig() { }); } + @Test + public void testLateArrivalSpanWithConfiguredConfig() { + // case 1: 24 hrs config, span within range, should not drop + String tenantId = "tenant-" + random.nextLong(); + Map configs = new HashMap<>(getCommonConfig()); + configs.putAll( + Map.of( + "processor", + Map.of("defaultTenantId", tenantId, 
"late.arrival.threshold.duration", "24h"))); + JaegerSpanPreProcessor jaegerSpanPreProcessor = + new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); + + Instant instant = Instant.now(); + Process process = Process.newBuilder().setServiceName("testService").build(); + Span span = + Span.newBuilder() + .setProcess(process) + .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) + .addTags(KeyValue.newBuilder().setKey("key").setVStr("Val").build()) + .build(); + PreProcessedSpan preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNotNull(preProcessedSpan); + Assertions.assertEquals(tenantId, preProcessedSpan.getTenantId()); + + // case 2: 24 hrs config, span too old, more than 25hrs, should drop + instant = Instant.now().minus(25, ChronoUnit.HOURS); + process = Process.newBuilder().setServiceName("testService").build(); + span = + Span.newBuilder() + .setProcess(process) + .setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build()) + .addTags(KeyValue.newBuilder().setKey("key").setVStr("Val").build()) + .build(); + preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNull(preProcessedSpan); + } + + @Test + public void testBadLateArrivalSpanConfig() { + assertThrows( + IllegalArgumentException.class, + () -> { + Map configs = new HashMap<>(getCommonConfig()); + configs.putAll( + Map.of( + "processor", + Map.of( + "tenantIdTagKey", "tenant-key", "late.arrival.threshold.duration", "20s"))); + JaegerSpanPreProcessor jaegerSpanPreProcessor = + new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs)); + }); + } + private Map getCommonConfig() { return Map.of( "span.type", diff --git a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf b/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf index 458c4dcc6..05ae0bdb2 100644 --- 
a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf +++ b/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf @@ -54,6 +54,7 @@ processor { processor { bypass.key = "test.bypass" + late.arrival.threshold.duration = "1d" }