Skip to content

Commit

Permalink
apply rate limiter when appsec event and not only at the end of the r…
Browse files Browse the repository at this point in the history
…equest
  • Loading branch information
jandro996 committed Jun 19, 2024
1 parent 4d8d7a7 commit 3ef1547
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
requestSampler,
APP_SEC_CONFIG_SERVICE.getTraceSegmentPostProcessors());

loadModules(eventDispatcher);
loadModules(eventDispatcher, rateLimiter);

gatewayBridge.init();
RESET_SUBSCRIPTION_SERVICE = gatewayBridge::stop;
Expand Down Expand Up @@ -144,11 +144,11 @@ public static void stop() {
APP_SEC_CONFIG_SERVICE.close();
}

private static void loadModules(EventDispatcher eventDispatcher) {
private static void loadModules(EventDispatcher eventDispatcher, RateLimiter rateLimiter) {
EventDispatcher.DataSubscriptionSet dataSubscriptionSet =
new EventDispatcher.DataSubscriptionSet();

final List<AppSecModule> modules = Collections.singletonList(new PowerWAFModule());
final List<AppSecModule> modules = Collections.singletonList(new PowerWAFModule(rateLimiter));
for (AppSecModule module : modules) {
log.debug("Starting appsec module {}", module.getName());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -103,6 +104,9 @@ public class AppSecRequestContext implements DataBundle, Closeable {
private boolean pathParamsPublished;
private Map<String, String> apiSchemas;

private AtomicBoolean rateLimited = new AtomicBoolean(false);
private boolean throttled;

// should be guarded by this
private Additive additive;
// set after additive is set
Expand Down Expand Up @@ -486,4 +490,11 @@ boolean commitApiSchemas(TraceSegment traceSegment) {
apiSchemas.forEach(traceSegment::setTagTop);
return true;
}

public boolean isThrottled(RateLimiter rateLimiter) {
if (rateLimiter != null && rateLimited.compareAndSet(false, true)) {
throttled = rateLimiter.isThrottled();
}
return throttled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,9 @@ public void init() {
pp.processTraceSegment(traceSeg, ctx, collectedEvents);
}

if (rateLimiter == null || !rateLimiter.isThrottled()) {
// If detected any events - mark span at appsec.event
if (!collectedEvents.isEmpty()) {
// Keep event related span, because it could be ignored in case of
// reduced datadog sampling rate.
traceSeg.setTagTop(Tags.ASM_KEEP, true);
// If detected any events - mark span at appsec.event
if (!collectedEvents.isEmpty()) {
if (ctx.isThrottled(rateLimiter)) {
traceSeg.setTagTop("appsec.event", true);
traceSeg.setTagTop("network.client.ip", ctx.getPeerAddress());

Expand All @@ -173,15 +170,16 @@ public void init() {
traceSeg.setMetaStructTop("_dd.stack", flatStruct);
}
}

} else if (hasUserTrackingEvent(traceSeg)) {
}
} else if (hasUserTrackingEvent(traceSeg)) {
if (ctx.isThrottled(rateLimiter)) {
// Report all collected request headers on user tracking event
writeRequestHeaders(traceSeg, REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders());
} else {
// Report minimum set of collected request headers
writeRequestHeaders(
traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders());
}
} else {
// Report minimum set of collected request headers
writeRequestHeaders(
traceSeg, DEFAULT_REQUEST_HEADERS_ALLOW_LIST, ctx.getRequestHeaders());
}
// If extracted any Api Schemas - commit them
if (!ctx.commitApiSchemas(traceSeg)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.datadog.appsec.event.data.DataBundle;
import com.datadog.appsec.event.data.KnownAddresses;
import com.datadog.appsec.gateway.AppSecRequestContext;
import com.datadog.appsec.gateway.RateLimiter;
import com.datadog.appsec.report.AppSecEvent;
import com.datadog.appsec.stack_trace.StackTraceEvent;
import com.datadog.appsec.stack_trace.StackTraceEvent.Frame;
Expand All @@ -28,6 +29,7 @@
import datadog.trace.api.telemetry.WafMetricCollector;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Tags;
import datadog.trace.util.stacktrace.StackWalkerFactory;
import io.sqreen.powerwaf.Additive;
import io.sqreen.powerwaf.Powerwaf;
Expand Down Expand Up @@ -146,9 +148,18 @@ static void createLimitsObject() {
private final PowerWAFInitializationResultReporter initReporter =
new PowerWAFInitializationResultReporter();
private final PowerWAFStatsReporter statsReporter = new PowerWAFStatsReporter();
private final RateLimiter rateLimiter;

private String currentRulesVersion;

public PowerWAFModule() {
this(null);
}

public PowerWAFModule(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
}

@Override
public void config(AppSecModuleConfigurer appSecConfigService)
throws AppSecModuleActivationException {
Expand Down Expand Up @@ -444,7 +455,19 @@ public void onDataAvailable(
}
}
Collection<AppSecEvent> events = buildEvents(resultWithData);
reqCtx.reportEvents(events);

if (!events.isEmpty() && !reqCtx.isThrottled(rateLimiter)) {
AgentSpan activeSpan = AgentTracer.activeSpan();
if (activeSpan != null) {
if (log.isDebugEnabled()) {
log.debug("Setting force-keep tag on the current span");
}
// Keep event related span, because it could be ignored in case of
// reduced datadog sampling rate.
activeSpan.getLocalRootSpan().setTag(Tags.ASM_KEEP, true);
}
reqCtx.reportEvents(events);
}

if (flow.isBlocking()) {
reqCtx.setBlocked();
Expand Down

0 comments on commit 3ef1547

Please sign in to comment.