diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java index 3b6dd25ba6f..51278588b7e 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/BatchingMap.java @@ -67,18 +67,18 @@ public void forEach(BiConsumer> batchContextMap.forEach(action); } - public Map> flushableRequests(String batchKey) { - return batchContextMap.get(batchKey).flushableRequests(); + public Map> extractBatchIfReady(String batchKey) { + return batchContextMap.get(batchKey).extractBatchIfReady(); } - public Map> flushableRequestsOnByteLimitBeforeAdd(String batchKey, + public Map> extractBatchIfSizeExceeded(String batchKey, RequestT request) { - return batchContextMap.get(batchKey).flushableRequestsOnByteLimitBeforeAdd(request); + return batchContextMap.get(batchKey).extractBatchIfSizeExceeded(request); } - public Map> flushableScheduledRequests(String batchKey, + public Map> extractEntriesForScheduledFlush(String batchKey, int maxBatchItems) { - return batchContextMap.get(batchKey).flushableScheduledRequests(maxBatchItems); + return batchContextMap.get(batchKey).extractEntriesForScheduledFlush(maxBatchItems); } public void cancelScheduledFlush(String batchKey) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java index 80a55763c07..c3155f4ed67 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchBuffer.java @@ -64,10 +64,10 @@ public RequestBatchBuffer(ScheduledFuture scheduledFlush, this.maxBatchSizeInBytes = maxBatchSizeInBytes; } - public Map> flushableRequests() { + public Map> extractBatchIfReady() { synchronized (flushLock) { return (isByteSizeThresholdCrossed(0) || isMaxBatchSizeLimitReached()) - ? extractFlushedEntries(maxBatchItems) + ? extractEntries(maxBatchItems) : Collections.emptyMap(); } } @@ -77,12 +77,12 @@ private boolean isMaxBatchSizeLimitReached() { return idToBatchContext.size() >= maxBatchItems; } - public Map> flushableRequestsOnByteLimitBeforeAdd(RequestT request) { + public Map> extractBatchIfSizeExceeded(RequestT request) { synchronized (flushLock) { if (maxBatchSizeInBytes > 0 && !idToBatchContext.isEmpty()) { int incomingRequestBytes = RequestPayloadCalculator.calculateMessageSize(request).orElse(0); if (isByteSizeThresholdCrossed(incomingRequestBytes)) { - return extractFlushedEntries(maxBatchItems); + return extractEntries(maxBatchItems); } } return Collections.emptyMap(); @@ -100,16 +100,16 @@ private boolean isByteSizeThresholdCrossed(int incomingRequestBytes) { return totalPayloadSize > maxBatchSizeInBytes; } - public Map> flushableScheduledRequests(int maxBatchItems) { + public Map> extractEntriesForScheduledFlush(int maxBatchItems) { synchronized (flushLock) { if (!idToBatchContext.isEmpty()) { - return extractFlushedEntries(maxBatchItems); + return extractEntries(maxBatchItems); } return Collections.emptyMap(); } } - private Map> extractFlushedEntries(int maxBatchItems) { + private Map> extractEntries(int maxBatchItems) { LinkedHashMap> requestEntries = new LinkedHashMap<>(); String nextEntry; while (requestEntries.size() < maxBatchItems && hasNextBatchEntry()) { diff --git a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java index af7ed4149f0..dd1de65cd2c 100644 --- a/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java +++ b/services/sqs/src/main/java/software/amazon/awssdk/services/sqs/internal/batchmanager/RequestBatchManager.java @@ -72,9 +72,9 @@ public CompletableFuture batchRequest(RequestT request) { String batchKey = getBatchKey(request); // Handle potential byte size overflow only if there are request in map and if feature enabled if (requestsAndResponsesMaps.contains(batchKey) && batchConfiguration.maxBatchBytesSize() > 0) { - Optional.of(requestsAndResponsesMaps.flushableRequestsOnByteLimitBeforeAdd(batchKey, request)) - .filter(flushableRequests -> !flushableRequests.isEmpty()) - .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); + Optional.of(requestsAndResponsesMaps.extractBatchIfSizeExceeded(batchKey, request)) + .filter(extractedEntries -> !extractedEntries.isEmpty()) + .ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries)); } // Add request and response to the map, scheduling a flush if necessary @@ -86,9 +86,9 @@ public CompletableFuture batchRequest(RequestT request) { response); // Immediately flush if the batch is full - Optional.of(requestsAndResponsesMaps.flushableRequests(batchKey)) - .filter(flushableRequests -> !flushableRequests.isEmpty()) - .ifPresent(flushableRequests -> manualFlushBuffer(batchKey, flushableRequests)); + Optional.of(requestsAndResponsesMaps.extractBatchIfReady(batchKey)) + .filter(extractedEntries -> !extractedEntries.isEmpty()) + .ifPresent(extractedEntries -> manualFlushBuffer(batchKey, extractedEntries)); } catch (Exception e) { response.completeExceptionally(e); @@ -153,21 +153,21 @@ private ScheduledFuture scheduleBufferFlush(String batchKey, long timeOutInMs } private void performScheduledFlush(String batchKey) { - Map> flushableRequests = - requestsAndResponsesMaps.flushableScheduledRequests(batchKey, maxBatchItems); - if (!flushableRequests.isEmpty()) { - flushBuffer(batchKey, flushableRequests); + Map> extractedEntries = + requestsAndResponsesMaps.extractEntriesForScheduledFlush(batchKey, maxBatchItems); + if (!extractedEntries.isEmpty()) { + flushBuffer(batchKey, extractedEntries); } } public void close() { requestsAndResponsesMaps.forEach((batchKey, batchBuffer) -> { requestsAndResponsesMaps.cancelScheduledFlush(batchKey); - Map> flushableRequests = - requestsAndResponsesMaps.flushableRequests(batchKey); + Map> + extractedEntries = requestsAndResponsesMaps.extractBatchIfReady(batchKey); - while (!flushableRequests.isEmpty()) { - flushBuffer(batchKey, flushableRequests); + while (!extractedEntries.isEmpty()) { + flushBuffer(batchKey, extractedEntries); } }); diff --git a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java index df9402e85b1..f1e276f23d8 100644 --- a/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java +++ b/services/sqs/src/test/java/software/amazon/awssdk/services/sqs/batchmanager/RequestBatchBufferTest.java @@ -52,23 +52,23 @@ void whenPutRequestThenBufferContainsRequest() { } @Test - void whenFlushableRequestsThenReturnRequestsUpToMaxBatchItems() { + void whenExtractBatchIfReadyThenReturnRequestsUpToMaxBatchItems() { batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); - Map> flushedRequests = batchBuffer.flushableRequests(); - assertEquals(1, flushedRequests.size()); - assertTrue(flushedRequests.containsKey("0")); + Map> extractedEntries = batchBuffer.extractBatchIfReady(); + assertEquals(1, extractedEntries.size()); + assertTrue(extractedEntries.containsKey("0")); } @Test - void whenFlushableScheduledRequestsThenReturnAllRequests() { + void whenExtractEntriesForScheduledFlushThenReturnAllRequests() { batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 10, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); CompletableFuture response = new CompletableFuture<>(); batchBuffer.put("request1", response); - Map> flushedRequests = batchBuffer.flushableScheduledRequests(1); - assertEquals(1, flushedRequests.size()); - assertTrue(flushedRequests.containsKey("0")); + Map> extractedEntries = batchBuffer.extractEntriesForScheduledFlush(1); + assertEquals(1, extractedEntries.size()); + assertTrue(extractedEntries.containsKey("0")); } @Test @@ -119,20 +119,20 @@ void whenClearBufferThenBufferIsEmpty() { } @Test - void whenExtractFlushedEntriesThenReturnCorrectEntries() { + void whenExtractEntriesThenReturnCorrectEntries() { batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 5, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); for (int i = 0; i < 5; i++) { batchBuffer.put("request" + i, new CompletableFuture<>()); } - Map> flushedEntries = batchBuffer.flushableRequests(); - assertEquals(5, flushedEntries.size()); + Map> extractedEntries = batchBuffer.extractBatchIfReady(); + assertEquals(5, extractedEntries.size()); } @Test void whenHasNextBatchEntryThenReturnTrue() { batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertTrue(batchBuffer.flushableRequests().containsKey("0")); + assertTrue(batchBuffer.extractBatchIfReady().containsKey("0")); } @@ -140,7 +140,7 @@ void whenHasNextBatchEntryThenReturnTrue() { void whenNextBatchEntryThenReturnNextEntryId() { batchBuffer = new RequestBatchBuffer<>(scheduledFlush, 1, MAX_SEND_MESSAGE_PAYLOAD_SIZE_BYTES, maxBufferSize); batchBuffer.put("request1", new CompletableFuture<>()); - assertEquals("0", batchBuffer.flushableRequests().keySet().iterator().next()); + assertEquals("0", batchBuffer.extractBatchIfReady().keySet().iterator().next()); } @Test @@ -151,9 +151,9 @@ void whenRequestPassedWithLessBytesinArgs_thenCheckForSizeOnly_andDonotFlush() { batchBuffer.put(SendMessageRequest.builder().build(), new CompletableFuture<>()); } - Map> flushedEntries = - batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("Hi").build()); - assertEquals(0, flushedEntries.size()); + Map> extractedEntries = + batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("Hi").build()); + assertEquals(0, extractedEntries.size()); } @@ -166,9 +166,9 @@ void testFlushWhenPayloadExceedsMaxSize() { String largeMessageBody = createLargeString('a',245_760); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), new CompletableFuture<>()); - Map> flushedEntries = - batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); - assertEquals(1, flushedEntries.size()); + Map> extractedEntries = + batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build()); + assertEquals(1, extractedEntries.size()); } @Test @@ -181,11 +181,11 @@ void testFlushWhenCumulativePayloadExceedsMaxSize() { new CompletableFuture<>()); batchBuffer.put(SendMessageRequest.builder().messageBody(largeMessageBody).build(), new CompletableFuture<>()); - Map> flushedEntries = - batchBuffer.flushableRequestsOnByteLimitBeforeAdd(SendMessageRequest.builder().messageBody("NewMessage").build()); + Map> extractedEntries = + batchBuffer.extractBatchIfSizeExceeded(SendMessageRequest.builder().messageBody("NewMessage").build()); //Flushes both the messages since thier sum is greater than 256Kb - assertEquals(2, flushedEntries.size()); + assertEquals(2, extractedEntries.size()); }