diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java index 2818ce4f8..e86015c97 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java @@ -183,6 +183,17 @@ public static List batchProcessor(final SQSEvent event, return batchProcessor(event, suppressException, handlerInstance, false, nonRetryableExceptions); } + @SafeVarargs + public static List batchProcessor(final SQSEvent event, + final boolean suppressException, + final Class> handler, + final boolean deleteNonRetryableMessageFromQueue, + final Class... nonRetryableExceptions) { + + SqsMessageHandler handlerInstance = instantiatedHandler(handler); + return batchProcessor(event, suppressException, handlerInstance, deleteNonRetryableMessageFromQueue, nonRetryableExceptions); + } + /** * This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent} * diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java index 2801ded55..73e91c3a7 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java @@ -29,7 +29,11 @@ && placedOnSqsEventRequestHandler(pjp)) { SQSEvent sqsEvent = (SQSEvent) proceedArgs[0]; - batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value(), sqsBatch.nonRetryableExceptions()); + batchProcessor(sqsEvent, + sqsBatch.suppressException(), + sqsBatch.value(), + sqsBatch.deleteNonRetryableMessageFromQueue(), + sqsBatch.nonRetryableExceptions()); } return pjp.proceed(proceedArgs); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java index cb866b0f5..eb1128320 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SqsUtilsBatchProcessorTest.java @@ -242,6 +242,36 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() { verify(sqsClient).sendMessageBatch(any(Consumer.class)); } + @Test + void shouldBatchProcessAndDeleteNonRetryableException() { + String failedId = "2e1424d4-f796-459a-8184-9c92662be6da"; + HashMap attributes = new HashMap<>(); + + attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" + + " \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" + + " \"maxReceiveCount\": 2\n" + + "}"); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder() + .attributes(attributes) + .build()); + + List batchProcessor = batchProcessor(event, false, (message) -> { + if (failedId.equals(message.getMessageId())) { + throw new IllegalStateException("Failed processing"); + } + + interactionClient.listQueues(); + return "Success"; + }, true, IllegalStateException.class, IllegalArgumentException.class); + + Assertions.assertThat(batchProcessor) + .hasSize(1); + + verify(sqsClient, times(0)).sendMessageBatch(any(Consumer.class)); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + public class FailureSampleInnerSqsHandler implements SqsMessageHandler { @Override public String process(SQSEvent.SQSMessage message) { diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandler.java index a13049ed9..282c35897 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandler.java @@ -26,6 +26,10 @@ public String process(SQSMessage message) { throw new IllegalArgumentException("Invalid message and was moved to DLQ"); } + if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) { + throw new RuntimeException("Invalid message and should be reprocessed"); + } + mockedRandom.nextInt(); return "Success"; } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandlerWithDelete.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandlerWithDelete.java new file mode 100644 index 000000000..ed0516253 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/SqsMessageHandlerWithNonRetryableHandlerWithDelete.java @@ -0,0 +1,39 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.SqsBatch; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom; + +public class SqsMessageHandlerWithNonRetryableHandlerWithDelete implements RequestHandler { + + @Override + @SqsBatch(value = InnerMessageHandler.class, + nonRetryableExceptions = {IllegalStateException.class, IllegalArgumentException.class}, + deleteNonRetryableMessageFromQueue = true) + public String handleRequest(final SQSEvent sqsEvent, + final Context context) { + return "Success"; + } + + private class InnerMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + if(message.getMessageId().isEmpty()) { + throw new IllegalArgumentException("Invalid message and was moved to DLQ"); + } + + if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) { + throw new RuntimeException("Invalid message and should be reprocessed"); + } + + mockedRandom.nextInt(); + return "Success"; + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java index d44599337..e53817e76 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -12,6 +12,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest; @@ -23,11 +24,13 @@ import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler; import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler; import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandler; +import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandlerWithDelete; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatExceptionOfType; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -131,6 +134,137 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() { verify(sqsClient).sendMessageBatch(any(Consumer.class)); } + @Test + void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() { + requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete(); + event.getRecords().get(0).setMessageId(""); + + requestHandler.handleRequest(event, context); + + verify(mockedRandom).nextInt(); + ArgumentCaptor captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class); + verify(sqsClient).deleteMessageBatch(captor.capture()); + verify(sqsClient, never()).sendMessageBatch(any(Consumer.class)); + verify(sqsClient, never()).getQueueAttributes(any(GetQueueAttributesRequest.class)); + + assertThat(captor.getValue()) + .satisfies(deleteMessageBatchRequest -> assertThat(deleteMessageBatchRequest.entries()) + .hasSize(2) + .extracting("id") + .contains("", "2e1424d4-f796-459a-8184-9c92662be6da")); + } + + @Test + void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionAndNoDlq() { + requestHandler = new SqsMessageHandlerWithNonRetryableHandler(); + + event.getRecords().get(0).setMessageId(""); + event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp")); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder() + .build()); + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> requestHandler.handleRequest(event, context)) + .satisfies(e -> { + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .containsExactly("Invalid message and was moved to DLQ"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .containsExactly(""); + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + }); + + verify(mockedRandom).nextInt(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + verify(sqsClient, never()).sendMessageBatch(any(Consumer.class)); + } + + @Test + void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionWhenFailedParsingPolicy() { + requestHandler = new SqsMessageHandlerWithNonRetryableHandler(); + event.getRecords().get(0).setMessageId(""); + event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp-queue")); + + HashMap attributes = new HashMap<>(); + + attributes.put(QueueAttributeName.REDRIVE_POLICY, "MalFormedRedrivePolicy"); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder() + .attributes(attributes) + .build()); + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> requestHandler.handleRequest(event, context)) + .satisfies(e -> { + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .containsExactly("Invalid message and was moved to DLQ"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .containsExactly(""); + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + }); + + verify(mockedRandom).nextInt(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + verify(sqsClient, never()).sendMessageBatch(any(Consumer.class)); + } + + @Test + void shouldBatchProcessAndMoveNonRetryableExceptionToDlqAndThrowException() throws IOException { + requestHandler = new SqsMessageHandlerWithNonRetryableHandler(); + event = MAPPER.readValue(this.getClass().getResource("/threeMessageSqsBatchEvent.json"), SQSEvent.class); + + event.getRecords().get(1).setMessageId(""); + + HashMap attributes = new HashMap<>(); + + attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" + + " \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" + + " \"maxReceiveCount\": 2\n" + + "}"); + + when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder() + .attributes(attributes) + .build()); + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> requestHandler.handleRequest(event, context)) + .satisfies(e -> { + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .containsExactly("Invalid message and should be reprocessed"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .containsExactly("2e1424d4-f796-459a-9696-9c92662ba5da"); + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + }); + + verify(mockedRandom).nextInt(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + verify(sqsClient).sendMessageBatch(any(Consumer.class)); + } + private void setupContext() { when(context.getFunctionName()).thenReturn("testFunction"); when(context.getInvokedFunctionArn()).thenReturn("testArn"); diff --git a/powertools-sqs/src/test/resources/threeMessageSqsBatchEvent.json b/powertools-sqs/src/test/resources/threeMessageSqsBatchEvent.json new file mode 100644 index 000000000..b3b61da3b --- /dev/null +++ b/powertools-sqs/src/test/resources/threeMessageSqsBatchEvent.json @@ -0,0 +1,70 @@ +{ + "records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + }, + { + "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": { + "Attribute3" : { + "binaryValue" : "MTEwMA==", + "dataType" : "Binary" + }, + "Attribute2" : { + "stringValue" : "123", + "dataType" : "Number" + }, + "Attribute1" : { + "stringValue" : "AttributeValue1", + "dataType" : "String" + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + }, + { + "messageId": "2e1424d4-f796-459a-9696-9c92662ba5da", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": { + "Attribute2" : { + "stringValue" : "123", + "dataType" : "Number" + } + }, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file