Skip to content

Commit

Permalink
Rate limiting based span drop (#318)
Browse files Browse the repository at this point in the history
* add rate limiting config based span drop

* additional test

* helm config

* fix config

* add span drop counter

* remove extra lines

* fix indentation helm

* address review comments

* refactor
  • Loading branch information
SrikarMannepalli committed Apr 13, 2022
1 parent 3fe1e7a commit 7b1c78b
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 2 deletions.
11 changes: 11 additions & 0 deletions span-normalizer/helm/templates/span-normalizer-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,14 @@ data:
expireAfterWriteDuration = {{ .Values.spanNormalizerConfig.excludeSpanRulesConfig.cache.expireAfterWriteDuration }}
}
}
{{- if hasKey .Values.spanNormalizerConfig "rateLimitConfig" }}
rate.limit.config = [
{{- range $k,$v := $.Values.spanNormalizerConfig.rateLimitConfig }}
{
tenantId = {{ $v.tenantId }}
groupingKey = {{ $v.groupingKey }}
maxSpansPerMinute = {{ $v.maxSpansPerMinute }}
},
{{- end }}
]
{{- end }}
1 change: 1 addition & 0 deletions span-normalizer/helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ spanNormalizerConfig:
cache:
refreshAfterWriteDuration: 3m
expireAfterWriteDuration: 5m
rateLimitConfig: []

logConfig:
name: span-normalizer-log-appender-config
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package org.hypertrace.core.spannormalizer.jaeger;

import com.typesafe.config.Config;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.hypertrace.core.datamodel.Event;

