diff --git a/pom.xml b/pom.xml index 7e3305b..21b5420 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ com.uid2 uid2-optout - 4.5.0 + 4.5.8-alpha-122-SNAPSHOT uid2-optout https://github.com/IABTechLab/uid2-optout @@ -160,6 +160,11 @@ software.amazon.awssdk sqs + + commons-logging + commons-logging + 1.2 + diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index dbccd32..f6bf5ca 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -1,6 +1,8 @@ package com.uid2.optout; import com.uid2.optout.vertx.*; +import com.uid2.optout.vertx.OptOutTrafficFilter.MalformedTrafficFilterConfigException; +import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException; import com.uid2.shared.ApplicationVersion; import com.uid2.shared.Utils; import com.uid2.shared.attest.AttestationResponseHandler; @@ -27,7 +29,6 @@ import io.vertx.config.ConfigRetriever; import io.vertx.core.*; import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.impl.HttpUtils; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.MetricsDomain; import org.slf4j.Logger; @@ -296,14 +297,23 @@ public void run(String[] args) throws IOException { fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig); } - // Deploy SQS log producer with its own storage instance - OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs); + // Create SQS-specific cloud storage instance for dropped requests (different bucket) + String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp); + ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, config); + + // Deploy SQS log producer with its own storage instance + OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null); futs.add(this.deploySingleInstance(sqsLogProducer)); LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}", this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder); } catch (IOException e) { LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e); + LOGGER.error("Failed to initialize SQS log producer, delta production will be disabled: " + e.getMessage(), e); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("The traffic filter config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("The traffic calc config is malformed, refusing to process messages, delta production will be disabled: " + e.getMessage(), e); } } diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java index 7f2eb8f..47af49a 100644 --- a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java +++ b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java @@ -1,23 +1,32 @@ package com.uid2.optout.vertx; +import io.vertx.core.json.JsonObject; + /** - * Result object containing statistics from delta production. + * Data class containing statistics from delta production. + * + * This class holds the counts and provides JSON encoding methods. + * API response status is determined by the caller based on these statistics. */ public class DeltaProductionResult { private final int deltasProduced; private final int entriesProcessed; - + private final int droppedRequestFilesProduced; + private final int droppedRequestsProcessed; + /* * indicates that there are still messages in the queue, however, * not enough time has elapsed to produce a delta file. * We produce in batches of (5 minutes) */ - private final boolean stoppedDueToMessagesTooRecent; + private final boolean stoppedDueToRecentMessages; - public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) { + public DeltaProductionResult(int deltasProduced, int entriesProcessed, int droppedRequestFilesProduced, int droppedRequestsProcessed, boolean stoppedDueToRecentMessages) { this.deltasProduced = deltasProduced; this.entriesProcessed = entriesProcessed; - this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; + this.droppedRequestFilesProduced = droppedRequestFilesProduced; + this.droppedRequestsProcessed = droppedRequestsProcessed; + this.stoppedDueToRecentMessages = stoppedDueToRecentMessages; } public int getDeltasProduced() { @@ -28,8 +37,40 @@ public int getEntriesProcessed() { return entriesProcessed; } - public boolean stoppedDueToMessagesTooRecent() { - return stoppedDueToMessagesTooRecent; + public boolean stoppedDueToRecentMessages() { + return stoppedDueToRecentMessages; + } + + public int getDroppedRequestFilesProduced() { + return droppedRequestFilesProduced; + } + + public int getDroppedRequestsProcessed() { + return droppedRequestsProcessed; + } + + /** + * Convert to JSON with just the production counts. + */ + public JsonObject toJson() { + return new JsonObject() + .put("deltas_produced", deltasProduced) + .put("entries_processed", entriesProcessed) + .put("dropped_request_files_produced", droppedRequestFilesProduced) + .put("dropped_requests_processed", droppedRequestsProcessed); + } + + /** + * Convert to JSON with status and counts. + */ + public JsonObject toJsonWithStatus(String status) { + return toJson().put("status", status); } -} + /** + * Convert to JSON with status, reason/error, and counts. + */ + public JsonObject toJsonWithStatus(String status, String reasonKey, String reasonValue) { + return toJsonWithStatus(status).put(reasonKey, reasonValue); + } +} diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java index 7507323..e01c49e 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java @@ -365,21 +365,23 @@ private void handleQueue(RoutingContext routingContext) { String email = body != null ? body.getString(EMAIL) : null; String phone = body != null ? body.getString(PHONE) : null; - HttpServerResponse resp = routingContext.response(); - // while old delta production is enabled, response is handled by replicate logic // Validate parameters - same as replicate if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { + LOGGER.warn("handleQueue: Invalid identity_hash parameter"); // this.sendBadRequestError(resp); return; } if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { + LOGGER.warn("handleQueue: Invalid advertising_id parameter"); + // this.sendBadRequestError(resp); return; } if (!this.isGetOrPost(req)) { + LOGGER.warn("handleQueue: Invalid HTTP method: {}", req.method()); // this.sendBadRequestError(resp); return; } @@ -408,8 +410,6 @@ private void handleQueue(RoutingContext routingContext) { } }, res -> { if (res.failed()) { - // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage()); - LOGGER.error("Failed to queue message: " + res.cause().getMessage()); } else { String messageId = (String) res.result(); @@ -426,8 +426,7 @@ private void handleQueue(RoutingContext routingContext) { } }); } catch (Exception ex) { - // this.sendInternalServerError(resp, ex.getMessage()); - LOGGER.error("Error processing queue request: " + ex.getMessage(), ex); + LOGGER.error("handleQueue: Error processing queue request"); } } diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index 910a2c6..e2afddf 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -2,6 +2,8 @@ import com.uid2.optout.Const; import com.uid2.optout.auth.InternalAuthMiddleware; +import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException; +import com.uid2.optout.vertx.OptOutTrafficFilter.MalformedTrafficFilterConfigException; import com.uid2.shared.Utils; import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.health.HealthComponent; @@ -14,6 +16,9 @@ import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; +import io.vertx.core.json.JsonArray; +import java.time.Instant; +import java.time.temporal.ChronoUnit; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; @@ -22,6 +27,7 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.ArrayList; @@ -80,15 +86,18 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final String eventDeltaProduced; private final int replicaId; private final ICloudStorage cloudStorage; + private final ICloudStorage cloudStorageDroppedRequests; private final OptOutCloudSync cloudSync; private final int maxMessagesPerPoll; private final int visibilityTimeout; private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds) private final int jobTimeoutSeconds; - private final int maxMessagesPerFile; // Memory protection: max messages per delta file private final int listenPort; private final String internalApiKey; private final InternalAuthMiddleware internalAuth; + private final OptOutTrafficFilter trafficFilter; + private final OptOutTrafficCalculator trafficCalculator; + private final String manualOverrideS3Path; private Counter counterDeltaProduced = Counter .builder("uid2_optout_sqs_delta_produced_total") @@ -100,6 +109,16 @@ public class OptOutSqsLogProducer extends AbstractVerticle { .description("counter for how many optout entries are processed from SQS") .register(Metrics.globalRegistry); + private Counter counterDroppedRequestFilesProduced = Counter + .builder("uid2_optout_sqs_dropped_request_files_produced_total") + .description("counter for how many optout dropped request files are produced from SQS") + .register(Metrics.globalRegistry); + + private Counter counterDroppedRequestsProcessed = Counter + .builder("uid2_optout_sqs_dropped_requests_processed_total") + .description("counter for how many optout dropped requests are processed from SQS") + .register(Metrics.globalRegistry); + private ByteBuffer buffer; private boolean shutdownInProgress = false; @@ -109,19 +128,16 @@ public class OptOutSqsLogProducer extends AbstractVerticle { // Helper for reading complete 5-minute windows from SQS private final SqsWindowReader windowReader; - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException { - this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduce); - } - - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException { - this(jsonConfig, cloudStorage, cloudSync, eventDeltaProduced, null); + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { + this(jsonConfig, cloudStorage, null, cloudSync, eventDeltaProduced, null); } // Constructor for testing - allows injecting mock SqsClient - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException { + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, ICloudStorage cloudStorageDroppedRequests, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { this.eventDeltaProduced = eventDeltaProduced; this.replicaId = OptOutUtils.getReplicaId(jsonConfig); this.cloudStorage = cloudStorage; + this.cloudStorageDroppedRequests = cloudStorageDroppedRequests; this.cloudSync = cloudSync; // Initialize SQS client @@ -139,7 +155,6 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default - this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit // HTTP server configuration - use port offset + 1 to avoid conflicts this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1; @@ -150,13 +165,16 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp); this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); + + this.trafficFilter = new OptOutTrafficFilter(jsonConfig.getString(Const.Config.TrafficFilterConfigPathProp)); + this.trafficCalculator = new OptOutTrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutSqsS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp)); + this.manualOverrideS3Path = jsonConfig.getString(Const.Config.ManualOverrideS3PathProp); - // Initialize window reader with memory protection limit + // Initialize window reader with traffic threshold this.windowReader = new SqsWindowReader( this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, - this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile + this.visibilityTimeout, this.deltaWindowSeconds, this.trafficCalculator.getThreshold() ); - LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile); } @Override @@ -231,10 +249,7 @@ private void handleDeltaProduceStatus(RoutingContext routingContext) { if (job == null) { resp.setStatusCode(200) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("state", "idle") - .put("message", "No job running on this pod") - .encode()); + .end(new JsonObject().put("status", "idle").put("message", "No job running on this pod").encode()); return; } @@ -259,7 +274,28 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { LOGGER.info("Delta production job requested via /deltaproduce endpoint"); - + try { + this.trafficFilter.reloadTrafficFilterConfig(); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("Error reloading traffic filter config: " + e.getMessage(), e); + resp.setStatusCode(500) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject().put("status", "failed").put("error", e.getMessage()).encode()); + return; + } + + try { + this.trafficCalculator.reloadTrafficCalcConfig(); + // Update window reader's message limit to match new threshold + this.windowReader.setMaxMessagesPerWindow(this.trafficCalculator.getThreshold()); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("Error reloading traffic calculator config: " + e.getMessage(), e); + resp.setStatusCode(500) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject().put("status", "failed").put("error", e.getMessage()).encode()); + return; + } + DeltaProduceJobStatus existingJob = currentJob.get(); // If there's an existing job, check if it's still running @@ -269,11 +305,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { LOGGER.warn("Delta production job already running on this pod"); resp.setStatusCode(409) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "A delta production job is already running on this pod") - .put("current_job", existingJob.toJson()) - .encode()); + .end(new JsonObject().put("status", "conflict").put("reason", "A delta production job is already running on this pod").encode()); return; } @@ -286,10 +318,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { if (!currentJob.compareAndSet(existingJob, newJob)) { resp.setStatusCode(409) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "Job state changed, please retry") - .encode()); + .end(new JsonObject().put("status", "conflict").put("reason", "Job state changed, please retry").encode()); return; } @@ -300,10 +329,7 @@ private void handleDeltaProduceStart(RoutingContext routingContext) { // Return immediately with 202 Accepted resp.setStatusCode(202) .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "accepted") - .put("message", "Delta production job started on this pod") - .encode()); + .end(new JsonObject().put("status", "accepted").put("message", "Delta production job started on this pod").encode()); } /** @@ -339,29 +365,21 @@ private JsonObject produceDeltasBlocking() throws Exception { if (this.shutdownInProgress) { throw new Exception("Producer is shutting down"); } - - JsonObject result = new JsonObject(); LOGGER.info("Starting delta production from SQS queue"); // Process messages until queue is empty or messages are too recent DeltaProductionResult deltaResult = this.produceBatchedDeltas(); // Determine status based on results - if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToMessagesTooRecent()) { - // No deltas produced because all messages were too recent - result.put("status", "skipped"); - result.put("reason", "All messages too recent"); - LOGGER.info("Delta production skipped: all messages too recent"); + if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToRecentMessages()) { + // No deltas produced - either messages too recent or traffic spike detected + LOGGER.info("Delta production skipped: {} entries processed", deltaResult.getEntriesProcessed()); + return deltaResult.toJsonWithStatus("skipped", "reason", "No deltas produced"); } else { - result.put("status", "success"); LOGGER.info("Delta production complete: {} deltas, {} entries", deltaResult.getDeltasProduced(), deltaResult.getEntriesProcessed()); + return deltaResult.toJsonWithStatus("success"); } - - result.put("deltas_produced", deltaResult.getDeltasProduced()); - result.put("entries_processed", deltaResult.getEntriesProcessed()); - - return result; } @@ -374,12 +392,22 @@ private JsonObject produceDeltasBlocking() throws Exception { * @throws IOException if delta production fails */ private DeltaProductionResult produceBatchedDeltas() throws IOException { + // Check for manual override at the start of the batch (and then between each delta window) + if (getManualOverride().equals("DELAYED_PROCESSING")) { + LOGGER.info("Manual override set to DELAYED_PROCESSING, stopping production"); + return new DeltaProductionResult(0, 0, 0, 0, false); + } + int deltasProduced = 0; int totalEntriesProcessed = 0; - boolean stoppedDueToMessagesTooRecent = false; + + int droppedRequestFilesProduced = 0; + int droppedRequestsProcessed = 0; + + boolean stoppedDueToRecentMessages = false; long jobStartTime = OptOutUtils.nowEpochSeconds(); - LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile); + LOGGER.info("Starting delta production from SQS queue"); // Read and process windows until done while (true) { @@ -392,41 +420,87 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException { // If no messages, we're done (queue empty or messages too recent) if (windowResult.isEmpty()) { - stoppedDueToMessagesTooRecent = windowResult.stoppedDueToMessagesTooRecent(); + stoppedDueToRecentMessages = windowResult.stoppedDueToRecentMessages(); LOGGER.info("Delta production complete - no more eligible messages"); break; } - // Produce delta for this window - long windowStart = windowResult.getWindowStart(); - List messages = windowResult.getMessages(); + // if message limit exceeded, treat as traffic spike + if (windowResult.exceededMessageLimit()) { + LOGGER.error("Message limit exceeded ({} messages) - triggering DELAYED_PROCESSING to prevent memory exhaustion", + windowResult.getMessages().size()); + this.setDelayedProcessingOverride(); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } - // Create delta file + // Create delta file buffer String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); + + // Create dropped request file buffer + JsonArray droppedRequestStream = new JsonArray(); + String currentDroppedRequestName = String.format("%s%03d_%s_%08x.json", "optout-dropped-", replicaId, Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'), OptOutUtils.rand.nextInt()); + + // Produce delta for this window + long windowStart = windowResult.getWindowStart(); + List messages = windowResult.getMessages(); + writeStartOfDelta(deltaStream, windowStart); // Write all messages - List sqsMessages = new ArrayList<>(); + List currentDeltaMessages = new ArrayList<>(); + List droppedRequestMessages = new ArrayList<>(); for (SqsParsedMessage msg : messages) { - writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); - sqsMessages.add(msg.getOriginalMessage()); + if (trafficFilter.isDenylisted(msg)) { + this.writeDroppedRequestEntry(droppedRequestStream, msg); + droppedRequestMessages.add(msg.getOriginalMessage()); + droppedRequestsProcessed++; + } else { + writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); + currentDeltaMessages.add(msg.getOriginalMessage()); + totalEntriesProcessed++; + } + } + + // check for manual override + if (getManualOverride().equals("DELAYED_PROCESSING")) { + LOGGER.info("Manual override set to DELAYED_PROCESSING, stopping production"); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } else { + // Get queue attributes (including invisible messages) for traffic calculation + SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl); + + // check traffic calculator status (including invisible messages in case of multiple consumers) + OptOutTrafficCalculator.TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(currentDeltaMessages, queueAttributes); + if (trafficStatus == OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING) { + LOGGER.error("OptOut Delta Production has hit DELAYED_PROCESSING status, stopping production"); + this.setDelayedProcessingOverride(); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, true); + } } - // Upload and delete - uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, sqsMessages); - deltasProduced++; - totalEntriesProcessed += messages.size(); + // Upload delta file if there are non-denylisted messages + if (!currentDeltaMessages.isEmpty()) { + uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, currentDeltaMessages); + deltasProduced++; + } + + // Upload dropped request file if there are denylisted messages + if (!droppedRequestMessages.isEmpty()) { + this.uploadDroppedRequestsAndDeleteMessages(droppedRequestStream, currentDroppedRequestName, windowStart, droppedRequestMessages); + droppedRequestFilesProduced++; + droppedRequestMessages.clear(); + } LOGGER.info("Produced delta for window [{}, {}] with {} messages", windowStart, windowStart + this.deltaWindowSeconds, messages.size()); } long totalDuration = OptOutUtils.nowEpochSeconds() - jobStartTime; - LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries", - totalDuration, deltasProduced, totalEntriesProcessed); + LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries, produced {} dropped request files, processed {} dropped requests", + totalDuration, deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed); - return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToMessagesTooRecent); + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, droppedRequestFilesProduced, droppedRequestsProcessed, stoppedDueToRecentMessages); } /** @@ -541,6 +615,82 @@ private void uploadDeltaAndDeleteMessages(ByteArrayOutputStream deltaStream, Str } } + /** + * Writes a dropped request entry to the dropped request stream. + */ + private void writeDroppedRequestEntry(JsonArray droppedRequestArray, SqsParsedMessage parsed) throws IOException { + String messageBody = parsed.getOriginalMessage().body(); + JsonObject messageJson = new JsonObject(messageBody); + droppedRequestArray.add(messageJson); + } + + // Upload a dropped request file to S3 and delete messages from SQS after successful upload + private void uploadDroppedRequestsAndDeleteMessages(JsonArray droppedRequestStream, String droppedRequestName, Long windowStart, List messages) throws IOException { + try { + // upload + byte[] droppedRequestData = droppedRequestStream.encode().getBytes(); + + LOGGER.info("SQS Dropped Requests Upload - fileName: {}, s3Path: {}, size: {} bytes, messages: {}, window: [{}, {})", + droppedRequestName, droppedRequestData.length, messages.size(), windowStart, windowStart + this.deltaWindowSeconds); + + boolean uploadSucceeded = false; + try (ByteArrayInputStream inputStream = new ByteArrayInputStream(droppedRequestData)) { + this.cloudStorageDroppedRequests.upload(inputStream, droppedRequestName); + LOGGER.info("Successfully uploaded dropped requests to S3: {}", droppedRequestName); + uploadSucceeded = true; + + // publish event + this.counterDroppedRequestFilesProduced.increment(); + this.counterDroppedRequestsProcessed.increment(messages.size()); + } catch (Exception uploadEx) { + LOGGER.error("Failed to upload dropped requests to S3: " + uploadEx.getMessage(), uploadEx); + throw new IOException("S3 upload failed", uploadEx); + } + + // CRITICAL: Only delete messages from SQS after successful S3 upload + if (uploadSucceeded && !messages.isEmpty()) { + LOGGER.info("Deleting {} messages from SQS after successful S3 upload", messages.size()); + SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, messages); + } + + // Clear the array + droppedRequestStream.clear(); + + } catch (Exception ex) { + LOGGER.error("Error uploading dropped requests: " + ex.getMessage(), ex); + throw new IOException("Dropped requests upload failed", ex); + } + } + + /** + * Upload a JSON config file to S3 containing the following: + * {"manual_override": "DELAYED_PROCESSING"} + * Manual override file is at the root of the S3 bucket + */ + private void setDelayedProcessingOverride() { + try { + JsonObject config = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + this.cloudStorage.upload(new ByteArrayInputStream(config.encode().getBytes()), this.manualOverrideS3Path); + } catch (Exception e) { + LOGGER.error("Error setting delayed processing override", e); + } + } + + /** + * Check if there is a manual override set in S3 for DEFAULT or DELAYED_PROCESSING status + * Manual override file is at the root of the S3 bucket + */ + private String getManualOverride() { + try { + InputStream inputStream = this.cloudStorage.download(this.manualOverrideS3Path); + JsonObject configJson = Utils.toJsonObject(inputStream); + return configJson.getString("manual_override", ""); + } catch (Exception e) { + LOGGER.error("Error checking for manual override in S3: " + e.getMessage(), e); + return ""; + } + } + private void publishDeltaProducedEvent(String newDelta) { vertx.eventBus().publish(this.eventDeltaProduced, newDelta); LOGGER.info("Published delta.produced event for: {}", newDelta); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java index f0c7a7c..650126a 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java @@ -207,10 +207,26 @@ List> parseAllowlistRanges(JsonObject config) throws MalformedTraffic * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window, * and the oldest queue timestamp to anchor the 5-minute queue window. * - * @param sqsMessages List of SQS messages + * @param sqsMessages List of SQS messages in the current batch * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT) */ public TrafficStatus calculateStatus(List sqsMessages) { + return calculateStatus(sqsMessages, null); + } + + /** + * Calculate traffic status based on delta files, SQS queue messages, and queue attributes. + * + * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window, + * and the oldest queue timestamp to anchor the 5-minute queue window. + * + * The invisible message count from queue attributes is added in case of multiple consumers. + * + * @param sqsMessages List of SQS messages in the current batch + * @param queueAttributes SQS queue attributes including invisible message count (may be null) + * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT) + */ + public TrafficStatus calculateStatus(List sqsMessages, SqsMessageOperations.QueueAttributes queueAttributes) { try { // Get list of delta files from S3 (sorted newest to oldest) @@ -274,11 +290,21 @@ public TrafficStatus calculateStatus(List sqsMessages) { sum += sqsCount; } + // Add invisible messages from queue attributes (messages being processed by other consumers) + // These represent in-flight work that will soon become delta records + int invisibleMessages = 0; + if (queueAttributes != null) { + invisibleMessages = queueAttributes.getApproximateNumberOfMessagesNotVisible(); + sum += invisibleMessages; + LOGGER.info("Traffic calculation: adding {} invisible SQS messages to sum (queue: {})", + invisibleMessages, queueAttributes); + } + // Determine status TrafficStatus status = determineStatus(sum, this.baselineTraffic); - LOGGER.info("Traffic calculation complete: sum={}, baselineTraffic={}, thresholdMultiplier={}, status={}", - sum, this.baselineTraffic, this.thresholdMultiplier, status); + LOGGER.info("Traffic calculation complete: sum={} (including {} invisible), baselineTraffic={}, thresholdMultiplier={}, status={}", + sum, invisibleMessages, this.baselineTraffic, this.thresholdMultiplier, status); return status; @@ -556,6 +582,14 @@ TrafficStatus determineStatus(int sumCurrent, int baselineTraffic) { return TrafficStatus.DEFAULT; } + /** + * Get the traffic threshold (baseline × multiplier). + * Used for early termination + */ + public int getThreshold() { + return this.baselineTraffic * this.thresholdMultiplier; + } + /** * Get cache statistics for monitoring */ diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java index e8bd04b..f9ba816 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java @@ -123,6 +123,12 @@ List parseFilterRules(JsonObject config) throws MalformedTraf range.add(end); } + // log error and throw exception if range is not 2 elements + if (range.size() != 2) { + LOGGER.error("Invalid traffic filter rule: range is not 2 elements: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: range is not 2 elements"); + } + // parse IPs var ipAddressesJson = ruleJson.getJsonArray("IPs"); List ipAddresses = new ArrayList<>(); @@ -132,8 +138,14 @@ List parseFilterRules(JsonObject config) throws MalformedTraf } } + // log error and throw exception if IPs is empty + if (ipAddresses.size() == 0) { + LOGGER.error("Invalid traffic filter rule: IPs is empty: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: IPs is empty"); + } + // log error and throw exception if rule is invalid - if (range.size() != 2 || ipAddresses.size() == 0 || range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less + if (range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less LOGGER.error("Invalid traffic filter rule, range must be 24 hours or less: {}", ruleJson.encode()); throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule, range must be 24 hours or less"); } diff --git a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java b/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java index 6c2715b..104215c 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java +++ b/src/main/java/com/uid2/optout/vertx/SqsMessageOperations.java @@ -7,6 +7,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; /** * Utility class for SQS message operations. @@ -15,6 +16,96 @@ public class SqsMessageOperations { private static final Logger LOGGER = LoggerFactory.getLogger(SqsMessageOperations.class); private static final int SQS_MAX_DELETE_BATCH_SIZE = 10; + /** + * Result of getting queue attributes from SQS. + */ + public static class QueueAttributes { + private final int approximateNumberOfMessages; + private final int approximateNumberOfMessagesNotVisible; + private final int approximateNumberOfMessagesDelayed; + + public QueueAttributes(int approximateNumberOfMessages, + int approximateNumberOfMessagesNotVisible, + int approximateNumberOfMessagesDelayed) { + this.approximateNumberOfMessages = approximateNumberOfMessages; + this.approximateNumberOfMessagesNotVisible = approximateNumberOfMessagesNotVisible; + this.approximateNumberOfMessagesDelayed = approximateNumberOfMessagesDelayed; + } + + /** Number of messages available for retrieval from the queue (visible messages) */ + public int getApproximateNumberOfMessages() { + return approximateNumberOfMessages; + } + + /** Number of messages that are in flight (being processed by consumers, invisible) */ + public int getApproximateNumberOfMessagesNotVisible() { + return approximateNumberOfMessagesNotVisible; + } + + /** Number of messages in the queue that are delayed and not available yet */ + public int getApproximateNumberOfMessagesDelayed() { + return approximateNumberOfMessagesDelayed; + } + + /** Total messages in queue = visible + invisible + delayed */ + public int getTotalMessages() { + return approximateNumberOfMessages + approximateNumberOfMessagesNotVisible + approximateNumberOfMessagesDelayed; + } + + @Override + public String toString() { + return String.format("QueueAttributes{visible=%d, invisible=%d, delayed=%d, total=%d}", + approximateNumberOfMessages, approximateNumberOfMessagesNotVisible, + approximateNumberOfMessagesDelayed, getTotalMessages()); + } + } + + /** + * Gets queue attributes from SQS including message counts. + * + * @param sqsClient The SQS client + * @param queueUrl The queue URL + * @return QueueAttributes with message counts, or null if failed + */ + public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String queueUrl) { + try { + GetQueueAttributesRequest request = GetQueueAttributesRequest.builder() + .queueUrl(queueUrl) + .attributeNames( + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, + QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED + ) + .build(); + + GetQueueAttributesResponse response = sqsClient.getQueueAttributes(request); + Map attrs = response.attributes(); + + int visible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES), 0); + int invisible = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE), 0); + int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0); + + QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed); + LOGGER.debug("Queue attributes: {}", queueAttributes); + return queueAttributes; + + } catch (Exception e) { + LOGGER.error("Error getting queue attributes from SQS", e); + return null; + } + } + + private static int parseIntOrDefault(String value, int defaultValue) { + if (value == null) { + return defaultValue; + } + try { + return Integer.parseInt(value); + } catch (NumberFormatException e) { + return defaultValue; + } + } + /** * Receives all available messages from an SQS queue up to a maximum number of batches. * diff --git a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java index 75368c6..2149baa 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java +++ b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java @@ -21,20 +21,28 @@ public class SqsWindowReader { private final int maxMessagesPerPoll; private final int visibilityTimeout; private final int deltaWindowSeconds; - private final int maxMessagesPerFile; private final SqsBatchProcessor batchProcessor; - + private int maxMessagesPerWindow; + public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, - int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerFile) { + int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) { this.sqsClient = sqsClient; this.queueUrl = queueUrl; this.maxMessagesPerPoll = maxMessagesPerPoll; this.visibilityTimeout = visibilityTimeout; this.deltaWindowSeconds = deltaWindowSeconds; - this.maxMessagesPerFile = maxMessagesPerFile; + this.maxMessagesPerWindow = maxMessagesPerWindow; this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); - LOGGER.info("SqsWindowReader initialized with: maxMessagesPerFile: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}", - maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); + LOGGER.info("SqsWindowReader initialized with: maxMessagesPerWindow: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}", + maxMessagesPerWindow, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); + } + + /** + * Update the max messages limit (e.g., after config reload). + */ + public void setMaxMessagesPerWindow(int maxMessagesPerWindow) { + this.maxMessagesPerWindow = maxMessagesPerWindow; + LOGGER.info("Updated maxMessagesPerWindow to {}", maxMessagesPerWindow); } /** @@ -43,19 +51,22 @@ public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerP public static class WindowReadResult { private final List messages; private final long windowStart; - private final boolean stoppedDueToMessagesTooRecent; + private final boolean stoppedDueToRecentMessages; + private final boolean exceededMessageLimit; public WindowReadResult(List messages, long windowStart, - boolean stoppedDueToMessagesTooRecent) { + boolean stoppedDueToRecentMessages, boolean exceededMessageLimit) { this.messages = messages; this.windowStart = windowStart; - this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; + this.stoppedDueToRecentMessages = stoppedDueToRecentMessages; + this.exceededMessageLimit = exceededMessageLimit; } public List getMessages() { return messages; } public long getWindowStart() { return windowStart; } public boolean isEmpty() { return messages.isEmpty(); } - public boolean stoppedDueToMessagesTooRecent() { return stoppedDueToMessagesTooRecent; } + public boolean stoppedDueToRecentMessages() { return stoppedDueToRecentMessages; } + public boolean exceededMessageLimit() { return exceededMessageLimit; } } /** @@ -64,7 +75,7 @@ public WindowReadResult(List messages, long windowStart, * - We discover the next window * - Queue is empty (no more messages) * - Messages are too recent (all messages younger than 5 minutes) - * - Message limit is reached (memory protection) + * - Message count exceeds maxMessagesPerWindow * * @return WindowReadResult with messages for the window, or empty if done */ @@ -73,11 +84,10 @@ public WindowReadResult readWindow() { long currentWindowStart = 0; while (true) { - // Check if we've hit the message limit - if (windowMessages.size() >= this.maxMessagesPerFile) { - LOGGER.warn("Window message limit reached ({} messages). Truncating window starting at {} for memory protection.", - this.maxMessagesPerFile, currentWindowStart); - return new WindowReadResult(windowMessages, currentWindowStart, false); + if (windowMessages.size() >= maxMessagesPerWindow) { + LOGGER.warn("Message limit exceeded: {} messages >= limit {}. Stopping to prevent memory exhaustion.", + windowMessages.size(), maxMessagesPerWindow); + return new WindowReadResult(windowMessages, currentWindowStart, false, true); } // Read one batch from SQS (up to 10 messages) @@ -86,7 +96,7 @@ public WindowReadResult readWindow() { if (rawBatch.isEmpty()) { // Queue empty - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, false); + return new WindowReadResult(windowMessages, currentWindowStart, false, false); } // Process batch: parse, validate, filter @@ -95,7 +105,7 @@ public WindowReadResult readWindow() { if (batchResult.isEmpty()) { if (batchResult.shouldStopProcessing()) { // Messages too recent - return what we have - return new WindowReadResult(windowMessages, currentWindowStart, true); + return new WindowReadResult(windowMessages, currentWindowStart, true, false); } // corrupt messages deleted, read next messages continue; @@ -117,13 +127,19 @@ public WindowReadResult readWindow() { } windowMessages.add(msg); + + // Check limit after each message addition + if (windowMessages.size() >= maxMessagesPerWindow) { + LOGGER.warn("Message limit exceeded during batch: {} messages >= limit {}", + windowMessages.size(), maxMessagesPerWindow); + return new WindowReadResult(windowMessages, currentWindowStart, false, true); + } } if (newWindow) { // close current window and return - return new WindowReadResult(windowMessages, currentWindowStart, false); + return new WindowReadResult(windowMessages, currentWindowStart, false, false); } } } } - diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index a46023f..7b5900f 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -4,6 +4,8 @@ import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.optout.OptOutCloudSync; import com.uid2.shared.vertx.VertxUtils; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.shared.optout.OptOutCollection; import io.vertx.core.Vertx; import io.vertx.core.json.JsonObject; import io.vertx.ext.unit.Async; @@ -15,11 +17,16 @@ import software.amazon.awssdk.services.sqs.model.*; import java.io.InputStream; +import java.io.ByteArrayInputStream; import java.util.*; +import java.nio.file.Files; +import java.nio.file.Path; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doAnswer; +import org.mockito.ArgumentCaptor; /** * Integration tests for OptOutSqsLogProducer deltaproduce endpoint. @@ -33,12 +40,18 @@ public class OptOutSqsLogProducerTest { private SqsClient sqsClient; private ICloudStorage cloudStorage; + private ICloudStorage cloudStorageDroppedRequests; private OptOutCloudSync cloudSync; private static final String TEST_QUEUE_URL = "https://sqs.test.amazonaws.com/123456789/test"; private static final String TEST_API_KEY = "test-api-key"; private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; + private static final String TRAFFIC_FILTER_CONFIG_PATH = "./traffic-filter.json"; + private static final String TRAFFIC_CALC_CONFIG_PATH = "./traffic-calc.json"; + private static final String MANUAL_OVERRIDE_S3_PATH = "manual-override.json"; + private static final String S3_DELTA_PREFIX = "sqs-delta"; + private static final String TEST_BUCKET_DROPPED_REQUESTS = "test-bucket-dropped-requests"; @Before public void setup(TestContext context) throws Exception { @@ -47,6 +60,7 @@ public void setup(TestContext context) throws Exception { // Create mocks sqsClient = mock(SqsClient.class); cloudStorage = mock(ICloudStorage.class); + cloudStorageDroppedRequests = mock(ICloudStorage.class); cloudSync = mock(OptOutCloudSync.class); JsonObject config = VertxUtils.getJsonConfig(vertx); @@ -54,7 +68,13 @@ public void setup(TestContext context) throws Exception { .put(Const.Config.OptOutSqsVisibilityTimeoutProp, 240) .put(Const.Config.OptOutProducerBufferSizeProp, 65536) .put(Const.Config.OptOutProducerReplicaIdProp, 1) - .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY); + .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY) + .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY) + .put(Const.Config.TrafficFilterConfigPathProp, TRAFFIC_FILTER_CONFIG_PATH) + .put(Const.Config.TrafficCalcConfigPathProp, TRAFFIC_CALC_CONFIG_PATH) + .put(Const.Config.ManualOverrideS3PathProp, MANUAL_OVERRIDE_S3_PATH) + .put(Const.Config.OptOutS3BucketDroppedRequestsProp, TEST_BUCKET_DROPPED_REQUESTS) + .put(Const.Config.OptOutSqsS3FolderProp, S3_DELTA_PREFIX); // Mock cloud sync to return proper S3 paths when(cloudSync.toCloudPath(anyString())) @@ -62,9 +82,46 @@ public void setup(TestContext context) throws Exception { // Mock S3 upload to succeed by default doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + // Mock getQueueAttributes by default (returns zero messages) + Map defaultQueueAttrs = new HashMap<>(); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder() + .attributes(defaultQueueAttrs) + .build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); + + // Don't mock download with anyString() - let tests mock specific paths as needed + // Unmocked downloads will return null by default + + + try { + String traficFilterConfig = """ + { + "denylist_requests": [ + ] + } + """; + createTrafficConfigFile(traficFilterConfig); + + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } // Create producer with mock SqsClient - producer = new OptOutSqsLogProducer(config, cloudStorage, cloudSync, Const.Event.DeltaProduce, sqsClient); + producer = new OptOutSqsLogProducer(config, cloudStorage, cloudStorageDroppedRequests, cloudSync, Const.Event.DeltaProduce, sqsClient); // Deploy verticle Async async = context.async(); @@ -76,12 +133,53 @@ public void tearDown(TestContext context) { if (vertx != null) { vertx.close(context.asyncAssertSuccess()); } + if (Files.exists(Path.of(TRAFFIC_FILTER_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_FILTER_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (Files.exists(Path.of(TRAFFIC_CALC_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_CALC_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } } - + + private void createTrafficConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_FILTER_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void createTrafficCalcConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_CALC_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + private Message createMessage(String hash, String id, long timestampMs) { + return createMessage(hash, id, timestampMs, null, null, null, null); + } + + private Message createMessage(String hash, String id, long timestampMs, String email, String phone, String clientIp, String traceId) { JsonObject body = new JsonObject() .put("identity_hash", hash) .put("advertising_id", id); + + if (email != null) body.put("email", email); + if (phone != null) body.put("phone", phone); + if (clientIp != null) body.put("client_ip", clientIp); + if (traceId != null) body.put("trace_id", traceId); Map attrs = new HashMap<>(); attrs.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampMs)); @@ -287,7 +385,7 @@ public void testDeltaProduceEndpoint_allMessagesTooRecent(TestContext context) { JsonObject result = finalStatus.getJsonObject("result"); context.assertNotNull(result); context.assertEquals("skipped", result.getString("status")); - context.assertEquals("All messages too recent", result.getString("reason")); + context.assertEquals("No deltas produced", result.getString("reason")); // No processing should occur try { @@ -331,9 +429,15 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) ); - // Mock SQS to return messages + // Use CountDownLatch to control when the mock returns - ensures job stays running + java.util.concurrent.CountDownLatch processingLatch = new java.util.concurrent.CountDownLatch(1); + + // Mock SQS to wait on latch before returning - keeps job in RUNNING state when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenAnswer(inv -> { + processingLatch.await(); // Wait until test releases + return ReceiveMessageResponse.builder().messages(messages).build(); + }) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) @@ -373,8 +477,10 @@ public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context .onComplete(context.asyncAssertSuccess(body -> { JsonObject response = new JsonObject(body.toString()); context.assertEquals("conflict", response.getString("status")); - context.assertTrue(response.getString("message").contains("already running")); + context.assertTrue(response.getString("reason").contains("already running")); + // Release the latch so first job can complete + processingLatch.countDown(); async.complete(); })); } @@ -460,5 +566,1080 @@ public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) async.complete(); })); } + + @Test + public void testTrafficFilter_denylistedMessagesAreDropped(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - update traffic filter config to denyhlist specific IP and time range + long baseTime = System.currentTimeMillis() / 1000 - 400; // 400 seconds ago + String filterConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages: some denylisted, some not + long denylistedTime = (baseTime) * 1000; // Within denyhlist range + long normalTime = (baseTime - 200) * 1000; // Outside denyhlist range + List messages = Arrays.asList( + // These should be dropped (denylisted) + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime + 1000, null, null, "192.168.1.100", null), + // These should be processed normally + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process 2 normal entries + context.assertEquals(2, result.getInteger("entries_processed")); + + // Should have 2 dropped requests + context.assertEquals(2, result.getInteger("dropped_requests_processed")); + + // Verify both delta and dropped request files were uploaded + try { + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + verify(cloudStorageDroppedRequests, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_noBlacklistedMessages(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a denylisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages that don't match denyhlist + long normalTime = (baseTime - 200) * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, normalTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + context.assertEquals(2, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + // Should not upload dropped request file + try { + verify(cloudStorageDroppedRequests, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_allMessagesBlacklisted(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a denylisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Setup - create messages that are denylisted + long denylistedTime = baseTime * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime, null, null, "192.168.1.100", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, denylistedTime + 1000, null, null, "192.168.1.100", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // No entries processed (all denylisted) + context.assertEquals(0, result.getInteger("entries_processed")); + + // All messages dropped + context.assertEquals(2, result.getInteger("dropped_requests_processed")); + + // Should upload dropped request file but not delta file + try { + verify(cloudStorageDroppedRequests, atLeastOnce()).upload(any(InputStream.class), anyString()); + verify(cloudStorage, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testTrafficFilter_messagesWithoutClientIp(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic filter with a denylisted IP + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(filterConfig); + + // Create messages without client IP (should not be denylisted) + long time = baseTime * 1000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, time, null, null, null, null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Message should be processed (not denylisted due to missing IP) + context.assertEquals(1, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + async.complete(); + })); + } + + @Test + public void testTrafficFilterConfig_reloadOnEachBatch(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - initial config with no denyhlist + String initialConfig = """ + { + "denylist_requests": [] + } + """; + createTrafficConfigFile(initialConfig); + + // Setup - create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "192.168.1.100", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Act & Assert - first request - should process normally + int port = Const.Port.ServicePortForOptOut + 1; + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals(1, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("dropped_requests_processed")); + + // Update config to denyhlist the IP + try { + long baseTime = System.currentTimeMillis() / 1000 - 400; + String updatedConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["192.168.1.100"] + } + ] + } + """, baseTime - 100, baseTime + 100); + createTrafficConfigFile(updatedConfig); + + // Reset mocks for second request + reset(sqsClient, cloudStorage, cloudStorageDroppedRequests); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + // Act & Assert - second request - should now be denylisted + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body2 -> { + JsonObject response2 = new JsonObject(body2.toString()); + context.assertEquals("accepted", response2.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus2 -> { + context.assertEquals("completed", finalStatus2.getString("state")); + JsonObject result2 = finalStatus2.getJsonObject("result"); + // Now should be denylisted + context.assertEquals(0, result2.getInteger("entries_processed")); + context.assertEquals(1, result2.getInteger("dropped_requests_processed")); + async.complete(); + })); + } catch (Exception e) { + context.fail(e); + } + })); + } + + @Test + public void testTrafficCalculator_defaultStatus(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - traffic calc config with required fields + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + + // Setup - no manual override + when(cloudStorage.download(anyString())) + .thenReturn(null); + + // Setup - create messages that will result in DEFAULT status + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process messages normally (DEFAULT status) + context.assertEquals(2, result.getInteger("entries_processed")); + context.assertTrue(result.getInteger("deltas_produced") >= 1); + + // Verify upload happened + try { + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testManualOverride_delayedProcessing(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - mock manual override set to DELAYED_PROCESSING + JsonObject manualOverride = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + doReturn(new java.io.ByteArrayInputStream(manualOverride.encode().getBytes())) + .when(cloudStorage).download(MANUAL_OVERRIDE_S3_PATH); // At root of bucket + + // Setup - create messages (won't be processed due to override) + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + List allMessages = new ArrayList<>(messages); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(allMessages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should not process anything - manual override checked at start + context.assertEquals(0, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // No SQS deletions should occur (messages not processed) + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testManualOverride_default_bypassesTrafficCalculation(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - setup time: current time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create delta files with timestamps distributed over 48 hours + List timestamps = new ArrayList<>(); + + // Past window: t-47h to t-25h (add 10 entries) + for (int i = 0; i < 10; i++) { + timestamps.add(t - 47*3600 + i * 1000); + } + + // Current window: t-23h to t-1h (add 100 entries - 10x past) + for (int i = 0; i < 100; i++) { + timestamps.add(t - 23*3600 + i * 1000); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + // Setup - mock manual override set to DEFAULT + JsonObject manualOverride = new JsonObject().put("manual_override", "DEFAULT"); + + // Mock S3 operations for this test + // Use doAnswer to create fresh streams on each call (streams are consumed on read) + doReturn(Arrays.asList("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .when(cloudStorage).list("sqs-delta"); + doAnswer(inv -> new ByteArrayInputStream(deltaFileBytes)) + .when(cloudStorage).download("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"); + doAnswer(inv -> new java.io.ByteArrayInputStream(manualOverride.encode().getBytes())) + .when(cloudStorage).download("manual-override.json"); + + // Setup - SQS messages, 10 messages in same window + long oldTime = (t - 600) * 1000; + List messages = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime - (i * 1000), null, null, "10.0.0." + i, null)); + } + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process 10 messages and produce 1 delta (all in same window) + context.assertEquals(10, result.getInteger("entries_processed")); + context.assertEquals(1, result.getInteger("deltas_produced")); + + verify(sqsClient, atLeastOnce()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testTrafficCalculator_detectsSpikeInCurrentWindow(TestContext context) throws Exception { + Async async = context.async(); + + // Threshold = baseline * multiplier = 100 * 5 = 500 + // We have 610 messages, which exceeds 500, so spike should be detected + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + + // Setup time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create historical delta files showing low baseline traffic (2 records from 24-48h ago) + List timestamps = new ArrayList<>(); + timestamps.add(t - 36*3600); // 36 hours ago + timestamps.add(t - 36*3600 + 1000); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + // Reset cloudStorage mock to ensure clean state + reset(cloudStorage); + + // Re-mock S3 upload (needed after reset) + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + // Mock S3 operations for historical data + // Use doAnswer to create fresh streams on each call + doReturn(Arrays.asList("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat")) + .when(cloudStorage).list("sqs-delta"); + doAnswer(inv -> new ByteArrayInputStream(deltaFileBytes)) + .when(cloudStorage).download("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat"); + + // No manual override set (returns null) + doReturn(null).when(cloudStorage).download("manual-override.json"); + + // Setup SQS messages + long baseTime = (t - 600) * 1000; + + List allMessages = new ArrayList<>(); + // Create 610 messages with timestamps spread over ~4 minutes (within the 5-minute window) + for (int i = 0; i < 610; i++) { + // Timestamps range from (t-600) to (t-600-240) seconds = t-600 to t-840 + // All within a single 5-minute window for traffic calculation, and all > 5 min old + long timestampMs = baseTime - (i * 400); // ~400ms apart going backwards, total span ~244 seconds + allMessages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + timestampMs, null, null, "10.0.0." + (i % 256), null)); + } + + // Mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(allMessages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + // Mock getQueueAttributes to return zero invisible messages (doesn't affect the spike detection) + Map queueAttrs = new HashMap<>(); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + queueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder() + .attributes(queueAttrs) + .build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("skipped", result.getString("status")); + + // Expected behavior: + // All 610 messages are within a single 5-minute window + // Traffic calculator counts them all and detects spike (>=500 threshold) + // DELAYED_PROCESSING is triggered, no delta uploaded + // The entries_processed count reflects how many were read before spike detection + context.assertTrue(result.getInteger("entries_processed") <= 610); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // Verify manual override was set to DELAYED_PROCESSING on S3 + try { + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + ArgumentCaptor streamCaptor = ArgumentCaptor.forClass(InputStream.class); + verify(cloudStorage, atLeastOnce()).upload(streamCaptor.capture(), pathCaptor.capture()); + + // Check if manual-override.json was uploaded + boolean overrideSet = false; + for (int i = 0; i < pathCaptor.getAllValues().size(); i++) { + if (pathCaptor.getAllValues().get(i).equals("manual-override.json")) { + overrideSet = true; + break; + } + } + context.assertTrue(overrideSet, "Manual override should be set to DELAYED_PROCESSING after detecting spike"); + } catch (Exception e) { + context.fail(e); + } + + async.complete(); + })); + } + + @Test + public void testCircuitBreaker_stopsProcessingWhenMessageLimitExceeded(TestContext context) throws Exception { + Async async = context.async(); + + // Use low threshold (100) so circuit breaker triggers before traffic calculator + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 20, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + + // Reset cloudStorage mock to ensure clean state + reset(cloudStorage); + + // Re-mock S3 upload (needed after reset) + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + // No manual override set (returns null) + doReturn(null).when(cloudStorage).download("manual-override.json"); + + // Setup time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 200 messages - exceeds threshold (20 * 5 = 100) + long oldTime = (t - 600) * 1000; // 10 minutes ago + List messages = new ArrayList<>(); + for (int i = 0; i < 200; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + oldTime - (i * 1000), null, null, "10.0.0." + (i % 256), null)); + } + + // Mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Act & Assert + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("skipped", result.getString("status")); + + // Expected behavior: + // SqsWindowReader hits maxMessagesPerWindow limit (100) during reading + // Circuit breaker triggers DELAYED_PROCESSING immediately + // Processing stops before any messages are counted as processed + context.assertEquals(0, result.getInteger("entries_processed")); + context.assertEquals(0, result.getInteger("deltas_produced")); + + // Verify manual override was set to DELAYED_PROCESSING on S3 + try { + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), pathCaptor.capture()); + + // Check if manual-override.json was uploaded + boolean overrideSet = pathCaptor.getAllValues().stream() + .anyMatch(path -> path.equals("manual-override.json")); + context.assertTrue(overrideSet, "Circuit breaker should set DELAYED_PROCESSING override"); + } catch (Exception e) { + context.fail(e); + } + + // Verify NO messages were deleted from SQS (processing stopped before completion) + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testManualOverride_notSet(TestContext context) throws Exception { + Async async = context.async(); + + // Setup - mock no manual override file + when(cloudStorage.download(anyString())) + .thenReturn(null); + + // Setup - create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null) + ); + + // Setup - mock SQS operations + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + // Act & Assert - call endpoint via HTTP + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertEquals("success", result.getString("status")); + + // Should process normally with traffic calc (no override) + context.assertEquals(1, result.getInteger("entries_processed")); + + async.complete(); + })); + } + + @Test + public void testS3UploadFailure_messagesNotDeletedFromSqs(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages to process + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, "10.0.0.1", null), + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000, null, null, "10.0.0.2", null) + ); + + // Mock SQS to return messages + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + // Mock S3 upload to FAIL + doThrow(new RuntimeException("S3 upload failed - simulated error")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + // Job should fail due to S3 error + context.assertEquals("failed", finalStatus.getString("state")); + context.assertTrue(finalStatus.getString("error").contains("S3") || + finalStatus.getString("error").contains("upload")); + + // CRITICAL: Messages should NOT be deleted from SQS when upload fails + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } + + @Test + public void testStatusEndpoint_showsRunningJob(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) + ); + + // Use CountDownLatch to keep job running + java.util.concurrent.CountDownLatch processingLatch = new java.util.concurrent.CountDownLatch(1); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenAnswer(inv -> { + processingLatch.await(); + return ReceiveMessageResponse.builder().messages(messages).build(); + }) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + // Immediately check status - should show "running" + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(200, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + JsonObject status = new JsonObject(body.toString()); + context.assertEquals("running", status.getString("state")); + context.assertNotNull(status.getString("start_time")); + + // Release latch so job can complete + processingLatch.countDown(); + async.complete(); + })); + } + + @Test + public void testStatusEndpoint_showsFailedJob(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) + ); + + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + // Make S3 upload fail + doThrow(new RuntimeException("Simulated S3 failure")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start job and wait for it to fail + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> pollForCompletion(context, port, 100, 50)) + .compose(finalStatus -> { + context.assertEquals("failed", finalStatus.getString("state")); + + // Now call status endpoint directly to verify failed state is persisted + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(200, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + JsonObject status = new JsonObject(body.toString()); + context.assertEquals("failed", status.getString("state")); + context.assertNotNull(status.getString("error")); + context.assertNotNull(status.getString("start_time")); + context.assertNotNull(status.getString("end_time")); + context.assertNotNull(status.getInteger("duration_seconds")); + + async.complete(); + })); + } + + @Test + public void testDeltaProduceEndpoint_invalidApiKey(TestContext context) { + Async async = context.async(); + + int port = Const.Port.ServicePortForOptOut + 1; + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer wrong-api-key") + .send()) + .compose(resp -> { + context.assertEquals(401, resp.statusCode()); + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + // Should not call SQS when unauthorized + verify(sqsClient, never()).receiveMessage(any(ReceiveMessageRequest.class)); + async.complete(); + })); + } + + /** + * Create delta file bytes with specified timestamps + */ + private byte[] createDeltaFileBytes(List timestamps) throws Exception { + // Create OptOutEntry objects using newTestEntry + List entries = new ArrayList<>(); + + long idCounter = 1000; // Use incrementing IDs for test entries + for (long timestamp : timestamps) { + entries.add(OptOutEntry.newTestEntry(idCounter++, timestamp)); + } + + // Create OptOutCollection + OptOutCollection collection = new OptOutCollection(entries.toArray(new OptOutEntry[0])); + return collection.getStore(); + } } diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java index f977233..99dc950 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java @@ -1507,4 +1507,201 @@ void testCalculateStatus_timestampsCached() throws Exception { assertEquals(2, stats.get("total_cached_timestamps")); } + // ============================================================================ + // SECTION 10: Tests for calculateStatus() with QueueAttributes + // ============================================================================ + + /** + * Create a QueueAttributes object for testing + */ + private SqsMessageOperations.QueueAttributes createQueueAttributes(int visible, int invisible, int delayed) { + return new SqsMessageOperations.QueueAttributes(visible, invisible, delayed); + } + + @Test + void testCalculateStatus_withQueueAttributes_nullAttributes() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - null queue attributes should behave same as single-parameter version + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null); + + // Assert - DEFAULT (same as without queue attributes) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_invisibleMessagesAddedToSum() throws Exception { + // Setup - create delta files with entries just under threshold + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 490 entries (just under threshold of 500 = 5 * 100) + List timestamps = new ArrayList<>(); + for (int i = 0; i < 490; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - without invisible messages, should be DEFAULT + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus statusWithoutInvisible = calculator.calculateStatus(sqsMessages, null); + + // With invisible messages that push over threshold, should be DELAYED_PROCESSING + // 490 (delta) + 1 (sqs) + 10 (invisible) = 501 >= 500 + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(5, 10, 0); + OptOutTrafficCalculator.TrafficStatus statusWithInvisible = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, statusWithoutInvisible); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, statusWithInvisible); + } + + @Test + void testCalculateStatus_withQueueAttributes_zeroInvisibleMessages() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600, t - 7200); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - queue attributes with zero invisible messages + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(100, 0, 50); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - same result as without queue attributes (only invisible is added to sum) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_largeInvisibleCount() throws Exception { + // Setup - create delta files with minimal entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - large number of invisible messages alone should trigger DELAYED_PROCESSING + // threshold = 5 * 100 = 500 + // 1 (delta) + 1 (sqs) + 500 (invisible) = 502 >= 500 + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(0, 500, 0); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - DELAYED_PROCESSING due to invisible messages + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testCalculateStatus_withQueueAttributes_delayedMessagesNotAdded() throws Exception { + // Setup - create delta files with entries near threshold + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create 495 entries + List timestamps = new ArrayList<>(); + for (int i = 0; i < 495; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - delayed messages should NOT be added to sum (only invisible) + // 495 (delta) + 1 (sqs) + 0 (invisible) = 496 < 500 + // If delayed was added: 496 + 1000 = 1496 >= 500 (would fail) + List sqsMessages = Arrays.asList(createSqsMessage(t)); + SqsMessageOperations.QueueAttributes queueAttributes = createQueueAttributes(100, 0, 1000); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes); + + // Assert - DEFAULT (delayed messages not counted) + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + // ============================================================================ + // SECTION 11: Tests for QueueAttributes class + // ============================================================================ + + @Test + void testQueueAttributes_getters() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(10, 20, 5); + + // Assert + assertEquals(10, attrs.getApproximateNumberOfMessages()); + assertEquals(20, attrs.getApproximateNumberOfMessagesNotVisible()); + assertEquals(5, attrs.getApproximateNumberOfMessagesDelayed()); + assertEquals(35, attrs.getTotalMessages()); + } + + @Test + void testQueueAttributes_toString() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(100, 50, 25); + + // Act + String str = attrs.toString(); + + // Assert - should contain all values + assertTrue(str.contains("visible=100")); + assertTrue(str.contains("invisible=50")); + assertTrue(str.contains("delayed=25")); + assertTrue(str.contains("total=175")); + } + + @Test + void testQueueAttributes_zeroValues() { + // Setup + SqsMessageOperations.QueueAttributes attrs = createQueueAttributes(0, 0, 0); + + // Assert + assertEquals(0, attrs.getApproximateNumberOfMessages()); + assertEquals(0, attrs.getApproximateNumberOfMessagesNotVisible()); + assertEquals(0, attrs.getApproximateNumberOfMessagesDelayed()); + assertEquals(0, attrs.getTotalMessages()); + } + }