diff --git a/span-normalizer/helm/templates/span-normalizer-config.yaml b/span-normalizer/helm/templates/span-normalizer-config.yaml index 19da60f44..cbfa5703c 100644 --- a/span-normalizer/helm/templates/span-normalizer-config.yaml +++ b/span-normalizer/helm/templates/span-normalizer-config.yaml @@ -106,3 +106,14 @@ data: expireAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.expireAfterWriteDuration }} } } + {{- if hasKey .Values.spanNormalizerConfig "rateLimitConfig" }} + rate.limit.config = [ + {{- range $k,$v := $.Values.spanNormalizerConfig.rateLimitConfig }} + { + tenantId = {{ $v.tenantId }} + groupingKey = {{ $v.groupingKey }} + maxSpansPerMinute = {{ $v.maxSpansPerMinute }} + }, + {{- end }} + ] + {{- end }} diff --git a/span-normalizer/helm/values.yaml b/span-normalizer/helm/values.yaml index 5617b1aa6..310137137 100644 --- a/span-normalizer/helm/values.yaml +++ b/span-normalizer/helm/values.yaml @@ -106,6 +106,7 @@ spanNormalizerConfig: cache: refreshAfterWriteDuration: 3m expireAfterWriteDuration: 5m + rateLimitConfig: [] logConfig: name: span-normalizer-log-appender-config diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java new file mode 100644 index 000000000..d29d1a39e --- /dev/null +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/RateLimitingSpanFilter.java @@ -0,0 +1,60 @@ +package org.hypertrace.core.spannormalizer.jaeger; + +import com.typesafe.config.Config; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.hypertrace.core.datamodel.Event; + +public class RateLimitingSpanFilter { + + private static final String RATE_LIMIT_CONFIG_PATH = "rate.limit.config"; + private static final String TENANT_ID_KEY = "tenantId"; + private static final String GROUPING_KEY_KEY = "groupingKey"; + private static final String MAX_SPANS_PER_MINUTE_KEY = "maxSpansPerMinute"; + private static final long SPAN_COUNT_WINDOW = 3600; // fixed at 1minute + + Map> tenantSpanCountMap = new HashMap<>(); + Map tenantMaxSpansPerMinuteMap = new HashMap<>(); + Set seenAttributes = new HashSet<>(); + + public RateLimitingSpanFilter(Config config) { + for (Config rateLimitConfig : config.getConfigList(RATE_LIMIT_CONFIG_PATH)) { + String tenantId = rateLimitConfig.getString(TENANT_ID_KEY); + String groupingKey = rateLimitConfig.getString(GROUPING_KEY_KEY); + String key = generateKey(tenantId, groupingKey); + tenantMaxSpansPerMinuteMap.put(key, rateLimitConfig.getLong(MAX_SPANS_PER_MINUTE_KEY)); + tenantSpanCountMap.put(key, Map.of(0L, 0L)); + seenAttributes.add(groupingKey); + } + } + + public boolean shouldDropSpan(String tenantId, Event event) { + boolean shouldDropSpan = false; + for (String attribute : seenAttributes) { + String key = generateKey(tenantId, attribute); + if (!tenantMaxSpansPerMinuteMap.containsKey(key) + || !event.getAttributes().getAttributeMap().containsKey(attribute) + || !tenantSpanCountMap.containsKey(key)) { + continue; + } + + Long startTimeKey = tenantSpanCountMap.get(key).keySet().iterator().next(); + if (event.getStartTimeMillis() - startTimeKey > SPAN_COUNT_WINDOW) { + tenantSpanCountMap.put(key, Map.of(event.getStartTimeMillis(), 1L)); + } else { + long currentProcessedSpansCount = tenantSpanCountMap.get(key).get(startTimeKey); + if (currentProcessedSpansCount >= tenantMaxSpansPerMinuteMap.get(key)) { + shouldDropSpan = true; + } + tenantSpanCountMap.put(key, Map.of(startTimeKey, currentProcessedSpansCount + 1)); + } + } + return shouldDropSpan; + } + + private String generateKey(String first, String second) { + return first + "/" + second; + } +} diff --git a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java index a265ca3b3..931876d90 100644 --- a/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java +++ b/span-normalizer/span-normalizer/src/main/java/org/hypertrace/core/spannormalizer/jaeger/SpanDropManager.java @@ -36,11 +36,13 @@ public class SpanDropManager { private List tenantIdsToExclude; private static final Duration minArrivalThreshold = Duration.of(30, ChronoUnit.SECONDS); private final Duration lateArrivalThresholdDuration; + private RateLimitingSpanFilter rateLimitingSpanFilter; public SpanDropManager(Config config) { tenantIdHandler = new TenantIdHandler(config); spanFilter = new SpanFilter(config); excludeSpanRuleEvaluator = new ExcludeSpanRuleEvaluator(config); + rateLimitingSpanFilter = new RateLimitingSpanFilter(config); lateArrivalThresholdDuration = configureLateArrivalThreshold(config); tenantIdsToExclude = config.hasPath(TENANT_IDS_TO_EXCLUDE_CONFIG) @@ -57,6 +59,7 @@ public SpanDropManager(Config config) { spanFilter = new SpanFilter(config); excludeSpanRuleEvaluator = new ExcludeSpanRuleEvaluator(excludeSpanRulesCache); lateArrivalThresholdDuration = configureLateArrivalThreshold(config); + rateLimitingSpanFilter = new RateLimitingSpanFilter(config); tenantIdsToExclude = config.hasPath(TENANT_IDS_TO_EXCLUDE_CONFIG) ? config.getStringList(TENANT_IDS_TO_EXCLUDE_CONFIG) @@ -82,13 +85,29 @@ public boolean shouldDropSpan(JaegerSpanInternalModel.Span span, Event event, St .collect(Collectors.toMap(t -> t.getKey().toLowerCase(), t -> t, (v1, v2) -> v2)); // TODO: Eventually get rid of span filter and tenantID based filter - return shouldDropSpansBasedOnTenantIdFilter(tenantId) + return shouldDropSpanBasedOnRateLimitConfig(tenantId, event) + || shouldDropSpansBasedOnTenantIdFilter(tenantId) || shouldDropSpansBasedOnSpanFilter(tenantId, span, spanTags, processTags) // event is needed to evaluate the first class field related relational filters || shouldDropSpansBasedOnExcludeRules(tenantId, event, spanTags, processTags) || shouldDropSpansBasedOnLateArrival(tenantId, span); } + private boolean shouldDropSpanBasedOnRateLimitConfig(String tenantId, Event event) { + if (rateLimitingSpanFilter.shouldDropSpan(tenantId, event)) { + // increment dropped counter at tenant level + tenantToSpansDroppedCount + .computeIfAbsent( + tenantId, + tenant -> + PlatformMetricsRegistry.registerCounter( + DROPPED_SPANS_COUNTER, Map.of("tenantId", tenantId))) + .increment(); + return true; + } + return false; + } + private boolean shouldDropSpansBasedOnSpanFilter( String tenantId, JaegerSpanInternalModel.Span span, diff --git a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf b/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf index 1fd14764f..614ce2b4d 100644 --- a/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf +++ b/span-normalizer/span-normalizer/src/main/resources/configs/common/application.conf @@ -61,3 +61,5 @@ span.rules.exclude { expireAfterWriteDuration = 5m } } + +rate.limit.config = [] \ No newline at end of file diff --git a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java index 081477b62..fcfa6c8d1 100644 --- a/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java +++ b/span-normalizer/span-normalizer/src/test/java/org/hypertrace/core/spannormalizer/jaeger/JaegerSpanPreProcessorTest.java @@ -1658,6 +1658,71 @@ public void testTagsFiltersExpectsNoExceptionForNotSetValue() { Assertions.assertFalse(preProcessedSpan.getSpan().getTagsList().contains(notAllowed3)); } + @Test + public void testRateLimitBasedSpanFilter() { + String tenantId1 = "tenant-" + random.nextLong(); + String tenantId2 = "tenant-" + random.nextLong(); + Map configs = new HashMap<>(getCommonConfig()); + configs.putAll( + Map.of( + "processor", + Map.of( + "tenantIdTagKey", + "tenant-key", + "spanDropFilters", + Collections.emptyList(), + "late.arrival.threshold.duration", + "1d"), + "rate.limit.config", + List.of( + Map.of("tenantId", tenantId1, "groupingKey", "servicename", "maxSpansPerMinute", 2), + Map.of( + "tenantId", tenantId2, "groupingKey", "http.method", "maxSpansPerMinute", 0)))); + + JaegerSpanPreProcessor jaegerSpanPreProcessor = + new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs), excludeSpanRulesCache); + + Process process = + Process.newBuilder() + .setServiceName("testService") + .addTags(KeyValue.newBuilder().setKey("tenant-key").setVStr(tenantId1).build()) + .build(); + + Span span = + Span.newBuilder() + .setProcess(process) + .addTags( + KeyValue.newBuilder() + .setKey("http.url") + .setVStr("http://xyz.com/api/v1/health/check") + .build()) + .build(); + PreProcessedSpan preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNotNull(preProcessedSpan); + preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNotNull(preProcessedSpan); + preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNull(preProcessedSpan); + + process = + Process.newBuilder() + .setServiceName("testService") + .addTags(KeyValue.newBuilder().setKey("tenant-key").setVStr(tenantId2).build()) + .build(); + + span = + Span.newBuilder() + .setProcess(process) + .addTags( + KeyValue.newBuilder() + .setKey("http.url") + .setVStr("http://xyz.com/api/v1/health/check") + .build()) + .build(); + preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span); + Assertions.assertNotNull(preProcessedSpan); + } + private Map getCommonConfig() { return Map.of( "span.type", @@ -1677,7 +1742,9 @@ private Map getCommonConfig() { "clients", Map.of("config.service.config", Map.of("host", "localhost", "port", 50101)), "span.rules.exclude.cache", - Map.of("refreshAfterWriteDuration", "3m", "expireAfterWriteDuration", "5m")); + Map.of("refreshAfterWriteDuration", "3m", "expireAfterWriteDuration", "5m"), + "rate.limit.config", + List.of()); } private static SpanFilter buildRelationalFilter( diff --git a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf b/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf index 3f622dc1c..c4e46e53e 100644 --- a/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf +++ b/span-normalizer/span-normalizer/src/test/resources/configs/span-normalizer/application.conf @@ -83,3 +83,4 @@ span.rules.exclude { } } +rate.limit.config = []