Skip to content

Commit

Permalink
Introduced separate global rate limiter for logs
Browse files Browse the repository at this point in the history
ProbeRateLimiter having only one global rate for snapshot, we need to
have a separate one for logs with higher rate. We need to store if the
probe capture snapshot or not for choosing the right global rate
limiter.
  • Loading branch information
jpbempel committed Jun 8, 2023
1 parent ec94b3b commit 8e7cd8b
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,38 +7,49 @@
import java.util.concurrent.ConcurrentMap;

/** Rate limiter for sending snapshot to backend Use a global rate limiter and one per probe */
public final class ProbeRateLimiter {
public class ProbeRateLimiter {
public static final double DEFAULT_SNAPSHOT_RATE = 1.0;
public static final double DEFAULT_LOG_RATE = 5000.0;
private static final Duration ONE_SECOND_WINDOW = Duration.of(1, ChronoUnit.SECONDS);
private static final Duration TEN_SECONDS_WINDOW = Duration.of(10, ChronoUnit.SECONDS);
private static final double DEFAULT_RATE = 1.0;
private static final double DEFAULT_GLOBAL_RATE = DEFAULT_RATE * 100;
private static final ConcurrentMap<String, AdaptiveSampler> PROBE_SAMPLERS =
private static final double DEFAULT_GLOBAL_SNAPSHOT_RATE = DEFAULT_SNAPSHOT_RATE * 100;
private static final double DEFAULT_GLOBAL_LOG_RATE = 5000.0;
private static final ConcurrentMap<String, RateLimitInfo> PROBE_SAMPLERS =
new ConcurrentHashMap<>();

private static AdaptiveSampler GLOBAL_SAMPLER = createSampler(DEFAULT_GLOBAL_RATE);
private static AdaptiveSampler GLOBAL_SNAPSHOT_SAMPLER =
createSampler(DEFAULT_GLOBAL_SNAPSHOT_RATE);
private static AdaptiveSampler GLOBAL_LOG_SAMPLER = createSampler(DEFAULT_GLOBAL_LOG_RATE);

public static boolean tryProbe(String probeId) {
// rate limiter engaged at ~1 probe per second (1 probes per 1s time window)
if (GLOBAL_SAMPLER.sample()) {
return PROBE_SAMPLERS.computeIfAbsent(probeId, k -> createSampler(DEFAULT_RATE)).sample();
RateLimitInfo rateLimitInfo =
PROBE_SAMPLERS.computeIfAbsent(
probeId, k -> new RateLimitInfo(createSampler(DEFAULT_SNAPSHOT_RATE), true));
AdaptiveSampler globalSampler =
rateLimitInfo.isCaptureSnapshot ? GLOBAL_SNAPSHOT_SAMPLER : GLOBAL_LOG_SAMPLER;
if (globalSampler.sample()) {
return rateLimitInfo.sampler.sample();
}
return false;
}

public static void setRate(String probeId, double rate) {
PROBE_SAMPLERS.put(probeId, createSampler(rate));
public static void setRate(String probeId, double rate, boolean isCaptureSnapshot) {
PROBE_SAMPLERS.put(probeId, new RateLimitInfo(createSampler(rate), isCaptureSnapshot));
}

public static void setGlobalSnapshotRate(double rate) {
GLOBAL_SNAPSHOT_SAMPLER = createSampler(rate);
}

public static void setGlobalRate(double rate) {
GLOBAL_SAMPLER = createSampler(rate);
public static void setGlobalLogRate(double rate) {
GLOBAL_LOG_SAMPLER = createSampler(rate);
}

public static void resetRate(String probeId) {
PROBE_SAMPLERS.remove(probeId);
}

public static void resetGlobalRate() {
setGlobalRate(DEFAULT_GLOBAL_RATE);
setGlobalSnapshotRate(DEFAULT_GLOBAL_LOG_RATE);
}

private static AdaptiveSampler createSampler(double rate) {
Expand All @@ -48,4 +59,14 @@ private static AdaptiveSampler createSampler(double rate) {
}
return new AdaptiveSampler(ONE_SECOND_WINDOW, (int) Math.round(rate), 180, 16);
}

private static class RateLimitInfo {
final AdaptiveSampler sampler;
final boolean isCaptureSnapshot;

public RateLimitInfo(AdaptiveSampler sampler, boolean isCaptureSnapshot) {
this.sampler = sampler;
this.isCaptureSnapshot = isCaptureSnapshot;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ public class ConfigurationUpdater
public static final int MAX_ALLOWED_LOG_PROBES = 100;
private static final int MAX_ALLOWED_SPAN_PROBES = 100;
private static final int MAX_ALLOWED_SPAN_DECORATION_PROBES = 100;
private static final double RATE_LIMIT_PER_SNAPSHOT_PROBE = 1.0;
private static final double RATE_LIMIT_PER_LOG_PROBE = 5000.0;

public interface TransformerSupplier {
DebuggerTransformer supply(
Expand Down Expand Up @@ -256,7 +254,8 @@ private void applyRateLimiter(ConfigurationComparer changes) {
probe.getId(),
sampling != null
? sampling.getSnapshotsPerSecond()
: getDefaultRateLimitPerProbe(probe));
: getDefaultRateLimitPerProbe(probe),
probe.isCaptureSnapshot());
}
}
// remove rate for all removed probes
Expand All @@ -268,12 +267,14 @@ private void applyRateLimiter(ConfigurationComparer changes) {
// set global sampling
LogProbe.Sampling sampling = currentConfiguration.getSampling();
if (sampling != null) {
ProbeRateLimiter.setGlobalRate(sampling.getSnapshotsPerSecond());
ProbeRateLimiter.setGlobalSnapshotRate(sampling.getSnapshotsPerSecond());
}
}

private double getDefaultRateLimitPerProbe(LogProbe probe) {
return probe.isCaptureSnapshot() ? RATE_LIMIT_PER_SNAPSHOT_PROBE : RATE_LIMIT_PER_LOG_PROBE;
return probe.isCaptureSnapshot()
? ProbeRateLimiter.DEFAULT_SNAPSHOT_RATE
: ProbeRateLimiter.DEFAULT_LOG_RATE;
}

private void removeCurrentTransformer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1453,11 +1453,12 @@ private DebuggerTransformerTest.TestSnapshotListener installProbes(
DebuggerContext.initValueSerializer(new JsonSnapshotSerializer());
for (LogProbe probe : logProbes) {
if (probe.getSampling() != null) {
ProbeRateLimiter.setRate(probe.getId(), probe.getSampling().getSnapshotsPerSecond());
ProbeRateLimiter.setRate(
probe.getId(), probe.getSampling().getSnapshotsPerSecond(), probe.isCaptureSnapshot());
}
}
if (configuration.getSampling() != null) {
ProbeRateLimiter.setGlobalRate(configuration.getSampling().getSnapshotsPerSecond());
ProbeRateLimiter.setGlobalSnapshotRate(configuration.getSampling().getSnapshotsPerSecond());
}
return listener;
}
Expand Down

0 comments on commit 8e7cd8b

Please sign in to comment.