Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions span-normalizer/helm/templates/span-normalizer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ data:
bypass.key = "{{ .Values.spanNormalizerConfig.processor.bypassKey }}"
{{- end }}

{{- if hasKey .Values.spanNormalizerConfig.processor "lateArrivalThresholdDuration" }}
late.arrival.threshold.duration = "{{ .Values.spanNormalizerConfig.processor.lateArrivalThresholdDuration }}"
{{- end }}

{{- if hasKey .Values.spanNormalizerConfig.processor "rootExitSpanDropCriterion" }}
rootExitSpanDropCriterion = {{ .Values.spanNormalizerConfig.processor.rootExitSpanDropCriterion | toJson }}
{{- end }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import static org.hypertrace.core.spannormalizer.constants.SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.util.Timestamps;
import com.typesafe.config.Config;
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import io.micrometer.core.instrument.Counter;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -24,13 +27,18 @@ public class JaegerSpanPreProcessor

static final String SPANS_COUNTER = "hypertrace.reported.spans";
private static final String DROPPED_SPANS_COUNTER = "hypertrace.reported.spans.dropped";
private static final String IS_LATE_ARRIVAL_SPANS_TAGS = "is_late_arrival_spans";
private static final String LATE_ARRIVAL_THRESHOLD_CONFIG_KEY =
"processor.late.arrival.threshold.duration";
private static final Logger LOG = LoggerFactory.getLogger(JaegerSpanPreProcessor.class);
private static final ConcurrentMap<String, Counter> statusToSpansCounter =
new ConcurrentHashMap<>();
private static final ConcurrentMap<String, Counter> tenantToSpansDroppedCount =
new ConcurrentHashMap<>();
private static final Duration minArrivalThreshold = Duration.of(30, ChronoUnit.SECONDS);
private TenantIdHandler tenantIdHandler;
private SpanFilter spanFilter;
private Duration lateArrivalThresholdDuration;

public JaegerSpanPreProcessor() {
// empty constructor
Expand All @@ -40,13 +48,24 @@ public JaegerSpanPreProcessor() {
JaegerSpanPreProcessor(Config jobConfig) {
tenantIdHandler = new TenantIdHandler(jobConfig);
spanFilter = new SpanFilter(jobConfig);
lateArrivalThresholdDuration = configureLateArrivalThreshold(jobConfig);
}

@Override
public void init(ProcessorContext context) {
Config jobConfig = (Config) context.appConfigs().get(SPAN_NORMALIZER_JOB_CONFIG);
tenantIdHandler = new TenantIdHandler(jobConfig);
spanFilter = new SpanFilter(jobConfig);
lateArrivalThresholdDuration = configureLateArrivalThreshold(jobConfig);
}

private Duration configureLateArrivalThreshold(Config jobConfig) {
Duration configuredThreshold = jobConfig.getDuration(LATE_ARRIVAL_THRESHOLD_CONFIG_KEY);
if (minArrivalThreshold.compareTo(configuredThreshold) > 0) {
throw new IllegalArgumentException(
"the value of " + "processor.late.arrival.threshold.duration should be higher than 30s");
}
return configuredThreshold;
}

@Override
Expand Down Expand Up @@ -112,6 +131,24 @@ PreProcessedSpan preProcessSpan(Span span) {
return null;
}

// drop the span if the arrival time of it too old than configured threshold
long spanProcessedTime = System.currentTimeMillis();
long spanStartTime = Timestamps.toMillis(span.getStartTime());
Duration spanArrivalDelay =
Duration.of(Math.abs(spanProcessedTime - spanStartTime), ChronoUnit.MILLIS);

if (spanStartTime > 0 && spanArrivalDelay.compareTo(lateArrivalThresholdDuration) > 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition spanStartTime > 0 is required as default proto value for the field is 0. So, if there are spans when it is not set value, they are considered valid for backward compatibility.

tenantToSpansDroppedCount
.computeIfAbsent(
tenantId,
tenant ->
PlatformMetricsRegistry.registerCounter(
DROPPED_SPANS_COUNTER,
Map.of("tenantId", tenantId, IS_LATE_ARRIVAL_SPANS_TAGS, "true")))
.increment();
return null;
}

return new PreProcessedSpan(tenantId, span);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ kafka.streams.config = {

processor {
defaultTenantId = ${?DEFAULT_TENANT_ID}
late.arrival.threshold.duration = 365d
}

logger.names = ["file"]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import io.jaegertracing.api_v2.JaegerSpanInternalModel;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Log;
import io.jaegertracing.api_v2.JaegerSpanInternalModel.Span;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
Expand Down Expand Up @@ -400,4 +402,94 @@ public void whenByPassedExpectStructuredTraceToBeOutput() {
logEvents = (LogEvents) rawLogOutputTopic.readKeyValue().value;
Assertions.assertEquals(1, logEvents.getLogEvents().size());
}

@Test
@SetEnvironmentVariable(key = "SERVICE_NAME", value = "span-normalizer")
public void testLaterArrivalJaegerSpans() {
Config config =
ConfigFactory.parseURL(
getClass().getClassLoader().getResource("configs/span-normalizer/application.conf"));

Map<String, Object> mergedProps = new HashMap<>();
underTest.getBaseStreamsConfig().forEach(mergedProps::put);
underTest.getStreamsConfig(config).forEach(mergedProps::put);
mergedProps.put(SpanNormalizerConstants.SPAN_NORMALIZER_JOB_CONFIG, config);

StreamsBuilder streamsBuilder =
underTest.buildTopology(mergedProps, new StreamsBuilder(), new HashMap<>());

Properties props = new Properties();
mergedProps.forEach(props::put);

TopologyTestDriver td = new TopologyTestDriver(streamsBuilder.build(), props);
TestInputTopic<byte[], Span> inputTopic =
td.createInputTopic(
config.getString(SpanNormalizerConstants.INPUT_TOPIC_CONFIG_KEY),
Serdes.ByteArray().serializer(),
new JaegerSpanSerde().serializer());

Serde<RawSpan> rawSpanSerde = new AvroSerde<>();
rawSpanSerde.configure(Map.of(), false);

Serde<TraceIdentity> spanIdentitySerde = new AvroSerde<>();
spanIdentitySerde.configure(Map.of(), true);

TestOutputTopic outputTopic =
td.createOutputTopic(
config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_CONFIG_KEY),
spanIdentitySerde.deserializer(),
rawSpanSerde.deserializer());

TestOutputTopic rawLogOutputTopic =
td.createOutputTopic(
config.getString(SpanNormalizerConstants.OUTPUT_TOPIC_RAW_LOGS_CONFIG_KEY),
spanIdentitySerde.deserializer(),
new AvroSerde<>().deserializer());

// case 1: within threshold, expect output
Instant instant = Instant.now();
Span span =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("1".getBytes()))
.setTraceId(ByteString.copyFrom("trace-1".getBytes()))
.setStartTime(Timestamp.newBuilder().setSeconds(instant.getEpochSecond()).build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr(SERVICE_NAME)
.build())
.build();
inputTopic.pipeInput(span);

KeyValue<TraceIdentity, RawSpan> kv = outputTopic.readKeyValue();
assertEquals("__default", kv.key.getTenantId());
assertEquals(
HexUtils.getHex(ByteString.copyFrom("trace-1".getBytes()).toByteArray()),
HexUtils.getHex(kv.key.getTraceId().array()));
RawSpan value = kv.value;
assertEquals(HexUtils.getHex("1".getBytes()), HexUtils.getHex((value).getEvent().getEventId()));
assertEquals(SERVICE_NAME, value.getEvent().getServiceName());

// outside threshold, except no output to RawSpan
Instant instant1 = Instant.now().minus(25, ChronoUnit.HOURS);
Span span2 =
Span.newBuilder()
.setSpanId(ByteString.copyFrom("2".getBytes()))
.setTraceId(ByteString.copyFrom("trace-2".getBytes()))
.setStartTime(Timestamp.newBuilder().setSeconds(instant1.getEpochSecond()).build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("jaeger.servicename")
.setVStr(SERVICE_NAME)
.build())
.addTags(
JaegerSpanInternalModel.KeyValue.newBuilder()
.setKey("http.method")
.setVStr("GET")
.build())
.build();

inputTopic.pipeInput(span2);
Assertions.assertTrue(outputTopic.isEmpty());
}
}
Loading