From fa305b8152ef46a32a6c4b71392e3c0bb5292090 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 2 Dec 2025 02:08:55 -0700 Subject: [PATCH 01/20] fix test --- src/main/java/com/uid2/optout/Main.java | 16 +- .../optout/vertx/DeltaProductionResult.java | 47 +- .../optout/vertx/OptOutSqsLogProducer.java | 239 +++- .../vertx/OptOutSqsLogProducerTest.java | 1084 ++++++++++++++++- 4 files changed, 1327 insertions(+), 59 deletions(-) diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index dbccd32..f6bf5ca 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -1,6 +1,8 @@ package com.uid2.optout; import com.uid2.optout.vertx.*; +import com.uid2.optout.vertx.OptOutTrafficFilter.MalformedTrafficFilterConfigException; +import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException; import com.uid2.shared.ApplicationVersion; import com.uid2.shared.Utils; import com.uid2.shared.attest.AttestationResponseHandler; @@ -27,7 +29,6 @@ import io.vertx.config.ConfigRetriever; import io.vertx.core.*; import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.impl.HttpUtils; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.MetricsDomain; import org.slf4j.Logger; @@ -296,14 +297,23 @@ public void run(String[] args) throws IOException { fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig); } - // Deploy SQS log producer with its own storage instance - OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs); + // Create SQS-specific cloud storage instance for dropped requests (different bucket) + String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp); + ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, config); + + // Deploy SQS log producer with its own storage instance + OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null); futs.add(this.deploySingleInstance(sqsLogProducer)); LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}", this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder); } catch (IOException e) { LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e); + LOGGER.error("Failed to initialize SQS log producer, delta production will be disabled: " + e.getMessage(), e); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("The traffic filter config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("The traffic calc config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e); } } diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java index fa18556..790b127 100644 --- a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java +++ b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java @@ -1,16 +1,25 @@ package com.uid2.optout.vertx; +import io.vertx.core.json.JsonObject; + /** - * Result object containing statistics from delta production. + * Data class containing statistics from delta production. + * + * This class holds the counts and provides JSON encoding methods. + * API response status is determined by the caller based on these statistics. */ public class DeltaProductionResult { private final int deltasProduced; private final int entriesProcessed; + private final int droppedRequestFilesProduced; + private final int droppedRequestsProcessed; private final boolean stoppedDueToRecentMessages; - public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToRecentMessages) { + public DeltaProductionResult(int deltasProduced, int entriesProcessed, int droppedRequestFilesProduced, int droppedRequestsProcessed, boolean stoppedDueToRecentMessages) { this.deltasProduced = deltasProduced; this.entriesProcessed = entriesProcessed; + this.droppedRequestFilesProduced = droppedRequestFilesProduced; + this.droppedRequestsProcessed = droppedRequestsProcessed; this.stoppedDueToRecentMessages = stoppedDueToRecentMessages; } @@ -25,5 +34,37 @@ public int getEntriesProcessed() { public boolean stoppedDueToRecentMessages() { return stoppedDueToRecentMessages; } -} + public int getDroppedRequestFilesProduced() { + return droppedRequestFilesProduced; + } + + public int getDroppedRequestsProcessed() { + return droppedRequestsProcessed; + } + + /** + * Convert to JSON with just the production counts. + */ + public JsonObject toJson() { + return new JsonObject() + .put("deltas_produced", deltasProduced) + .put("entries_processed", entriesProcessed) + .put("dropped_request_files_produced", droppedRequestFilesProduced) + .put("dropped_requests_processed", droppedRequestsProcessed); + } + + /** + * Convert to JSON with status and counts. + */ + public JsonObject toJsonWithStatus(String status) { + return toJson().put("status", status); + } + + /** + * Convert to JSON with status, reason/error, and counts. + */ + public JsonObject toJsonWithStatus(String status, String reasonKey, String reasonValue) { + return toJsonWithStatus(status).put(reasonKey, reasonValue); + } +} diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index 8ac1d1b..ae2619b 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -2,6 +2,8 @@ import com.uid2.optout.Const; import com.uid2.optout.auth.InternalAuthMiddleware; +import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException; +import com.uid2.optout.vertx.OptOutTrafficFilter.MalformedTrafficFilterConfigException; import com.uid2.shared.Utils; import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.health.HealthComponent; @@ -14,6 +16,9 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; +import io.vertx.core.json.JsonArray; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; @@ -22,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; @@ -80,6 +86,7 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final String eventDeltaProduced; private final int replicaId; private final ICloudStorage cloudStorage; + private final ICloudStorage cloudStorageDroppedRequests; private final OptOutCloudSync cloudSync; private final int maxMessagesPerPoll; private final int visibilityTimeout; @@ -88,6 +95,9 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final int listenPort; private final String internalApiKey; private final InternalAuthMiddleware internalAuth; + private final OptOutTrafficFilter trafficFilter; + private final OptOutTrafficCalculator trafficCalculator; + private final String manualOverrideS3Path; private Counter counterDeltaProduced = Counter .builder("uid2_optout_sqs_delta_produced_total") @@ -99,6 +109,16 @@ public class OptOutSqsLogProducer extends AbstractVerticle { .description("counter for how many optout entries are processed from SQS") .register(Metrics.globalRegistry); + private Counter counterDroppedRequestFilesProduced = Counter + .builder("uid2_optout_sqs_dropped_request_files_produced_total") + .description("counter for how many optout dropped request files are produced from SQS") + .register(Metrics.globalRegistry); + + private Counter counterDroppedRequestsProcessed = Counter + .builder("uid2_optout_sqs_dropped_requests_processed_total") + .description("counter for how many optout dropped requests are processed from SQS") + .register(Metrics.globalRegistry); + private ByteBuffer buffer; private boolean shutdownInProgress = false; @@ -108,19 +128,20 @@ public class OptOutSqsLogProducer extends AbstractVerticle { // Helper for reading complete 5-minute windows from SQS private final SqsWindowReader windowReader; - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException { + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduce); } - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException { - this(jsonConfig, cloudStorage, cloudSync, eventDeltaProduced, null); + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { + this(jsonConfig, cloudStorage, null, cloudSync, eventDeltaProduced, null); } // Constructor for testing - allows injecting mock SqsClient - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException { + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, ICloudStorage cloudStorageDroppedRequests, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { this.eventDeltaProduced = eventDeltaProduced; this.replicaId = OptOutUtils.getReplicaId(jsonConfig); this.cloudStorage = cloudStorage; + this.cloudStorageDroppedRequests = cloudStorageDroppedRequests; this.cloudSync = cloudSync; // Initialize SQS client @@ -148,6 +169,10 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp); this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); + + this.trafficFilter = new OptOutTrafficFilter(jsonConfig.getString(Const.Config.TrafficFilterConfigPathProp)); + this.trafficCalculator = new OptOutTrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutSqsS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp)); + this.manualOverrideS3Path = jsonConfig.getString(Const.Config.ManualOverrideS3PathProp); // Initialize window reader this.windowReader = new SqsWindowReader( @@ -228,10 +253,7 @@ private void handleDeltaProduceStatus(RoutingContext routingContext) { if (job == null) { resp.setStatusCode(200) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("state", "idle") - .put("message", "No job running on this pod") - .encode()); + .end(new JsonObject().put("status", "idle").put("message", "No job running on this pod").encode()); return; } @@ -256,6 +278,26 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { LOGGER.info("Delta production job requested via /deltaproduce endpoint"); + try { + this.trafficFilter.reloadTrafficFilterConfig(); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("Error reloading traffic filter config: " + e.getMessage(), e); + resp.setStatusCode(500) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject().put("status", "failed").put("error", e.getMessage()).encode()); + return; + } + + try { + this.trafficCalculator.reloadTrafficCalcConfig(); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("Error reloading traffic calculator config: " + e.getMessage(), e); + resp.setStatusCode(500) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject().put("status", "failed").put("error", e.getMessage()).encode()); + return; + } + DeltaProduceJobStatus existingJob = currentJob.get(); @@ -266,11 +308,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { LOGGER.warn("Delta production job already running on this pod"); resp.setStatusCode(409) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "A delta production job is already running on this pod") - .put("current_job", existingJob.toJson()) - .encode()); + .end(new JsonObject().put("status", "conflict").put("reason", "A delta production job is already running on this pod").encode()); return; } @@ -283,10 +321,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { if (!currentJob.compareAndSet(existingJob, newJob)) { resp.setStatusCode(409) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "Job state changed, please retry") - .encode()); + .end(new JsonObject().put("status", "conflict").put("reason", "Job state changed, please retry").encode()); return; } @@ -297,10 +332,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { // Return immediately with 202 Accepted resp.setStatusCode(202) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "accepted") - .put("message", "Delta production job started on this pod") - .encode()); + .end(new JsonObject().put("status", "accepted").put("message", "Delta production job started on this pod").encode()); } /** @@ -336,8 +368,6 @@ private JsonObject produceDeltasBlocking() throws Exception { if (this.shutdownInProgress) { throw new Exception("Producer is shutting down"); } - - JsonObject result = new JsonObject(); LOGGER.info("Starting delta production from SQS queue"); // Process messages until queue is empty or messages are too recent @@ -345,20 +375,14 @@ private JsonObject produceDeltasBlocking() throws Exception { // Determine status based on results if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToRecentMessages()) { - // No deltas produced because all messages were too recent - result.put("status", "skipped"); - result.put("reason", "All messages too recent"); - LOGGER.info("Delta production skipped: all messages too recent"); + // No deltas produced - either messages too recent or traffic spike detected + LOGGER.info("Delta production skipped: {} entries processed", deltaResult.getEntriesProcessed()); + return deltaResult.toJsonWithStatus("skipped", "reason", "No deltas produced"); } else { - result.put("status", "success"); LOGGER.info("Delta production complete: {} deltas, {} entries", deltaResult.getDeltasProduced(), deltaResult.getEntriesProcessed()); + return deltaResult.toJsonWithStatus("success"); } - - result.put("deltas_produced", deltaResult.getDeltasProduced()); - result.put("entries_processed", deltaResult.getEntriesProcessed()); - - return result; } @@ -370,8 +394,18 @@ private JsonObject produceDeltasBlocking() throws Exception { * @throws IOException if delta production fails */ private DeltaProductionResult produceBatchedDeltas() throws IOException { + // Check for manual override at the start of the batch (and then between each delta window) + if (getManualOverride().equals("DELAYED_PROCESSING")) { + LOGGER.info("Manual override set to DELAYED_PROCESSING, stopping production"); + return new DeltaProductionResult(0, 0, 0, 0, false); + } + int deltasProduced = 0; int totalEntriesProcessed = 0; + + int droppedRequestFilesProduced = 0; + int droppedRequestsProcessed = 0; + boolean stoppedDueToRecentMessages = false; long jobStartTime = OptOutUtils.nowEpochSeconds(); @@ -393,36 +427,73 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { break; } + // Create delta file buffer + String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); + ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); + + // Create dropped request file buffer + JsonArray droppedRequestStream = new JsonArray(); + String currentDroppedRequestName = String.format("%s%03d_%s_%08x.json", "optout-dropped-", replicaId, Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'), OptOutUtils.rand.nextInt()); + // Produce delta for this window long windowStart = windowResult.getWindowStart(); List messages = windowResult.getMessages(); - - // Create delta file - String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); - ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); + writeStartOfDelta(deltaStream, windowStart); // Write all messages - List sqsMessages = new ArrayList<>(); + List currentDeltaMessages = new ArrayList<>(); + List droppedRequestMessages = new ArrayList<>(); for (SqsParsedMessage msg : messages) { - writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); - sqsMessages.add(msg.getOriginalMessage()); + if (trafficFilter.isBlacklisted(msg)) { + this.writeDroppedRequestEntry(droppedRequestStream, msg); + droppedRequestMessages.add(msg.getOriginalMessage()); + droppedRequestsProcessed++; + } else { + writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); + currentDeltaMessages.add(msg.getOriginalMessage()); + totalEntriesProcessed++; + } + } + + // check for manual override + if (getManualOverride().equals("DELAYED_PROCESSING")) { + LOGGER.info("Manual override set to DELAYED_PROCESSING, stopping production"); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } else if (getManualOverride().equals("DEFAULT")) { + LOGGER.info("Manual override set to DEFAULT, skipping traffic calculation"); + } else { + // check traffic calculator status + OptOutTrafficCalculator.TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(currentDeltaMessages); + if (trafficStatus == OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING) { + LOGGER.error("OptOut Delta Production has hit DELAYED_PROCESSING status, stopping production"); + this.setDelayedProcessingOverride(); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } } - // Upload and delete - uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, sqsMessages); - deltasProduced++; - totalEntriesProcessed += messages.size(); + // Upload delta file if there are non-blacklisted messages + if (!currentDeltaMessages.isEmpty()) { + uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, currentDeltaMessages); + deltasProduced++; + } + + // Upload dropped request file if there are blacklisted messages + if (!droppedRequestMessages.isEmpty()) { + this.uploadDroppedRequestsAndDeleteMessages(droppedRequestStream, currentDroppedRequestName, windowStart, droppedRequestMessages); + droppedRequestFilesProduced++; + droppedRequestMessages.clear(); + } LOGGER.info("Produced delta for window [{}, {}] with {} messages", windowStart, windowStart + this.deltaWindowSeconds, messages.size()); } long totalDuration = OptOutUtils.nowEpochSeconds() - jobStartTime; - LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries", + LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries, produced {} dropped request files, processed {} dropped requests", totalDuration, deltasProduced, totalEntriesProcessed); - return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToRecentMessages); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, stoppedDueToRecentMessages); } /** @@ -537,6 +608,82 @@ private void uploadDeltaAndDeleteMessages(ByteArrayOutputStream deltaStream, Str } } + /** + * Writes a dropped request entry to the dropped request stream. + */ + private void writeDroppedRequestEntry(JsonArray droppedRequestArray, SqsParsedMessage parsed) throws IOException { + String messageBody = parsed.getOriginalMessage().body(); + JsonObject messageJson = new JsonObject(messageBody); + droppedRequestArray.add(messageJson); + } + + // Upload a dropped request file to S3 and delete messages from SQS after successful upload + private void uploadDroppedRequestsAndDeleteMessages(JsonArray droppedRequestStream, String droppedRequestName, Long windowStart, List messages) throws IOException { + try { + // upload + byte[] droppedRequestData = droppedRequestStream.encode().getBytes(); + + LOGGER.info("SQS Dropped Requests Upload - fileName: {}, s3Path: {}, size: {} bytes, messages: {}, window: [{}, {})", + droppedRequestName, droppedRequestData.length, messages.size(), windowStart, windowStart + this.deltaWindowSeconds); + + boolean uploadSucceeded = false; + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(droppedRequestData)) { + this.cloudStorageDroppedRequests.upload(inputStream, droppedRequestName); + LOGGER.info("Successfully uploaded dropped requests to S3: {}", droppedRequestName); + uploadSucceeded = true; + + // publish event + this.counterDroppedRequestFilesProduced.increment(); + this.counterDroppedRequestsProcessed.increment(messages.size()); + } catch (Exception uploadEx) { + LOGGER.error("Failed to upload dropped requests to S3: " + uploadEx.getMessage(), uploadEx); + throw new IOException("S3 upload failed", uploadEx); + } + + // CRITICAL: Only delete messages from SQS after successful S3 upload + if (uploadSucceeded && !messages.isEmpty()) { + LOGGER.info("Deleting {} messages from SQS after successful S3 upload", messages.size()); + SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, messages); + } + + // Clear the array + droppedRequestStream.clear(); + + } catch (Exception ex) { + LOGGER.error("Error uploading dropped requests: " + ex.getMessage(), ex); + throw new IOException("Dropped requests upload failed", ex); + } + } + + /** + * Upload a JSON config file to S3 containing the following: + * {"manual_override": "DELAYED_PROCESSING"} + * Manual override file is at the root of the S3 bucket + */ + private void setDelayedProcessingOverride() { + try { + JsonObject config = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + this.cloudStorage.upload(new ByteArrayInputStream(config.encode().getBytes()), this.manualOverrideS3Path); + } catch (Exception e) { + LOGGER.error("Error setting delayed processing override", e); + } + } + + /** + * Check if there is a manual override set in S3 for DEFAULT or DELAYED_PROCESSING status + * Manual override file is at the root of the S3 bucket + */ + private String getManualOverride() { + try { + InputStream inputStream = this.cloudStorage.download(this.manualOverrideS3Path); + JsonObject configJson = Utils.toJsonObject(inputStream); + return configJson.getString("manual_override", ""); + } catch (Exception e) { + LOGGER.error("Error checking for manual override in S3: " + e.getMessage(), e); + return ""; + } + } + private void publishDeltaProducedEvent(String newDelta) { vertx.eventBus().publish(this.eventDeltaProduced, newDelta); LOGGER.info("Published delta.produced event for: {}", newDelta); diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index a46023f..414db4b 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -4,6 +4,8 @@ import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.optout.OptOutCloudSync; import com.uid2.shared.vertx.VertxUtils; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.shared.optout.OptOutCollection; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; @@ -14,12 +16,19 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.util.*; +import java.nio.file.Files; +import java.nio.file.Path; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import org.mockito.ArgumentCaptor; /** * Integration tests for OptOutSqsLogProducer deltaproduce endpoint. @@ -33,12 +42,18 @@ public class OptOutSqsLogProducerTest { private SqsClient sqsClient; private ICloudStorage cloudStorage; + private ICloudStorage cloudStorageDroppedRequests; private OptOutCloudSync cloudSync; private static final String TEST_QUEUE_URL = "https://sqs.test.amazonaws.com/123456789/test"; private static final String TEST_API_KEY = "test-api-key"; private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; + private static final String TRAFFIC_FILTER_CONFIG_PATH = "./traffic-filter.json"; + private static final String TRAFFIC_CALC_CONFIG_PATH = "./traffic-calc.json"; + private static final String MANUAL_OVERRIDE_S3_PATH = "manual-override.json"; + private static final String S3_DELTA_PREFIX = "sqs-delta"; + private static final String TEST_BUCKET_DROPPED_REQUESTS = "test-bucket-dropped-requests"; @Before public void setup(TestContext context) throws Exception { @@ -47,6 +62,7 @@ public void setup(TestContext context) throws Exception { // Create mocks sqsClient = mock(SqsClient.class); cloudStorage = mock(ICloudStorage.class); + cloudStorageDroppedRequests = mock(ICloudStorage.class); cloudSync = mock(OptOutCloudSync.class); JsonObject config = VertxUtils.getJsonConfig(vertx); @@ -54,7 +70,13 @@ public void setup(TestContext context) throws Exception { .put(Const.Config.OptOutSqsVisibilityTimeoutProp, 240) .put(Const.Config.OptOutProducerBufferSizeProp, 65536) .put(Const.Config.OptOutProducerReplicaIdProp, 1) - .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY); + .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY) + .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY) + .put(Const.Config.TrafficFilterConfigPathProp, TRAFFIC_FILTER_CONFIG_PATH) + .put(Const.Config.TrafficCalcConfigPathProp, TRAFFIC_CALC_CONFIG_PATH) + .put(Const.Config.ManualOverrideS3PathProp, MANUAL_OVERRIDE_S3_PATH) + .put(Const.Config.OptOutS3BucketDroppedRequestsProp, TEST_BUCKET_DROPPED_REQUESTS) + .put(Const.Config.OptOutSqsS3FolderProp, S3_DELTA_PREFIX); // Mock cloud sync to return proper S3 paths when(cloudSync.toCloudPath(anyString())) @@ -62,9 +84,36 @@ public void setup(TestContext context) throws Exception { // Mock S3 upload to succeed by default doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + // Don't mock download with anyString() - let tests mock specific paths as needed + // Unmocked downloads will return null by default + + + try { + String traficFilterConfig = """ + { + "blacklist_requests": [ + ] + } + """; + createTrafficConfigFile(traficFilterConfig); + + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } // Create producer with mock SqsClient - producer = new OptOutSqsLogProducer(config, cloudStorage, cloudSync, Const.Event.DeltaProduce, sqsClient); + producer = new OptOutSqsLogProducer(config, cloudStorage, cloudStorageDroppedRequests, cloudSync, Const.Event.DeltaProduce, sqsClient); // Deploy verticle Async async = context.async(); @@ -76,12 +125,53 @@ public void tearDown(TestContext context) { if (vertx != null) { vertx.close(context.asyncAssertSuccess()); } + if (Files.exists(Path.of(TRAFFIC_FILTER_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_FILTER_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (Files.exists(Path.of(TRAFFIC_CALC_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_CALC_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } - + + private void createTrafficConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_FILTER_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void createTrafficCalcConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_CALC_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private Message createMessage(String hash, String id, long timestampMs) { + return createMessage(hash, id, timestampMs, null, null, null, null); + } + + private Message createMessage(String hash, String id, long timestampMs, String email, String phone, String clientIp, String traceId) { JsonObject body = new JsonObject() .put("identity_hash", hash) .put("advertising_id", id); + + if (email != null) body.put("email", email); + if (phone != null) body.put("phone", phone); + if (clientIp != null) body.put("client_ip", clientIp); + if (traceId != null) body.put("trace_id", traceId); Map attrs = new HashMap<>(); attrs.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampMs)); @@ -287,7 +377,7 @@ public void testDeltaProduceEndpoint_allMessagesTooRecent(TestContext context) { JsonObject result = finalStatus.getJsonObject("result"); context.assertNotNull(result); context.assertEquals("skipped", result.getString("status")); - context.assertEquals("All messages too recent", result.getString("reason")); + context.assertEquals("No deltas produced", result.getString("reason")); // No processing should occur try { @@ -331,9 +421,15 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) ); - // Mock SQS to return messages + // Use CountDownLatch to control when the mock returns - ensures job stays running + java.util.concurrent.CountDownLatch processingLatch = new java.util.concurrent.CountDownLatch(1); + + // Mock SQS to wait on latch before returning - keeps job in RUNNING state when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenAnswer(inv -> { + processingLatch.await(); // Wait until test releases + return ReceiveMessageResponse.builder().messages(messages).build(); + }) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) @@ -373,8 +469,10 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context .onComplete(context.asyncAssertSuccess(body -> { JsonObject response = new JsonObject(body.toString()); context.assertEquals("conflict", response.getString("status")); - context.assertTrue(response.getString("message").contains("already running")); + context.assertTrue(response.getString("reason").contains("already running")); + // Release the latch so first job can complete + processingLatch.countDown(); async.complete(); })); } @@ -460,5 +558,977 @@ public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) async.complete(); })); } + + @Test + public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - update traffic filter config to blacklist specific IP and time range + long baseTime = System.currentTimeMillis() / 1000 - 400; // 400 seconds ago + String filterConfig = String.format(""" + { + "blacklist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages: some blacklisted, some not + long blacklistedTime = (baseTime) * 1000; // Within blacklist range + long normalTime = (baseTime - 200) * 1000; // Outside blacklist range + List messages = Arrays.asList( + // These should be dropped (blacklisted) + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime + 1000, null, null, "192.168.1.100", null), + // These should be processed normally + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process 2 normal entries + context.assertEquals(2, result.getInteger("entries_processed")); + + // Should have 2 dropped requests + context.assertEquals(2, result.getInteger("dropped_requests_processed")); + + // Verify both delta and dropped request files were uploaded + try { + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + verify(cloudStorageDroppedRequests, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a blacklisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "blacklist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages that don't match blacklist + long normalTime = (baseTime - 200) * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + context.assertEquals(2, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + // Should not upload dropped request file + try { + verify(cloudStorageDroppedRequests, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a blacklisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "blacklist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages that are blacklisted + long blacklistedTime = baseTime * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime + 1000, null, null, "192.168.1.100", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // No entries processed (all blacklisted) + context.assertEquals(0, result.getInteger("entries_processed")); + + // All messages dropped + context.assertEquals(2, result.getInteger("dropped_requests_processed")); + + // Should upload dropped request file but not delta file + try { + verify(cloudStorageDroppedRequests, atLeastOnce()).upload(any(InputStream.class), anyString()); + verify(cloudStorage, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a blacklisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "blacklist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Create messages without client IP (should not be blacklisted) + long time = baseTime * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, time, null, null, null, null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Message should be processed (not blacklisted due to missing IP) + context.assertEquals(1, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + async.complete(); + })); + } + + @Test + public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - initial config with no blacklist + String initialConfig = """ + { + "blacklist_requests": [] + } + """; + createTrafficConfigFile(initialConfig); + + // Setup - create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "192.168.1.100", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act & Assert - first request - should process normally + int port = Const.Port.ServicePortForOptOut + 1; + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals(1, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + // Update config to blacklist the IP + try { + long baseTime = System.currentTimeMillis() / 1000 - 400; + String updatedConfig = String.format(""" + { + "blacklist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(updatedConfig); + + // Reset mocks for second request + reset(sqsClient, cloudStorage, cloudStorageDroppedRequests); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + // Act & Assert - second request - should now be blacklisted + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body2 -> { + JsonObject response2 = new JsonObject(body2.toString()); + context.assertEquals("accepted", response2.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus2 -> { + context.assertEquals("completed", finalStatus2.getString("state")); + JsonObject result2 = finalStatus2.getJsonObject("result"); + // Now should be blacklisted + context.assertEquals(0, result2.getInteger("entries_processed")); + context.assertEquals(1, result2.getInteger("dropped_requests_processed")); + async.complete(); + })); + } catch (Exception e) { + context.fail(e); + } + })); + } + + @Test + public void testTrafficCalculator_defaultStatus(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic calc config with required fields + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + + // Setup - no manual override + when(cloudStorage.download(anyString())) + .thenReturn(null); + + // Setup - create messages that will result in DEFAULT status + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process messages normally (DEFAULT status) + context.assertEquals(2, result.getInteger("entries_processed")); + context.assertTrue(result.getInteger("deltas_produced") >= 1); + + // Verify upload happened + try { + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testManualOverride_delayedProcessing(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - mock manual override set to DELAYED_PROCESSING + JsonObject manualOverride = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + doReturn(new java.io.ByteArrayInputStream(manualOverride.encode().getBytes())) + .when(cloudStorage).download(MANUAL_OVERRIDE_S3_PATH); // At root of bucket + + // Setup - create messages (won't be processed due to override) + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + List allMessages = new ArrayList<>(messages); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(allMessages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should not process anything - manual override checked at start + context.assertEquals(0, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // No SQS deletions should occur (messages not processed) + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testManualOverride_default_bypassesTrafficCalculation(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - setup time: current time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create delta files with timestamps distributed over 48 hours + List timestamps = new ArrayList<>(); + + // Past window: t-47h to t-25h (add 10 entries) + for (int i = 0; i < 10; i++) { + timestamps.add(t - 47*3600 + i * 1000); + } + + // Current window: t-23h to t-1h (add 100 entries - 10x past) + for (int i = 0; i < 100; i++) { + timestamps.add(t - 23*3600 + i * 1000); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + // Setup - mock manual override set to DEFAULT + JsonObject manualOverride = new JsonObject().put("manual_override", "DEFAULT"); + + // Mock S3 operations for this test + // Use doAnswer to create fresh streams on each call (streams are consumed on read) + doReturn(Arrays.asList("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .when(cloudStorage).list("sqs-delta"); + doAnswer(inv -> new ByteArrayInputStream(deltaFileBytes)) + .when(cloudStorage).download("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"); + doAnswer(inv -> new java.io.ByteArrayInputStream(manualOverride.encode().getBytes())) + .when(cloudStorage).download("manual-override.json"); + + // Setup - SQS messages, 10 messages in same window + long oldTime = (t - 600) * 1000; + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime - (i * 1000), null, null, "10.0.0." + i, null)); + } + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process 10 messages and produce 1 delta (all in same window) + context.assertEquals(10, result.getInteger("entries_processed")); + context.assertEquals(1, result.getInteger("deltas_produced")); + + verify(sqsClient, atLeastOnce()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext context) throws Exception { + Async async = context.async(); + + // Setup time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create historical delta files showing low baseline traffic (2 records from 24-48h ago) + List timestamps = new ArrayList<>(); + timestamps.add(t - 36*3600); // 36 hours ago + timestamps.add(t - 36*3600 + 1000); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + // Reset cloudStorage mock to ensure clean state + reset(cloudStorage); + + // Re-mock S3 upload (needed after reset) + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + // Mock S3 operations for historical data + // Use doAnswer to create fresh streams on each call + doReturn(Arrays.asList("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat")) + .when(cloudStorage).list("sqs-delta"); + doAnswer(inv -> new ByteArrayInputStream(deltaFileBytes)) + .when(cloudStorage).download("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat"); + + // No manual override set (returns null) + doReturn(null).when(cloudStorage).download("manual-override.json"); + + // Setup SQS messages in 2 different 5-minute windows + long oldTime = (t - 600) * 1000; // 10 minutes ago + + // Window 1 (oldest): Low traffic (10 messages) - within baseline, should process + List window1Messages = new ArrayList<>(); + long window1Time = oldTime - 600*1000; // 20 minutes ago + for (int i = 0; i < 10; i++) { + window1Messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + window1Time - (i * 1000), null, null, "10.0.0." + i, null)); + } + + // Window 2: High traffic spike (600 messages) - exceeds threshold (baseline=100 * multiplier=5 = 500) + // NOTE: The traffic calculator is called with currentDeltaMessages (window 2's messages), + // so it calculates the spike AFTER processing all of window 2's messages (but the delta is not uploaded). + List window2Messages = new ArrayList<>(); + long window2Time = oldTime - 300*1000; // 15 minutes ago (5 min after window1) + for (int i = 0; i < 600; i++) { + window2Messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + window2Time - (i * 100), null, null, "10.0.1." + (i % 256), null)); + } + + // Combine all messages (oldest first for SQS ordering) + List allMessages = new ArrayList<>(); + allMessages.addAll(window1Messages); + allMessages.addAll(window2Messages); + + // Mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(allMessages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("skipped", result.getString("status")); + + // Expected behavior: + // SqsWindowReader groups all 610 messages together (window 1 + window 2) + // Traffic calculator detects spike (>=500 threshold) → DELAYED_PROCESSING + // encodeSkippedResult preserves the actual counts + context.assertEquals(610, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // Verify manual override was set to DELAYED_PROCESSING on S3 + try { + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor streamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(cloudStorage, atLeastOnce()).upload(streamCaptor.capture(), pathCaptor.capture()); + + // Check if manual-override.json was uploaded + boolean overrideSet = false; + for (int i = 0; i < pathCaptor.getAllValues().size(); i++) { + if (pathCaptor.getAllValues().get(i).equals("manual-override.json")) { + overrideSet = true; + break; + } + } + context.assertTrue(overrideSet, "Manual override should be set to DELAYED_PROCESSING after detecting spike"); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testManualOverride_notSet(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - mock no manual override file + when(cloudStorage.download(anyString())) + .thenReturn(null); + + // Setup - create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process normally with traffic calc (no override) + context.assertEquals(1, result.getInteger("entries_processed")); + + async.complete(); + })); + } + + @Test + public void testS3UploadFailure_messagesNotDeletedFromSqs(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages to process + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + // Mock SQS to return messages + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + // Mock S3 upload to FAIL + doThrow(new RuntimeException("S3 upload failed - simulated error")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + // Job should fail due to S3 error + context.assertEquals("failed", finalStatus.getString("state")); + context.assertTrue(finalStatus.getString("error").contains("S3") || + finalStatus.getString("error").contains("upload")); + + // CRITICAL: Messages should NOT be deleted from SQS when upload fails + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testStatusEndpoint_showsRunningJob(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) + ); + + // Use CountDownLatch to keep job running + java.util.concurrent.CountDownLatch processingLatch = new java.util.concurrent.CountDownLatch(1); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenAnswer(inv -> { + processingLatch.await(); + return ReceiveMessageResponse.builder().messages(messages).build(); + }) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + // Immediately check status - should show "running" + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(200, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + JsonObject status = new JsonObject(body.toString()); + context.assertEquals("running", status.getString("state")); + context.assertNotNull(status.getString("start_time")); + + // Release latch so job can complete + processingLatch.countDown(); + async.complete(); + })); + } + + @Test + public void testStatusEndpoint_showsFailedJob(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) + ); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + // Make S3 upload fail + doThrow(new RuntimeException("Simulated S3 failure")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job and wait for it to fail + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> pollForCompletion(context, port, 100, 50)) + .compose(finalStatus -> { + context.assertEquals("failed", finalStatus.getString("state")); + + // Now call status endpoint directly to verify failed state is persisted + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(200, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + JsonObject status = new JsonObject(body.toString()); + context.assertEquals("failed", status.getString("state")); + context.assertNotNull(status.getString("error")); + context.assertNotNull(status.getString("start_time")); + context.assertNotNull(status.getString("end_time")); + context.assertNotNull(status.getInteger("duration_seconds")); + + async.complete(); + })); + } + + @Test + public void testDeltaProduceEndpoint_invalidApiKey(TestContext context) { + Async async = context.async(); + + int port = Const.Port.ServicePortForOptOut + 1; + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer wrong-api-key") + .send()) + .compose(resp -> { + context.assertEquals(401, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + // Should not call SQS when unauthorized + verify(sqsClient, never()).receiveMessage(any(ReceiveMessageRequest.class)); + async.complete(); + })); + } + + /** + * Create delta file bytes with specified timestamps + */ + private byte[] createDeltaFileBytes(List timestamps) throws Exception { + // Create OptOutEntry objects using newTestEntry + List entries = new ArrayList<>(); + + long idCounter = 1000; // Use incrementing IDs for test entries + for (long timestamp : timestamps) { + entries.add(OptOutEntry.newTestEntry(idCounter++, timestamp)); + } + + // Create OptOutCollection + OptOutCollection collection = new OptOutCollection(entries.toArray(new OptOutEntry[0])); + return collection.getStore(); + } } From 695f6b4602a9af0f6654bf7488e49a4a0d674532 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 2 Dec 2025 02:41:53 -0700 Subject: [PATCH 02/20] threshold in reading --- .../optout/vertx/OptOutSqsLogProducer.java | 15 ++- .../optout/vertx/OptOutTrafficCalculator.java | 8 ++ .../uid2/optout/vertx/SqsWindowReader.java | 44 +++++++- .../vertx/OptOutSqsLogProducerTest.java | 105 ++++++++++++++++++ 4 files changed, 163 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index ae2619b..af1d937 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -174,10 +174,10 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I this.trafficCalculator = new OptOutTrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutSqsS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp)); this.manualOverrideS3Path = jsonConfig.getString(Const.Config.ManualOverrideS3PathProp); - // Initialize window reader + // Initialize window reader with traffic threshold this.windowReader = new SqsWindowReader( this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, - this.visibilityTimeout, this.deltaWindowSeconds + this.visibilityTimeout, this.deltaWindowSeconds, this.trafficCalculator.getThreshold() ); } @@ -290,6 +290,8 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { try { this.trafficCalculator.reloadTrafficCalcConfig(); + // Update window reader's message limit to match new threshold + this.windowReader.setMaxMessagesPerWindow(this.trafficCalculator.getThreshold()); } catch (MalformedTrafficCalcConfigException e) { LOGGER.error("Error reloading traffic calculator config: " + e.getMessage(), e); resp.setStatusCode(500) @@ -298,7 +300,6 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { return; } - DeltaProduceJobStatus existingJob = currentJob.get(); // If there's an existing job, check if it's still running @@ -427,6 +428,14 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { break; } + // if message limit exceeded, treat as traffic spike + if (windowResult.exceededMessageLimit()) { + LOGGER.error("Message limit exceeded ({} messages) - triggering DELAYED_PROCESSING to prevent memory exhaustion", + windowResult.getMessages().size()); + this.setDelayedProcessingOverride(); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } + // Create delta file buffer String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java index 7795021..a7d73d3 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java @@ -515,6 +515,14 @@ TrafficStatus determineStatus(int sumCurrent, int baselineTraffic) { return TrafficStatus.DEFAULT; } + /** + * Get the traffic threshold (baseline × multiplier). + * Used for early termination + */ + public int getThreshold() { + return this.baselineTraffic * this.thresholdMultiplier; + } + /** * Get cache statistics for monitoring */ diff --git a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java index 782bb93..6aa18a0 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java +++ b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java @@ -1,5 +1,7 @@ package com.uid2.optout.vertx; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.Message; @@ -11,23 +13,35 @@ * Handles accumulation of all messages for a window before returning. */ public class SqsWindowReader { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class); + private final SqsClient sqsClient; private final String queueUrl; private final int maxMessagesPerPoll; private final int visibilityTimeout; private final int deltaWindowSeconds; private final SqsBatchProcessor batchProcessor; + private int maxMessagesPerWindow; public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, - int visibilityTimeout, int deltaWindowSeconds) { + int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.maxMessagesPerPoll = maxMessagesPerPoll; this.visibilityTimeout = visibilityTimeout; this.deltaWindowSeconds = deltaWindowSeconds; + this.maxMessagesPerWindow = maxMessagesPerWindow; this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); } + /** + * Update the max messages limit (e.g., after config reload). + */ + public void setMaxMessagesPerWindow(int maxMessagesPerWindow) { + this.maxMessagesPerWindow = maxMessagesPerWindow; + LOGGER.info("Updated maxMessagesPerWindow to {}", maxMessagesPerWindow); + } + /** * Result of reading messages for a 5-minute window. */ @@ -35,17 +49,21 @@ public static class WindowReadResult { private final List messages; private final long windowStart; private final boolean stoppedDueToRecentMessages; + private final boolean exceededMessageLimit; - public WindowReadResult(List messages, long windowStart, boolean stoppedDueToRecentMessages) { + public WindowReadResult(List messages, long windowStart, + boolean stoppedDueToRecentMessages, boolean exceededMessageLimit) { this.messages = messages; this.windowStart = windowStart; this.stoppedDueToRecentMessages = stoppedDueToRecentMessages; + this.exceededMessageLimit = exceededMessageLimit; } public List getMessages() { return messages; } public long getWindowStart() { return windowStart; } public boolean isEmpty() { return messages.isEmpty(); } public boolean stoppedDueToRecentMessages() { return stoppedDueToRecentMessages; } + public boolean exceededMessageLimit() { return exceededMessageLimit; } } /** @@ -54,6 +72,7 @@ public WindowReadResult(List messages, long windowStart, boole * - We discover the next window * - Queue is empty (no more messages) * - Messages are too recent (all messages younger than 5 minutes) + * - Message count exceeds maxMessagesPerWindow * * @return WindowReadResult with messages for the window, or empty if done */ @@ -62,13 +81,20 @@ public WindowReadResult readWindow() { long currentWindowStart = 0; while (true) { + + if (windowMessages.size() >= maxMessagesPerWindow) { + LOGGER.warn("Message limit exceeded: {} messages >= limit {}. Stopping to prevent memory exhaustion.", + windowMessages.size(), maxMessagesPerWindow); + return new WindowReadResult(windowMessages, currentWindowStart, false, true); + } + // Read one batch from SQS (up to 10 messages) List rawBatch = SqsMessageOperations.receiveMessagesFromSqs( this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout); if (rawBatch.isEmpty()) { // Queue empty - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, false); + return new WindowReadResult(windowMessages, currentWindowStart, false, false); } // Process batch: parse, validate, filter @@ -77,7 +103,7 @@ public WindowReadResult readWindow() { if (batchResult.isEmpty()) { if (batchResult.shouldStopProcessing()) { // Messages too recent - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, true); + return new WindowReadResult(windowMessages, currentWindowStart, true, false); } // corrupt messages deleted, read next messages continue; @@ -99,13 +125,19 @@ public WindowReadResult readWindow() { } windowMessages.add(msg); + + // Check limit after each message addition + if (windowMessages.size() >= maxMessagesPerWindow) { + LOGGER.warn("Message limit exceeded during batch: {} messages >= limit {}", + windowMessages.size(), maxMessagesPerWindow); + return new WindowReadResult(windowMessages, currentWindowStart, false, true); + } } if (newWindow) { // close current window and return - return new WindowReadResult(windowMessages, currentWindowStart, false); + return new WindowReadResult(windowMessages, currentWindowStart, false, false); } } } } - diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index 414db4b..bfada92 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -1159,6 +1159,18 @@ public void testManualOverride_default_bypassesTrafficCalculation(TestContext co public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext context) throws Exception { Async async = context.async(); + // Use higher threshold (1000) so circuit breaker doesn't trigger before traffic calculator + // This tests the traffic calculator spike detection, not the circuit breaker + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 200, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + // Setup time long currentTime = System.currentTimeMillis() / 1000; long t = currentTime; @@ -1271,6 +1283,99 @@ public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext contex })); } + @Test + public void testCircuitBreaker_stopsProcessingWhenMessageLimitExceeded(TestContext context) throws Exception { + Async async = context.async(); + + // Use low threshold (100) so circuit breaker triggers before traffic calculator + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 20, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + + // Reset cloudStorage mock to ensure clean state + reset(cloudStorage); + + // Re-mock S3 upload (needed after reset) + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + // No manual override set (returns null) + doReturn(null).when(cloudStorage).download("manual-override.json"); + + // Setup time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 200 messages - exceeds threshold (20 * 5 = 100) + long oldTime = (t - 600) * 1000; // 10 minutes ago + List messages = new ArrayList<>(); + for (int i = 0; i < 200; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + oldTime - (i * 1000), null, null, "10.0.0." + (i % 256), null)); + } + + // Mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("skipped", result.getString("status")); + + // Expected behavior: + // SqsWindowReader hits maxMessagesPerWindow limit (100) during reading + // Circuit breaker triggers DELAYED_PROCESSING immediately + // Processing stops before any messages are counted as processed + context.assertEquals(0, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // Verify manual override was set to DELAYED_PROCESSING on S3 + try { + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), pathCaptor.capture()); + + // Check if manual-override.json was uploaded + boolean overrideSet = pathCaptor.getAllValues().stream() + .anyMatch(path -> path.equals("manual-override.json")); + context.assertTrue(overrideSet, "Circuit breaker should set DELAYED_PROCESSING override"); + } catch (Exception e) { + context.fail(e); + } + + // Verify NO messages were deleted from SQS (processing stopped before completion) + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + @Test public void testManualOverride_notSet(TestContext context) throws Exception { Async async = context.async(); From 9dab244c2f096e9a54613842025b48ab15e85a35 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Tue, 2 Dec 2025 18:49:31 -0700 Subject: [PATCH 03/20] factgoring in the queue attributes adjustment --- .../optout/vertx/OptOutSqsLogProducer.java | 7 +- .../optout/vertx/OptOutTrafficCalculator.java | 32 ++- .../optout/vertx/SqsMessageOperations.java | 91 ++++++++ .../vertx/OptOutSqsLogProducerTest.java | 72 ++++--- .../vertx/OptOutTrafficCalculatorTest.java | 197 ++++++++++++++++++ 5 files changed, 361 insertions(+), 38 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index af1d937..575371e 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -472,8 +472,11 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { } else if (getManualOverride().equals("DEFAULT")) { LOGGER.info("Manual override set to DEFAULT, skipping traffic calculation"); } else { - // check traffic calculator status - OptOutTrafficCalculator.TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(currentDeltaMessages); + // Get queue attributes (including invisible messages) for traffic calculation + SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl); + + // check traffic calculator status (including invisible messages in case of multiple consumers) + OptOutTrafficCalculator.TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(currentDeltaMessages, queueAttributes); if (trafficStatus == OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING) { LOGGER.error("OptOut Delta Production has hit DELAYED_PROCESSING status, stopping production"); this.setDelayedProcessingOverride(); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java index a7d73d3..53b7695 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java @@ -193,10 +193,26 @@ List> parseAllowlistRanges(JsonObject config) throws MalformedTraffic * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window, * and the oldest queue timestamp to anchor the 5-minute queue window. * - * @param sqsMessages List of SQS messages + * @param sqsMessages List of SQS messages in the current batch * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT) */ public TrafficStatus calculateStatus(List sqsMessages) { + return calculateStatus(sqsMessages, null); + } + + /** + * Calculate traffic status based on delta files, SQS queue messages, and queue attributes. + * + * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window, + * and the oldest queue timestamp to anchor the 5-minute queue window. + * + * The invisible message count from queue attributes is added in case of multiple consumers. + * + * @param sqsMessages List of SQS messages in the current batch + * @param queueAttributes SQS queue attributes including invisible message count (may be null) + * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT) + */ + public TrafficStatus calculateStatus(List sqsMessages, SqsMessageOperations.QueueAttributes queueAttributes) { try { // Get list of delta files from S3 (sorted newest to oldest) @@ -258,11 +274,21 @@ public TrafficStatus calculateStatus(List sqsMessages) { sum += sqsCount; } + // Add invisible messages from queue attributes (messages being processed by other consumers) + // These represent in-flight work that will soon become delta records + int invisibleMessages = 0; + if (queueAttributes != null) { + invisibleMessages = queueAttributes.getApproximateNumberOfMessagesNotVisible(); + sum += invisibleMessages; + LOGGER.info("Traffic calculation: adding {} invisible SQS messages to sum (queue: {})", + invisibleMessages, queueAttributes); + } + // Determine status TrafficStatus status = determineStatus(sum, this.baselineTraffic); - LOGGER.info("Traffic calculation complete: sum={}, baselineTraffic={}, thresholdMultiplier={}, status={}", - sum, this.baselineTraffic, this.thresholdMultiplier, status); + LOGGER.info("Traffic calculation complete: sum={} (including {} invisible), baselineTraffic={}, thresholdMultiplier={}, status={}", + sum, invisibleMessages, this.baselineTraffic, this.thresholdMultiplier, status); return status; diff --git a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java b/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java index 6c2715b..104215c 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java +++ b/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Utility class for SQS message operations. @@ -15,6 +16,96 @@ public class SqsMessageOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageOperations.class); private static final int SQS_MAX_DELETE_BATCH_SIZE = 10; + /** + * Result of getting queue attributes from SQS. + */ + public static class QueueAttributes { + private final int approximateNumberOfMessages; + private final int approximateNumberOfMessagesNotVisible; + private final int approximateNumberOfMessagesDelayed; + + public QueueAttributes(int approximateNumberOfMessages, + int approximateNumberOfMessagesNotVisible, + int approximateNumberOfMessagesDelayed) { + this.approximateNumberOfMessages = approximateNumberOfMessages; + this.approximateNumberOfMessagesNotVisible = approximateNumberOfMessagesNotVisible; + this.approximateNumberOfMessagesDelayed = approximateNumberOfMessagesDelayed; + } + + /** Number of messages available for retrieval from the queue (visible messages) */ + public int getApproximateNumberOfMessages() { + return approximateNumberOfMessages; + } + + /** Number of messages that are in flight (being processed by consumers, invisible) */ + public int getApproximateNumberOfMessagesNotVisible() { + return approximateNumberOfMessagesNotVisible; + } + + /** Number of messages in the queue that are delayed and not available yet */ + public int getApproximateNumberOfMessagesDelayed() { + return approximateNumberOfMessagesDelayed; + } + + /** Total messages in queue = visible + invisible + delayed */ + public int getTotalMessages() { + return approximateNumberOfMessages + approximateNumberOfMessagesNotVisible + approximateNumberOfMessagesDelayed; + } + + @Override + public String toString() { + return String.format("QueueAttributes{visible=%d, invisible=%d, delayed=%d, total=%d}", + approximateNumberOfMessages, approximateNumberOfMessagesNotVisible, + approximateNumberOfMessagesDelayed, getTotalMessages()); + } + } + + /** + * Gets queue attributes from SQS including message counts. + * + * @param sqsClient The SQS client + * @param queueUrl The queue URL + * @return QueueAttributes with message counts, or null if failed + */ + public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String queueUrl) { + try { + GetQueueAttributesRequest request = GetQueueAttributesRequest.builder() + .queueUrl(queueUrl) + .attributeNames( + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED + ) + .build(); + + GetQueueAttributesResponse response = sqsClient.getQueueAttributes(request); + Map attrs = response.attributes(); + + int visible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES), 0); + int invisible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE), 0); + int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0); + + QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed); + LOGGER.debug("Queue attributes: {}", queueAttributes); + return queueAttributes; + + } catch (Exception e) { + LOGGER.error("Error getting queue attributes from SQS", e); + return null; + } + } + + private static int parseIntOrDefault(String value, int defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } + /** * Receives all available messages from an SQS queue up to a maximum number of batches. * diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index bfada92..5942bbe 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -16,8 +16,6 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.InputStream; import java.io.ByteArrayInputStream; import java.util.*; @@ -86,6 +84,16 @@ public void setup(TestContext context) throws Exception { doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + // Mock getQueueAttributes by default (returns zero messages) + Map defaultQueueAttrs = new HashMap<>(); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder() + .attributes(defaultQueueAttrs) + .build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); + // Don't mock download with anyString() - let tests mock specific paths as needed // Unmocked downloads will return null by default @@ -1159,12 +1167,12 @@ public void testManualOverride_default_bypassesTrafficCalculation(TestContext co public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext context) throws Exception { Async async = context.async(); - // Use higher threshold (1000) so circuit breaker doesn't trigger before traffic calculator - // This tests the traffic calculator spike detection, not the circuit breaker + // Threshold = baseline * multiplier = 100 * 5 = 500 + // We have 610 messages, which exceeds 500, so spike should be detected String trafficCalcConfig = """ { "traffic_calc_evaluation_window_seconds": 86400, - "traffic_calc_baseline_traffic": 200, + "traffic_calc_baseline_traffic": 100, "traffic_calc_threshold_multiplier": 5, "traffic_calc_allowlist_ranges": [] } @@ -1197,31 +1205,18 @@ public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext contex // No manual override set (returns null) doReturn(null).when(cloudStorage).download("manual-override.json"); - // Setup SQS messages in 2 different 5-minute windows - long oldTime = (t - 600) * 1000; // 10 minutes ago - - // Window 1 (oldest): Low traffic (10 messages) - within baseline, should process - List window1Messages = new ArrayList<>(); - long window1Time = oldTime - 600*1000; // 20 minutes ago - for (int i = 0; i < 10; i++) { - window1Messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, - window1Time - (i * 1000), null, null, "10.0.0." + i, null)); - } - - // Window 2: High traffic spike (600 messages) - exceeds threshold (baseline=100 * multiplier=5 = 500) - // NOTE: The traffic calculator is called with currentDeltaMessages (window 2's messages), - // so it calculates the spike AFTER processing all of window 2's messages (but the delta is not uploaded). - List window2Messages = new ArrayList<>(); - long window2Time = oldTime - 300*1000; // 15 minutes ago (5 min after window1) - for (int i = 0; i < 600; i++) { - window2Messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, - window2Time - (i * 100), null, null, "10.0.1." + (i % 256), null)); - } - - // Combine all messages (oldest first for SQS ordering) + // Setup SQS messages + long baseTime = (t - 600) * 1000; + List allMessages = new ArrayList<>(); - allMessages.addAll(window1Messages); - allMessages.addAll(window2Messages); + // Create 610 messages with timestamps spread over ~4 minutes (within the 5-minute window) + for (int i = 0; i < 610; i++) { + // Timestamps range from (t-600) to (t-600-240) seconds = t-600 to t-840 + // All within a single 5-minute window for traffic calculation, and all > 5 min old + long timestampMs = baseTime - (i * 400); // ~400ms apart going backwards, total span ~244 seconds + allMessages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + timestampMs, null, null, "10.0.0." + (i % 256), null)); + } // Mock SQS operations when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) @@ -1229,6 +1224,16 @@ public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext contex .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Mock getQueueAttributes to return zero invisible messages (doesn't affect the spike detection) + Map queueAttrs = new HashMap<>(); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder() + .attributes(queueAttrs) + .build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); int port = Const.Port.ServicePortForOptOut + 1; @@ -1254,10 +1259,11 @@ public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext contex context.assertEquals("skipped", result.getString("status")); // Expected behavior: - // SqsWindowReader groups all 610 messages together (window 1 + window 2) - // Traffic calculator detects spike (>=500 threshold) → DELAYED_PROCESSING - // encodeSkippedResult preserves the actual counts - context.assertEquals(610, result.getInteger("entries_processed")); + // All 610 messages are within a single 5-minute window + // Traffic calculator counts them all and detects spike (>=500 threshold) + // DELAYED_PROCESSING is triggered, no delta uploaded + // The entries_processed count reflects how many were read before spike detection + context.assertTrue(result.getInteger("entries_processed") <= 610); context.assertEquals(0, result.getInteger("deltas_produced")); // Verify manual override was set to DELAYED_PROCESSING on S3 diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java index 0824c3b..9e3a351 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java @@ -1357,4 +1357,201 @@ void testCalculateStatus_timestampsCached() throws Exception { assertEquals(2, stats.get("total_cached_timestamps")); } + // ============================================================================ + // SECTION 10: Tests for calculateStatus() with QueueAttributes + // ============================================================================ + + /** + * Create a QueueAttributes object for testing + */ + private SqsMessageOperations.QueueAttributes createQueueAttributes(int visible, int invisible, int delayed) { + return new SqsMessageOperations.QueueAttributes(visible, invisible, delayed); + } + + @Test + void testCalculateStatus_withQueueAttributes_nullAttributes() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - null queue attributes should behave same as single-parameter version + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null); + + // Assert - DEFAULT (same as without queue attributes) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_invisibleMessagesAddedToSum() throws Exception { + // Setup - create delta files with entries just under threshold + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 490 entries (just under threshold of 500 = 5 * 100) + List timestamps = new ArrayList<>(); + for (int i = 0; i < 490; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - without invisible messages, should be DEFAULT + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus statusWithoutInvisible = calculator.calculateStatus(sqsMessages, null); + + // With invisible messages that push over threshold, should be DELAYED_PROCESSING + // 490 (delta) + 1 (sqs) + 10 (invisible) = 501 >= 500 + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(5, 10, 0); + OptOutTrafficCalculator.TrafficStatus statusWithInvisible = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, statusWithoutInvisible); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, statusWithInvisible); + } + + @Test + void testCalculateStatus_withQueueAttributes_zeroInvisibleMessages() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600, t - 7200); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - queue attributes with zero invisible messages + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(100, 0, 50); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - same result as without queue attributes (only invisible is added to sum) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_largeInvisibleCount() throws Exception { + // Setup - create delta files with minimal entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - large number of invisible messages alone should trigger DELAYED_PROCESSING + // threshold = 5 * 100 = 500 + // 1 (delta) + 1 (sqs) + 500 (invisible) = 502 >= 500 + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(0, 500, 0); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - DELAYED_PROCESSING due to invisible messages + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_delayedMessagesNotAdded() throws Exception { + // Setup - create delta files with entries near threshold + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 495 entries + List timestamps = new ArrayList<>(); + for (int i = 0; i < 495; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - delayed messages should NOT be added to sum (only invisible) + // 495 (delta) + 1 (sqs) + 0 (invisible) = 496 < 500 + // If delayed was added: 496 + 1000 = 1496 >= 500 (would fail) + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(100, 0, 1000); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - DEFAULT (delayed messages not counted) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + // ============================================================================ + // SECTION 11: Tests for QueueAttributes class + // ============================================================================ + + @Test + void testQueueAttributes_getters() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(10, 20, 5); + + // Assert + assertEquals(10, attrs.getApproximateNumberOfMessages()); + assertEquals(20, attrs.getApproximateNumberOfMessagesNotVisible()); + assertEquals(5, attrs.getApproximateNumberOfMessagesDelayed()); + assertEquals(35, attrs.getTotalMessages()); + } + + @Test + void testQueueAttributes_toString() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(100, 50, 25); + + // Act + String str = attrs.toString(); + + // Assert - should contain all values + assertTrue(str.contains("visible=100")); + assertTrue(str.contains("invisible=50")); + assertTrue(str.contains("delayed=25")); + assertTrue(str.contains("total=175")); + } + + @Test + void testQueueAttributes_zeroValues() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(0, 0, 0); + + // Assert + assertEquals(0, attrs.getApproximateNumberOfMessages()); + assertEquals(0, attrs.getApproximateNumberOfMessagesNotVisible()); + assertEquals(0, attrs.getApproximateNumberOfMessagesDelayed()); + assertEquals(0, attrs.getTotalMessages()); + } + } From 0fbbc6f245f253cde04ccc2ab2aa76d5c99080ab Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Thu, 4 Dec 2025 19:37:34 +0000 Subject: [PATCH 04/20] [CI Pipeline] Released Snapshot version: 4.5.1-alpha-114-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2d85c1f..5844231 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.0 + 4.5.1-alpha-114-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 18262c365f2f92564a7e8445662ede8ed33060ce Mon Sep 17 00:00:00 2001 From: Ian Nara <135270994+Ian-Nara@users.noreply.github.com> Date: Thu, 4 Dec 2025 12:39:30 -0700 Subject: [PATCH 05/20] Update .trivyignore --- .trivyignore | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/.trivyignore b/.trivyignore index 223846f..374b230 100644 --- a/.trivyignore +++ b/.trivyignore @@ -5,5 +5,8 @@ # UID2-6097 CVE-2025-59375 exp:2025-12-15 -# UID2-6128 -CVE-2025-55163 exp:2025-11-30 +# UID2-6340 +CVE-2025-64720 exp:2025-12-16 + +# UID2-6340 +CVE-2025-65018 exp:2025-12-16 From f7cecdee81ae5b43e1902a13dfcd4521aae0b66f Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Thu, 4 Dec 2025 19:41:54 +0000 Subject: [PATCH 06/20] [CI Pipeline] Released Snapshot version: 4.5.2-alpha-115-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 5844231..48e5552 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.1-alpha-114-SNAPSHOT + 4.5.2-alpha-115-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From d23c792bf56f65e6c259ad5f25b8c16d7f0dff1b Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Thu, 4 Dec 2025 14:05:40 -0700 Subject: [PATCH 07/20] update naming --- .../vertx/OptOutSqsLogProducerTest.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index 5942bbe..172c71c 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -101,7 +101,7 @@ public void setup(TestContext context) throws Exception { try { String traficFilterConfig = """ { - "blacklist_requests": [ + "denylist_requests": [ ] } """; @@ -571,11 +571,11 @@ public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) throws Exception { Async async = context.async(); - // Setup - update traffic filter config to blacklist specific IP and time range + // Setup - update traffic filter config to denyhlist specific IP and time range long baseTime = System.currentTimeMillis() / 1000 - 400; // 400 seconds ago String filterConfig = String.format(""" { - "blacklist_requests": [ + "denylist_requests": [ { "range": [%d, %d], "IPs": ["192.168.1.100"] @@ -586,8 +586,8 @@ public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) createTrafficConfigFile(filterConfig); // Setup - create messages: some blacklisted, some not - long blacklistedTime = (baseTime) * 1000; // Within blacklist range - long normalTime = (baseTime - 200) * 1000; // Outside blacklist range + long blacklistedTime = (baseTime) * 1000; // Within denyhlist range + long normalTime = (baseTime - 200) * 1000; // Outside denyhlist range List messages = Arrays.asList( // These should be dropped (blacklisted) createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime, null, null, "192.168.1.100", null), @@ -653,7 +653,7 @@ public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { - "blacklist_requests": [ + "denylist_requests": [ { "range": [%d, %d], "IPs": ["192.168.1.100"] @@ -663,7 +663,7 @@ public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws """, baseTime - 100, baseTime + 100); createTrafficConfigFile(filterConfig); - // Setup - create messages that don't match blacklist + // Setup - create messages that don't match denyhlist long normalTime = (baseTime - 200) * 1000; List messages = Arrays.asList( createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), @@ -721,7 +721,7 @@ public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { - "blacklist_requests": [ + "denylist_requests": [ { "range": [%d, %d], "IPs": ["192.168.1.100"] @@ -793,7 +793,7 @@ public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throw long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { - "blacklist_requests": [ + "denylist_requests": [ { "range": [%d, %d], "IPs": ["192.168.1.100"] @@ -851,10 +851,10 @@ public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throw public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throws Exception { Async async = context.async(); - // Setup - initial config with no blacklist + // Setup - initial config with no denyhlist String initialConfig = """ { - "blacklist_requests": [] + "denylist_requests": [] } """; createTrafficConfigFile(initialConfig); @@ -895,12 +895,12 @@ public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throw context.assertEquals(1, result.getInteger("entries_processed")); context.assertEquals(0, result.getInteger("dropped_requests_processed")); - // Update config to blacklist the IP + // Update config to denyhlist the IP try { long baseTime = System.currentTimeMillis() / 1000 - 400; String updatedConfig = String.format(""" { - "blacklist_requests": [ + "denylist_requests": [ { "range": [%d, %d], "IPs": ["192.168.1.100"] From d03eb732946f3c64d4d6714fbf6d60a2f545ed15 Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Thu, 4 Dec 2025 21:08:40 +0000 Subject: [PATCH 08/20] [CI Pipeline] Released Snapshot version: 4.5.3-alpha-117-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 48e5552..6b11c19 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.2-alpha-115-SNAPSHOT + 4.5.3-alpha-117-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From b09ac28fc31addb40d82c351bc6353232820795a Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Thu, 4 Dec 2025 15:11:07 -0700 Subject: [PATCH 09/20] update logging --- .../com/uid2/optout/vertx/OptOutServiceVerticle.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java index 7507323..e01c49e 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java @@ -365,21 +365,23 @@ private void handleQueue(RoutingContext routingContext) { String email = body != null ? body.getString(EMAIL) : null; String phone = body != null ? body.getString(PHONE) : null; - HttpServerResponse resp = routingContext.response(); - // while old delta production is enabled, response is handled by replicate logic // Validate parameters - same as replicate if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { + LOGGER.warn("handleQueue: Invalid identity_hash parameter"); // this.sendBadRequestError(resp); return; } if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { + LOGGER.warn("handleQueue: Invalid advertising_id parameter"); + // this.sendBadRequestError(resp); return; } if (!this.isGetOrPost(req)) { + LOGGER.warn("handleQueue: Invalid HTTP method: {}", req.method()); // this.sendBadRequestError(resp); return; } @@ -408,8 +410,6 @@ private void handleQueue(RoutingContext routingContext) { } }, res -> { if (res.failed()) { - // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage()); - LOGGER.error("Failed to queue message: " + res.cause().getMessage()); } else { String messageId = (String) res.result(); @@ -426,8 +426,7 @@ private void handleQueue(RoutingContext routingContext) { } }); } catch (Exception ex) { - // this.sendInternalServerError(resp, ex.getMessage()); - LOGGER.error("Error processing queue request: " + ex.getMessage(), ex); + LOGGER.error("handleQueue: Error processing queue request"); } } From de8ae4a999ab38dcf2114156ef245bdb3cb19a13 Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Thu, 4 Dec 2025 22:13:57 +0000 Subject: [PATCH 10/20] [CI Pipeline] Released Snapshot version: 4.5.4-alpha-118-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 6b11c19..4bab073 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.3-alpha-117-SNAPSHOT + 4.5.4-alpha-118-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 7603858972a3809e67607571f8903bb430416b49 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 5 Dec 2025 14:19:16 -0700 Subject: [PATCH 11/20] Update name --- .../optout/vertx/OptOutSqsLogProducer.java | 4 +-- .../vertx/OptOutSqsLogProducerTest.java | 36 +++++++++---------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index 24d5ab7..b1fc864 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -488,13 +488,13 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { } } - // Upload delta file if there are non-blacklisted messages + // Upload delta file if there are non-denylisted messages if (!currentDeltaMessages.isEmpty()) { uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, currentDeltaMessages); deltasProduced++; } - // Upload dropped request file if there are blacklisted messages + // Upload dropped request file if there are denylisted messages if (!droppedRequestMessages.isEmpty()) { this.uploadDroppedRequestsAndDeleteMessages(droppedRequestStream, currentDroppedRequestName, windowStart, droppedRequestMessages); droppedRequestFilesProduced++; diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index 172c71c..7b5900f 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -568,7 +568,7 @@ public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) } @Test - public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) throws Exception { + public void testTrafficFilter_denylistedMessagesAreDropped(TestContext context) throws Exception { Async async = context.async(); // Setup - update traffic filter config to denyhlist specific IP and time range @@ -585,13 +585,13 @@ public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) """, baseTime - 100, baseTime + 100); createTrafficConfigFile(filterConfig); - // Setup - create messages: some blacklisted, some not - long blacklistedTime = (baseTime) * 1000; // Within denyhlist range + // Setup - create messages: some denylisted, some not + long denylistedTime = (baseTime) * 1000; // Within denyhlist range long normalTime = (baseTime - 200) * 1000; // Outside denyhlist range List messages = Arrays.asList( - // These should be dropped (blacklisted) - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime, null, null, "192.168.1.100", null), - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime + 1000, null, null, "192.168.1.100", null), + // These should be dropped (denylisted) + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime + 1000, null, null, "192.168.1.100", null), // These should be processed normally createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime + 1000, null, null, "10.0.0.2", null) @@ -649,7 +649,7 @@ public void testTrafficFilter_blacklistedMessagesAreDropped(TestContext context) public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws Exception { Async async = context.async(); - // Setup - traffic filter with a blacklisted IP + // Setup - traffic filter with a denylisted IP long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { @@ -717,7 +717,7 @@ public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws Exception { Async async = context.async(); - // Setup - traffic filter with a blacklisted IP + // Setup - traffic filter with a denylisted IP long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { @@ -731,11 +731,11 @@ public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws """, baseTime - 100, baseTime + 100); createTrafficConfigFile(filterConfig); - // Setup - create messages that are blacklisted - long blacklistedTime = baseTime * 1000; + // Setup - create messages that are denylisted + long denylistedTime = baseTime * 1000; List messages = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime, null, null, "192.168.1.100", null), - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, blacklistedTime + 1000, null, null, "192.168.1.100", null) + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime + 1000, null, null, "192.168.1.100", null) ); // Setup - mock SQS operations @@ -767,7 +767,7 @@ public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws JsonObject result = finalStatus.getJsonObject("result"); context.assertEquals("success", result.getString("status")); - // No entries processed (all blacklisted) + // No entries processed (all denylisted) context.assertEquals(0, result.getInteger("entries_processed")); // All messages dropped @@ -789,7 +789,7 @@ public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throws Exception { Async async = context.async(); - // Setup - traffic filter with a blacklisted IP + // Setup - traffic filter with a denylisted IP long baseTime = System.currentTimeMillis() / 1000 - 400; String filterConfig = String.format(""" { @@ -803,7 +803,7 @@ public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throw """, baseTime - 100, baseTime + 100); createTrafficConfigFile(filterConfig); - // Create messages without client IP (should not be blacklisted) + // Create messages without client IP (should not be denylisted) long time = baseTime * 1000; List messages = Arrays.asList( createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, time, null, null, null, null) @@ -839,7 +839,7 @@ public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throw JsonObject result = finalStatus.getJsonObject("result"); context.assertEquals("success", result.getString("status")); - // Message should be processed (not blacklisted due to missing IP) + // Message should be processed (not denylisted due to missing IP) context.assertEquals(1, result.getInteger("entries_processed")); context.assertEquals(0, result.getInteger("dropped_requests_processed")); @@ -923,7 +923,7 @@ public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throw doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); - // Act & Assert - second request - should now be blacklisted + // Act & Assert - second request - should now be denylisted vertx.createHttpClient() .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", Endpoints.OPTOUT_DELTA_PRODUCE.toString()) @@ -942,7 +942,7 @@ public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throw .onComplete(context.asyncAssertSuccess(finalStatus2 -> { context.assertEquals("completed", finalStatus2.getString("state")); JsonObject result2 = finalStatus2.getJsonObject("result"); - // Now should be blacklisted + // Now should be denylisted context.assertEquals(0, result2.getInteger("entries_processed")); context.assertEquals(1, result2.getInteger("dropped_requests_processed")); async.complete(); From 29a16acbe2d952d8d1de9b401216af82e7af9b10 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 5 Dec 2025 14:46:50 -0700 Subject: [PATCH 12/20] remove if block for DEFAULT --- src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index b1fc864..2228e8b 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -473,8 +473,6 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { if (getManualOverride().equals("DELAYED_PROCESSING")) { LOGGER.info("Manual override set to DELAYED_PROCESSING, stopping production"); return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); - } else if (getManualOverride().equals("DEFAULT")) { - LOGGER.info("Manual override set to DEFAULT, skipping traffic calculation"); } else { // Get queue attributes (including invisible messages) for traffic calculation SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl); From a200825452de669a77ba150cf2c882b72988733e Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Fri, 5 Dec 2025 21:51:04 +0000 Subject: [PATCH 13/20] [CI Pipeline] Released Snapshot version: 4.5.5-alpha-119-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index f049a39..2c9c8a3 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.4-alpha-118-SNAPSHOT + 4.5.5-alpha-119-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From bcf170952af7dfe094f4b5b2818e02de02df810e Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Fri, 5 Dec 2025 23:47:58 +0000 Subject: [PATCH 14/20] [CI Pipeline] Released Snapshot version: 4.5.6-alpha-120-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2c9c8a3..1392438 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.5-alpha-119-SNAPSHOT + 4.5.6-alpha-120-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 5b7124ee674c365f0291dcc8ad400ae6a147412f Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 5 Dec 2025 16:51:49 -0700 Subject: [PATCH 15/20] commons logging missing? --- pom.xml | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 2c9c8a3..a75be3c 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.5-alpha-119-SNAPSHOT + 4.5.4-alpha-118-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout @@ -160,6 +160,11 @@ software.amazon.awssdk sqs + + commons-logging + commons-logging + 1.2 + From 1e631350e432264b209dc3dd8bea1cfecf9d9db9 Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Fri, 5 Dec 2025 23:56:13 +0000 Subject: [PATCH 16/20] [CI Pipeline] Released Snapshot version: 4.5.7-alpha-121-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index c8d70f5..b9ea09a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.6-alpha-120-SNAPSHOT + 4.5.7-alpha-121-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 5442674473e7ce469c18f53bbac058d934c04951 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 5 Dec 2025 17:40:41 -0700 Subject: [PATCH 17/20] whitespace --- .trivyignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.trivyignore b/.trivyignore index 11f08b2..82ae41f 100644 --- a/.trivyignore +++ b/.trivyignore @@ -10,3 +10,4 @@ CVE-2025-64720 exp:2026-06-05 # UID2-6340 CVE-2025-65018 exp:2026-06-05 + From d2364e0424837e204603ec6a4a713ba060f59e37 Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Fri, 5 Dec 2025 18:40:08 -0700 Subject: [PATCH 18/20] improve config error logging details --- .../com/uid2/optout/vertx/OptOutTrafficFilter.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java index e8bd04b..f9ba816 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java @@ -123,6 +123,12 @@ List parseFilterRules(JsonObject config) throws MalformedTraf range.add(end); } + // log error and throw exception if range is not 2 elements + if (range.size() != 2) { + LOGGER.error("Invalid traffic filter rule: range is not 2 elements: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: range is not 2 elements"); + } + // parse IPs var ipAddressesJson = ruleJson.getJsonArray("IPs"); List ipAddresses = new ArrayList<>(); @@ -132,8 +138,14 @@ List parseFilterRules(JsonObject config) throws MalformedTraf } } + // log error and throw exception if IPs is empty + if (ipAddresses.size() == 0) { + LOGGER.error("Invalid traffic filter rule: IPs is empty: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: IPs is empty"); + } + // log error and throw exception if rule is invalid - if (range.size() != 2 || ipAddresses.size() == 0 || range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less + if (range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less LOGGER.error("Invalid traffic filter rule, range must be 24 hours or less: {}", ruleJson.encode()); throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule, range must be 24 hours or less"); } From 03fe8a9dc3a7d2726ff0f152a231f243604d0a70 Mon Sep 17 00:00:00 2001 From: Release Workflow Date: Sat, 6 Dec 2025 01:43:40 +0000 Subject: [PATCH 19/20] [CI Pipeline] Released Snapshot version: 4.5.8-alpha-122-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index b9ea09a..21b5420 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.7-alpha-121-SNAPSHOT + 4.5.8-alpha-122-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout From 1bc84d5fef6926919e3b3e3df5c2217dca26d74f Mon Sep 17 00:00:00 2001 From: Ian-Nara Date: Sat, 6 Dec 2025 16:15:42 -0700 Subject: [PATCH 20/20] remove unneeded variable --- .../java/com/uid2/optout/vertx/OptOutSqsLogProducer.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index 2228e8b..e2afddf 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -92,7 +92,6 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final int visibilityTimeout; private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds) private final int jobTimeoutSeconds; - private final int maxMessagesPerFile; // Memory protection: max messages per delta file private final int listenPort; private final String internalApiKey; private final InternalAuthMiddleware internalAuth; @@ -129,10 +128,6 @@ public class OptOutSqsLogProducer extends AbstractVerticle { // Helper for reading complete 5-minute windows from SQS private final SqsWindowReader windowReader; - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { - this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduce); - } - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { this(jsonConfig, cloudStorage, null, cloudSync, eventDeltaProduced, null); } @@ -160,7 +155,6 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default - this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit // HTTP server configuration - use port offset + 1 to avoid conflicts this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1; @@ -181,7 +175,6 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, I this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout, this.deltaWindowSeconds, this.trafficCalculator.getThreshold() ); - LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile); } @Override @@ -414,7 +407,7 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { boolean stoppedDueToRecentMessages = false; long jobStartTime = OptOutUtils.nowEpochSeconds(); - LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile); + LOGGER.info("Starting delta production from SQS queue"); // Read and process windows until done while (true) {