public class RateLimitingSpanFilter {

private static final String RATE_LIMIT_CONFIG_PATH = "rate.limit.config";
private static final String TENANT_ID_KEY = "tenantId";
private static final String GROUPING_KEY_KEY = "groupingKey";
private static final String MAX_SPANS_PER_MINUTE_KEY = "maxSpansPerMinute";
private static final long SPAN_COUNT_WINDOW = 3600; // fixed at 1minute

Map<String, Map<Long, Long>> tenantSpanCountMap = new HashMap<>();
Map<String, Long> tenantMaxSpansPerMinuteMap = new HashMap<>();
Set<String> seenAttributes = new HashSet<>();

public RateLimitingSpanFilter(Config config) {
for (Config rateLimitConfig : config.getConfigList(RATE_LIMIT_CONFIG_PATH)) {
String tenantId = rateLimitConfig.getString(TENANT_ID_KEY);
String groupingKey = rateLimitConfig.getString(GROUPING_KEY_KEY);
String key = generateKey(tenantId, groupingKey);
tenantMaxSpansPerMinuteMap.put(key, rateLimitConfig.getLong(MAX_SPANS_PER_MINUTE_KEY));
tenantSpanCountMap.put(key, Map.of(0L, 0L));
seenAttributes.add(groupingKey);
}
}

public boolean shouldDropSpan(String tenantId, Event event) {
boolean shouldDropSpan = false;
for (String attribute : seenAttributes) {
String key = generateKey(tenantId, attribute);
if (!tenantMaxSpansPerMinuteMap.containsKey(key)
|| !event.getAttributes().getAttributeMap().containsKey(attribute)
|| !tenantSpanCountMap.containsKey(key)) {
continue;
}

Long startTimeKey = tenantSpanCountMap.get(key).keySet().iterator().next();
if (event.getStartTimeMillis() - startTimeKey > SPAN_COUNT_WINDOW) {
tenantSpanCountMap.put(key, Map.of(event.getStartTimeMillis(), 1L));
} else {
long currentProcessedSpansCount = tenantSpanCountMap.get(key).get(startTimeKey);
if (currentProcessedSpansCount >= tenantMaxSpansPerMinuteMap.get(key)) {
shouldDropSpan = true;
}
tenantSpanCountMap.put(key, Map.of(startTimeKey, currentProcessedSpansCount + 1));
}
}
return shouldDropSpan;
}

private String generateKey(String first, String second) {
return first + "/" + second;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ public class SpanDropManager {
private List<String> tenantIdsToExclude;
private static final Duration minArrivalThreshold = Duration.of(30, ChronoUnit.SECONDS);
private final Duration lateArrivalThresholdDuration;
private RateLimitingSpanFilter rateLimitingSpanFilter;

public SpanDropManager(Config config) {
tenantIdHandler = new TenantIdHandler(config);
spanFilter = new SpanFilter(config);
excludeSpanRuleEvaluator = new ExcludeSpanRuleEvaluator(config);
rateLimitingSpanFilter = new RateLimitingSpanFilter(config);
lateArrivalThresholdDuration = configureLateArrivalThreshold(config);
tenantIdsToExclude =
config.hasPath(TENANT_IDS_TO_EXCLUDE_CONFIG)
Expand All @@ -57,6 +59,7 @@ public SpanDropManager(Config config) {
spanFilter = new SpanFilter(config);
excludeSpanRuleEvaluator = new ExcludeSpanRuleEvaluator(excludeSpanRulesCache);
lateArrivalThresholdDuration = configureLateArrivalThreshold(config);
rateLimitingSpanFilter = new RateLimitingSpanFilter(config);
tenantIdsToExclude =
config.hasPath(TENANT_IDS_TO_EXCLUDE_CONFIG)
? config.getStringList(TENANT_IDS_TO_EXCLUDE_CONFIG)
Expand All @@ -82,13 +85,29 @@ public boolean shouldDropSpan(JaegerSpanInternalModel.Span span, Event event, St
.collect(Collectors.toMap(t -> t.getKey().toLowerCase(), t -> t, (v1, v2) -> v2));

// TODO: Eventually get rid of span filter and tenantID based filter
return shouldDropSpansBasedOnTenantIdFilter(tenantId)
return shouldDropSpanBasedOnRateLimitConfig(tenantId, event)
|| shouldDropSpansBasedOnTenantIdFilter(tenantId)
|| shouldDropSpansBasedOnSpanFilter(tenantId, span, spanTags, processTags)
// event is needed to evaluate the first class field related relational filters
|| shouldDropSpansBasedOnExcludeRules(tenantId, event, spanTags, processTags)
|| shouldDropSpansBasedOnLateArrival(tenantId, span);
}

private boolean shouldDropSpanBasedOnRateLimitConfig(String tenantId, Event event) {
if (rateLimitingSpanFilter.shouldDropSpan(tenantId, event)) {
// increment dropped counter at tenant level
tenantToSpansDroppedCount
.computeIfAbsent(
tenantId,
tenant ->
PlatformMetricsRegistry.registerCounter(
DROPPED_SPANS_COUNTER, Map.of("tenantId", tenantId)))
.increment();
return true;
}
return false;
}

private boolean shouldDropSpansBasedOnSpanFilter(
String tenantId,
JaegerSpanInternalModel.Span span,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,5 @@ span.rules.exclude {
expireAfterWriteDuration = 5m
}
}

rate.limit.config = []
Original file line number Diff line number Diff line change
Expand Up @@ -1658,6 +1658,71 @@ public void testTagsFiltersExpectsNoExceptionForNotSetValue() {
Assertions.assertFalse(preProcessedSpan.getSpan().getTagsList().contains(notAllowed3));
}

@Test
public void testRateLimitBasedSpanFilter() {
String tenantId1 = "tenant-" + random.nextLong();
String tenantId2 = "tenant-" + random.nextLong();
Map<String, Object> configs = new HashMap<>(getCommonConfig());
configs.putAll(
Map.of(
"processor",
Map.of(
"tenantIdTagKey",
"tenant-key",
"spanDropFilters",
Collections.emptyList(),
"late.arrival.threshold.duration",
"1d"),
"rate.limit.config",
List.of(
Map.of("tenantId", tenantId1, "groupingKey", "servicename", "maxSpansPerMinute", 2),
Map.of(
"tenantId", tenantId2, "groupingKey", "http.method", "maxSpansPerMinute", 0))));

JaegerSpanPreProcessor jaegerSpanPreProcessor =
new JaegerSpanPreProcessor(ConfigFactory.parseMap(configs), excludeSpanRulesCache);

Process process =
Process.newBuilder()
.setServiceName("testService")
.addTags(KeyValue.newBuilder().setKey("tenant-key").setVStr(tenantId1).build())
.build();

Span span =
Span.newBuilder()
.setProcess(process)
.addTags(
KeyValue.newBuilder()
.setKey("http.url")
.setVStr("http://xyz.com/api/v1/health/check")
.build())
.build();
PreProcessedSpan preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span);
Assertions.assertNotNull(preProcessedSpan);
preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span);
Assertions.assertNotNull(preProcessedSpan);
preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span);
Assertions.assertNull(preProcessedSpan);

process =
Process.newBuilder()
.setServiceName("testService")
.addTags(KeyValue.newBuilder().setKey("tenant-key").setVStr(tenantId2).build())
.build();

span =
Span.newBuilder()
.setProcess(process)
.addTags(
KeyValue.newBuilder()
.setKey("http.url")
.setVStr("http://xyz.com/api/v1/health/check")
.build())
.build();
preProcessedSpan = jaegerSpanPreProcessor.preProcessSpan(span);
Assertions.assertNotNull(preProcessedSpan);
}

private Map<String, Object> getCommonConfig() {
return Map.of(
"span.type",
Expand All @@ -1677,7 +1742,9 @@ private Map<String, Object> getCommonConfig() {
"clients",
Map.of("config.service.config", Map.of("host", "localhost", "port", 50101)),
"span.rules.exclude.cache",
Map.of("refreshAfterWriteDuration", "3m", "expireAfterWriteDuration", "5m"));
Map.of("refreshAfterWriteDuration", "3m", "expireAfterWriteDuration", "5m"),
"rate.limit.config",
List.of());
}

private static SpanFilter buildRelationalFilter(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ span.rules.exclude {
}
}

rate.limit.config = []

0 comments on commit 7b1c78b

Please sign in to comment.