Skip to content

Commit

Permalink
move rateLimiter to PowerWafModule
Browse files Browse the repository at this point in the history
  • Loading branch information
jandro996 committed Jun 25, 2024
1 parent 2db47d1 commit 354ccb5
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,17 @@
import com.datadog.appsec.event.EventDispatcher;
import com.datadog.appsec.event.ReplaceableEventProducerService;
import com.datadog.appsec.gateway.GatewayBridge;
import com.datadog.appsec.gateway.RateLimiter;
import com.datadog.appsec.powerwaf.PowerWAFModule;
import com.datadog.appsec.util.AbortStartupException;
import com.datadog.appsec.util.StandardizedLogging;
import datadog.appsec.api.blocking.Blocking;
import datadog.appsec.api.blocking.BlockingService;
import datadog.communication.ddagent.SharedCommunicationObjects;
import datadog.communication.monitor.Counter;
import datadog.communication.monitor.Monitoring;
import datadog.remoteconfig.ConfigurationPoller;
import datadog.trace.api.Config;
import datadog.trace.api.ProductActivation;
import datadog.trace.api.gateway.SubscriptionService;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.bootstrap.ActiveSubsystems;
import datadog.trace.util.Strings;
import java.util.Collections;
Expand All @@ -41,7 +38,6 @@ public class AppSecSystem {
private static AppSecConfigServiceImpl APP_SEC_CONFIG_SERVICE;
private static ReplaceableEventProducerService REPLACEABLE_EVENT_PRODUCER; // testing
private static Runnable RESET_SUBSCRIPTION_SERVICE;
private static RateLimiter RATE_LIMITER; // static for testing purpose

public static void start(SubscriptionService gw, SharedCommunicationObjects sco) {
try {
Expand Down Expand Up @@ -82,16 +78,14 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s

sco.createRemaining(config);

RATE_LIMITER = getRateLimiter(config, sco.monitoring);

GatewayBridge gatewayBridge =
new GatewayBridge(
gw,
REPLACEABLE_EVENT_PRODUCER,
requestSampler,
APP_SEC_CONFIG_SERVICE.getTraceSegmentPostProcessors());

loadModules(eventDispatcher);
loadModules(eventDispatcher, sco.monitoring);

gatewayBridge.init();
RESET_SUBSCRIPTION_SERVICE = gatewayBridge::stop;
Expand All @@ -112,18 +106,6 @@ private static void doStart(SubscriptionService gw, SharedCommunicationObjects s
}
}

private static RateLimiter getRateLimiter(Config config, Monitoring monitoring) {
RateLimiter rateLimiter = null;
int appSecTraceRateLimit = config.getAppSecTraceRateLimit();
if (appSecTraceRateLimit > 0) {
Counter counter = monitoring.newCounter("_dd.java.appsec.rate_limit.dropped_traces");
rateLimiter =
new RateLimiter(
appSecTraceRateLimit, SystemTimeSource.INSTANCE, () -> counter.increment(1));
}
return rateLimiter;
}

public static boolean isActive() {
return ActiveSubsystems.APPSEC_ACTIVE;
}
Expand All @@ -144,11 +126,11 @@ public static void stop() {
APP_SEC_CONFIG_SERVICE.close();
}

private static void loadModules(EventDispatcher eventDispatcher) {
private static void loadModules(EventDispatcher eventDispatcher, Monitoring monitoring) {
EventDispatcher.DataSubscriptionSet dataSubscriptionSet =
new EventDispatcher.DataSubscriptionSet();

final List<AppSecModule> modules = Collections.singletonList(new PowerWAFModule(RATE_LIMITER));
final List<AppSecModule> modules = Collections.singletonList(new PowerWAFModule(monitoring));
for (AppSecModule module : modules) {
log.debug("Starting appsec module {}", module.getName());
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@
import com.squareup.moshi.Moshi;
import com.squareup.moshi.Types;
import datadog.appsec.api.blocking.BlockingContentType;
import datadog.communication.monitor.Counter;
import datadog.communication.monitor.Monitoring;
import datadog.trace.api.Config;
import datadog.trace.api.ProductActivation;
import datadog.trace.api.gateway.Flow;
import datadog.trace.api.telemetry.LogCollector;
import datadog.trace.api.telemetry.WafMetricCollector;
import datadog.trace.api.time.SystemTimeSource;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import datadog.trace.bootstrap.instrumentation.api.AgentTracer;
import datadog.trace.bootstrap.instrumentation.api.Tags;
Expand Down Expand Up @@ -156,8 +159,8 @@ public PowerWAFModule() {
this(null);
}

public PowerWAFModule(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
public PowerWAFModule(Monitoring monitoring) {
this.rateLimiter = getRateLimiter(monitoring);
}

@Override
Expand Down Expand Up @@ -338,6 +341,21 @@ private PowerwafConfig createPowerwafConfig() {
return pwConfig;
}

private static RateLimiter getRateLimiter(Monitoring monitoring) {
if (monitoring == null) {
return null;
}
RateLimiter rateLimiter = null;
int appSecTraceRateLimit = Config.get().getAppSecTraceRateLimit();
if (appSecTraceRateLimit > 0) {
Counter counter = monitoring.newCounter("_dd.java.appsec.rate_limit.dropped_traces");
rateLimiter =
new RateLimiter(
appSecTraceRateLimit, SystemTimeSource.INSTANCE, () -> counter.increment(1));
}
return rateLimiter;
}

@Override
public String getName() {
return "powerwaf";
Expand Down Expand Up @@ -464,7 +482,8 @@ public void onDataAvailable(
// reduced datadog sampling rate.
activeSpan.getLocalRootSpan().setTag(Tags.ASM_KEEP, true);
} else {
//If active span is not available the ASK_KEEP tag will be set in the GatewayBridge when the request ends
// If active span is not available the ASK_KEEP tag will be set in the GatewayBridge
// when the request ends
log.debug("There is no active span available");
}
reqCtx.reportEvents(events);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,6 @@ class AppSecSystemSpecification extends DDSpecification {
1 * traceSegment.setTagTop('actor.ip', '1.1.1.1')
}

void 'honors appsec.trace.rate.limit'() {

setup:
injectSysConfig('dd.appsec.trace.rate.limit', '5')
def sco = sharedCommunicationObjects()

when:
AppSecSystem.start(subService, sco)

then:
AppSecSystem.RATE_LIMITER.limitPerSec == 5

}

void 'throws if the config file is not parseable'() {
setup:
Path path = Files.createTempFile('dd-trace-', '.json')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import com.datadog.appsec.gateway.AppSecRequestContext
import com.datadog.appsec.report.AppSecEvent
import com.datadog.appsec.stack_trace.StackTraceEvent
import com.datadog.appsec.test.StubAppSecConfigService
import datadog.communication.monitor.Monitoring
import datadog.trace.api.ConfigDefaults
import datadog.trace.api.internal.TraceSegment
import datadog.appsec.api.blocking.BlockingContentType
Expand Down Expand Up @@ -1421,6 +1422,19 @@ class PowerWAFModuleSpecification extends DDSpecification {
n << (1..3)
}

void 'honors appsec.trace.rate.limit'() {
setup:
injectSysConfig('dd.appsec.trace.rate.limit', '5')
def monitoring = Mock(Monitoring)

when:
def waf = new PowerWAFModule(monitoring)

then:
waf.rateLimiter.limitPerSec == 5

}

private Map<String, Object> getDefaultConfig() {
def service = new StubAppSecConfigService()
service.init()
Expand Down

0 comments on commit 354ccb5

Please sign in to comment.