From 990cb6376168adbc7d3365e430cccde4e50f2271 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Fri, 2 Oct 2020 18:56:08 +0200 Subject: [PATCH 01/12] Initial API skeleton for Partial SQS batch util --- powertools-sqs/pom.xml | 4 + .../lambda/powertools/sqs/PowertoolsSqs.java | 47 +++++++++++ .../sqs/SQSBatchProcessingException.java | 22 ++++++ .../powertools/sqs/SqsBatchProcessor.java | 16 ++++ .../powertools/sqs/SqsMessageHandler.java | 9 +++ .../powertools/sqs/internal/BatchContext.java | 77 +++++++++++++++++++ .../SqsMessageBatchProcessorAspect.java | 40 ++++++++++ .../sqs/handlers/PartialBatchHandler.java | 37 +++++++++ 8 files changed, 252 insertions(+) create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java create mode 100644 powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java diff --git a/powertools-sqs/pom.xml b/powertools-sqs/pom.xml index 5ea80f5cc..52867d191 100644 --- a/powertools-sqs/pom.xml +++ b/powertools-sqs/pom.xml @@ -57,6 +57,10 @@ software.amazon.payloadoffloading payloadoffloading-common + + software.amazon.awssdk + sqs + org.aspectj diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 1a60bdc71..5eaf24c41 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -13,6 +13,8 @@ */ package software.amazon.lambda.powertools.sqs; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; @@ -20,6 +22,8 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.lambda.powertools.sqs.internal.BatchContext; import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect; import software.amazon.payloadoffloading.PayloadS3Pointer; @@ -34,6 +38,7 @@ public final class PowertoolsSqs { private static final ObjectMapper objectMapper = new ObjectMapper(); + private static SqsClient client = SqsClient.create(); private PowertoolsSqs() { } @@ -80,6 +85,47 @@ public static R enrichedMessageFromS3(final SQSEvent sqsEvent, return returnValue; } + public static void defaultSqsClient(SqsClient client) { + PowertoolsSqs.client = client; + } + + public static SqsClient defaultSqsClient() { + return client; + } + + public static List partialBatchProcessor(final SQSEvent event, + boolean suppressException, + final Class> handler) { + + try { + return partialBatchProcessor(event, suppressException, handler.newInstance()); + } catch (IllegalAccessException | InstantiationException e) { + // LOG something + return Collections.emptyList(); + } + } + + public static List partialBatchProcessor(final SQSEvent event, + final boolean suppressException, + final SqsMessageHandler handler) { + final List handlerReturn = new ArrayList<>(); + + BatchContext batchContext = new BatchContext(defaultSqsClient()); + + for (SQSMessage message : event.getRecords()) { + try { + handlerReturn.add(handler.process(message)); + batchContext.addSuccess(message); + } catch (Exception e) { + batchContext.addFailure(message, e); + } + } + + batchContext.processSuccessAndReset(suppressException); + + return handlerReturn; + } + private static SQSMessage clonedMessage(final SQSMessage sqsMessage) { try { return objectMapper @@ -88,4 +134,5 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) { throw new RuntimeException(e); } } + } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java new file mode 100644 index 000000000..fa5d9a971 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java @@ -0,0 +1,22 @@ +package software.amazon.lambda.powertools.sqs; + +import java.util.ArrayList; +import java.util.List; + +import static java.util.stream.Collectors.joining; + +public class SQSBatchProcessingException extends RuntimeException { + + private final List exceptions; + + public SQSBatchProcessingException(List exceptions) { + this.exceptions = new ArrayList<>(exceptions); + } + + @Override + public String getMessage() { + return exceptions.stream() + .map(Throwable::getMessage) + .collect(joining("\n")); + } +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java new file mode 100644 index 000000000..24ea22a15 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java @@ -0,0 +1,16 @@ +package software.amazon.lambda.powertools.sqs; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +public @interface SqsBatchProcessor { + + Class> value(); + + boolean suppressException() default false; +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java new file mode 100644 index 000000000..8a626d233 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java @@ -0,0 +1,9 @@ +package software.amazon.lambda.powertools.sqs; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +@FunctionalInterface +public interface SqsMessageHandler { + + R process(SQSMessage message); +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java new file mode 100644 index 000000000..69ab227e9 --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -0,0 +1,77 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import java.util.ArrayList; +import java.util.List; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; + +import static java.util.stream.Collectors.toList; + +public final class BatchContext { + private List success = new ArrayList<>(); + private List failures = new ArrayList<>(); + private List exceptions = new ArrayList<>(); + private SqsClient client; + + public BatchContext(SqsClient client) { + this.client = client; + } + + public void addSuccess(SQSEvent.SQSMessage event) { + success.add(event); + } + + public void addFailure(SQSEvent.SQSMessage event, Exception e) { + failures.add(event); + exceptions.add(e); + } + + private boolean hasFailures() { + return !failures.isEmpty(); + } + + private void cleanUpAndReset() { + DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder() + .queueUrl(url()) + .entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder() + .id(m.getMessageId()) + .receiptHandle(m.getReceiptHandle()) + .build()).collect(toList())) + .build(); + + client.deleteMessageBatch(request); + } + + private String url() { + String[] arnArray = success.get(0).getEventSourceArn().split(":"); + return client.getQueueUrl(GetQueueUrlRequest.builder() + .queueOwnerAWSAccountId(arnArray[1]) + .queueName(arnArray[2]) + .build()) + .queueUrl(); + } + + private void reset() { + success = new ArrayList<>(); + failures = new ArrayList<>(); + exceptions = new ArrayList<>(); + } + + public void processSuccessAndReset(final boolean suppressException) { + if (hasFailures() && !suppressException) { + SQSBatchProcessingException exception = new SQSBatchProcessingException(exceptions); + cleanUpAndReset(); + throw exception; + } else if (hasFailures()) { + // LOG + cleanUpAndReset(); + } else { + reset(); + } + } +} diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java new file mode 100644 index 000000000..9111c21fc --- /dev/null +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java @@ -0,0 +1,40 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.aspectj.lang.ProceedingJoinPoint; +import org.aspectj.lang.annotation.Around; +import org.aspectj.lang.annotation.Aspect; +import org.aspectj.lang.annotation.Pointcut; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.lambda.powertools.sqs.PowertoolsSqs; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; + +import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.placedOnSqsEventRequestHandler; + +@Aspect +public class SqsMessageBatchProcessorAspect { + private static final SqsClient client = SqsClient.create(); + private static BatchContext details = new BatchContext(PowertoolsSqs.defaultSqsClient()); + + @SuppressWarnings({"EmptyMethod"}) + @Pointcut("@annotation(sqsBatchProcessor)") + public void callAt(SqsBatchProcessor sqsBatchProcessor) { + } + + @Around(value = "callAt(sqsBatchProcessor) && execution(@SqsBatchProcessor * *.*(..))", argNames = "pjp,sqsBatchProcessor") + public Object around(ProceedingJoinPoint pjp, + SqsBatchProcessor sqsBatchProcessor) throws Throwable { + Object[] proceedArgs = pjp.getArgs(); + + if (isHandlerMethod(pjp) + && placedOnSqsEventRequestHandler(pjp)) { + + SQSEvent sqsEvent = (SQSEvent) proceedArgs[0]; + + PowertoolsSqs.partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance()); + } + + return pjp.proceed(proceedArgs); + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java new file mode 100644 index 000000000..4eef24b0d --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java @@ -0,0 +1,37 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import java.util.List; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.LargeMessageHandler; +import software.amazon.lambda.powertools.sqs.PowertoolsSqs; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +public class PartialBatchHandler implements RequestHandler> { + + + @Override + @LargeMessageHandler + @SqsBatchProcessor(HandlerSqs.class) + public List handleRequest(SQSEvent sqsEvent, Context context) { + + List returnValues = + PowertoolsSqs.partialBatchProcessor(sqsEvent, false, HandlerSqs.class); + + // Do some processing on processed message + + return returnValues; + } + + private class HandlerSqs implements SqsMessageHandler { + + @Override + public String process(SQSEvent.SQSMessage message) { + // This is where you process message + return null; + } + } +} From 0752389ac1410c1d65c47f0021857dfe0f705d15 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 3 Oct 2020 08:39:47 +0200 Subject: [PATCH 02/12] Better error handling --- .../lambda/powertools/sqs/PowertoolsSqs.java | 18 +++-- .../powertools/sqs/internal/BatchContext.java | 76 +++++++++++-------- ...Aspect.java => SqsLargeMessageAspect.java} | 4 +- .../SqsMessageBatchProcessorAspect.java | 9 +-- .../powertools/sqs/PowertoolsSqsTest.java | 12 +-- .../sqs/handlers/PartialBatchHandler.java | 1 - ...st.java => SqsLargeMessageAspectTest.java} | 18 ++--- 7 files changed, 74 insertions(+), 64 deletions(-) rename powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/{SqsMessageAspect.java => SqsLargeMessageAspect.java} (97%) rename powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/{SqsMessageAspectTest.java => SqsLargeMessageAspectTest.java} (96%) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 5eaf24c41..2eb813ba5 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -14,7 +14,6 @@ package software.amazon.lambda.powertools.sqs; import java.util.ArrayList; -import java.util.Collections; import java.util.List; import java.util.function.Function; import java.util.stream.Collectors; @@ -22,13 +21,15 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.lambda.powertools.sqs.internal.BatchContext; -import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect; +import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect; import software.amazon.payloadoffloading.PayloadS3Pointer; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.processMessages; +import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages; /** * A class of helper functions to add additional functionality to LargeMessageHandler. @@ -36,6 +37,7 @@ * {@see PowertoolsLogging} */ public final class PowertoolsSqs { + private static final Log LOG = LogFactory.getLog(PowertoolsSqs.class); private static final ObjectMapper objectMapper = new ObjectMapper(); private static SqsClient client = SqsClient.create(); @@ -79,7 +81,7 @@ public static R enrichedMessageFromS3(final SQSEvent sqsEvent, R returnValue = messageFunction.apply(sqsMessages); if (deleteS3Payload) { - s3Pointers.forEach(SqsMessageAspect::deleteMessage); + s3Pointers.forEach(SqsLargeMessageAspect::deleteMessage); } return returnValue; @@ -94,14 +96,15 @@ public static SqsClient defaultSqsClient() { } public static List partialBatchProcessor(final SQSEvent event, - boolean suppressException, + final boolean suppressException, final Class> handler) { try { return partialBatchProcessor(event, suppressException, handler.newInstance()); } catch (IllegalAccessException | InstantiationException e) { - // LOG something - return Collections.emptyList(); + LOG.error("Failed invoking process method on handler", e); + throw new RuntimeException("Unexpected error occurred. Please raise issue at " + + "https://github.com/awslabs/aws-lambda-powertools-java/issues", e); } } @@ -134,5 +137,4 @@ private static SQSMessage clonedMessage(final SQSMessage sqsMessage) { throw new RuntimeException(e); } } - } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index 69ab227e9..51da34dfb 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -3,48 +3,73 @@ import java.util.ArrayList; import java.util.List; -import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static java.lang.String.format; import static java.util.stream.Collectors.toList; public final class BatchContext { - private List success = new ArrayList<>(); - private List failures = new ArrayList<>(); - private List exceptions = new ArrayList<>(); - private SqsClient client; + private static final Log LOG = LogFactory.getLog(BatchContext.class); + + private final List success = new ArrayList<>(); + private final List failures = new ArrayList<>(); + private final List exceptions = new ArrayList<>(); + private final SqsClient client; public BatchContext(SqsClient client) { this.client = client; } - public void addSuccess(SQSEvent.SQSMessage event) { + public void addSuccess(SQSMessage event) { success.add(event); } - public void addFailure(SQSEvent.SQSMessage event, Exception e) { + public void addFailure(SQSMessage event, Exception e) { failures.add(event); exceptions.add(e); } + public void processSuccessAndReset(final boolean suppressException) { + try { + if (hasFailures()) { + + deleteSuccessMessage(); + + if (suppressException) { + List messageIds = failures.stream().map(SQSMessage::getMessageId).collect(toList()); + LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. Failed messages %s", failures.size(), messageIds)); + } else { + throw new SQSBatchProcessingException(exceptions); + } + } + } finally { + reset(); + } + } + private boolean hasFailures() { return !failures.isEmpty(); } - private void cleanUpAndReset() { - DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder() - .queueUrl(url()) - .entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder() - .id(m.getMessageId()) - .receiptHandle(m.getReceiptHandle()) - .build()).collect(toList())) - .build(); + private void deleteSuccessMessage() { + if (!success.isEmpty()) { + DeleteMessageBatchRequest request = DeleteMessageBatchRequest.builder() + .queueUrl(url()) + .entries(success.stream().map(m -> DeleteMessageBatchRequestEntry.builder() + .id(m.getMessageId()) + .receiptHandle(m.getReceiptHandle()) + .build()).collect(toList())) + .build(); - client.deleteMessageBatch(request); + client.deleteMessageBatch(request); + } } private String url() { @@ -57,21 +82,8 @@ private String url() { } private void reset() { - success = new ArrayList<>(); - failures = new ArrayList<>(); - exceptions = new ArrayList<>(); - } - - public void processSuccessAndReset(final boolean suppressException) { - if (hasFailures() && !suppressException) { - SQSBatchProcessingException exception = new SQSBatchProcessingException(exceptions); - cleanUpAndReset(); - throw exception; - } else if (hasFailures()) { - // LOG - cleanUpAndReset(); - } else { - reset(); - } + success.clear(); + failures.clear(); + exceptions.clear(); } } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspect.java similarity index 97% rename from powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java rename to powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspect.java index 68be09adb..64fafe38f 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspect.java @@ -28,9 +28,9 @@ import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; @Aspect -public class SqsMessageAspect { +public class SqsLargeMessageAspect { - private static final Log LOG = LogFactory.getLog(SqsMessageAspect.class); + private static final Log LOG = LogFactory.getLog(SqsLargeMessageAspect.class); private static AmazonS3 amazonS3 = AmazonS3ClientBuilder.defaultClient(); @SuppressWarnings({"EmptyMethod"}) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java index 9111c21fc..dd982b157 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java @@ -5,17 +5,14 @@ import org.aspectj.lang.annotation.Around; import org.aspectj.lang.annotation.Aspect; import org.aspectj.lang.annotation.Pointcut; -import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.lambda.powertools.sqs.PowertoolsSqs; import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.placedOnSqsEventRequestHandler; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.partialBatchProcessor; +import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.placedOnSqsEventRequestHandler; @Aspect public class SqsMessageBatchProcessorAspect { - private static final SqsClient client = SqsClient.create(); - private static BatchContext details = new BatchContext(PowertoolsSqs.defaultSqsClient()); @SuppressWarnings({"EmptyMethod"}) @Pointcut("@annotation(sqsBatchProcessor)") @@ -32,7 +29,7 @@ && placedOnSqsEventRequestHandler(pjp)) { SQSEvent sqsEvent = (SQSEvent) proceedArgs[0]; - PowertoolsSqs.partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance()); + partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance()); } return pjp.proceed(proceedArgs); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java index 7528188a9..f91e9ff59 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java @@ -21,7 +21,7 @@ import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; -import software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect; +import software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.Collections.singletonList; @@ -33,7 +33,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; +import static org.mockito.MockitoAnnotations.openMocks; class PowertoolsSqsTest { @@ -44,8 +44,8 @@ class PowertoolsSqsTest { @BeforeEach void setUp() throws IllegalAccessException { - initMocks(this); - writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true); + openMocks(this); + writeStaticField(SqsLargeMessageAspect.class, "amazonS3", amazonS3, true); } @Test @@ -122,7 +122,7 @@ public void shouldFailEntireBatchIfFailedDownloadingFromS3(RuntimeException exce String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); - assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class) + assertThatExceptionOfType(SqsLargeMessageAspect.FailedProcessingLargePayloadException.class) .isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody())) .withCause(exception); @@ -145,7 +145,7 @@ public void close() throws IOException { String messageBody = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\",{\"s3BucketName\":\"" + BUCKET_NAME + "\",\"s3Key\":\"" + BUCKET_KEY + "\"}]"; SQSEvent sqsEvent = messageWithBody(messageBody); - assertThatExceptionOfType(SqsMessageAspect.FailedProcessingLargePayloadException.class) + assertThatExceptionOfType(SqsLargeMessageAspect.FailedProcessingLargePayloadException.class) .isThrownBy(() -> PowertoolsSqs.enrichedMessageFromS3(sqsEvent, sqsMessages -> sqsMessages.get(0).getBody())) .withCauseInstanceOf(IOException.class); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java index 4eef24b0d..83e9b6c3a 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java @@ -12,7 +12,6 @@ public class PartialBatchHandler implements RequestHandler> { - @Override @LargeMessageHandler @SqsBatchProcessor(HandlerSqs.class) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspectTest.java similarity index 96% rename from powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java rename to powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspectTest.java index cafec7eef..1837686f1 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsLargeMessageAspectTest.java @@ -1,5 +1,9 @@ package software.amazon.lambda.powertools.sqs.internal; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.util.stream.Stream; + import com.amazonaws.AmazonServiceException; import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.Context; @@ -21,10 +25,6 @@ import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandler; import software.amazon.lambda.powertools.sqs.handlers.SqsNoDeleteMessageHandler; -import java.io.ByteArrayInputStream; -import java.io.IOException; -import java.util.stream.Stream; - import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.Collections.singletonList; import static org.apache.commons.lang3.reflect.FieldUtils.writeStaticField; @@ -35,10 +35,10 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageAspect.FailedProcessingLargePayloadException; +import static org.mockito.MockitoAnnotations.openMocks; +import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.FailedProcessingLargePayloadException; -public class SqsMessageAspectTest { +public class SqsLargeMessageAspectTest { private RequestHandler requestHandler; @@ -53,9 +53,9 @@ public class SqsMessageAspectTest { @BeforeEach void setUp() throws IllegalAccessException { - initMocks(this); + openMocks(this); setupContext(); - writeStaticField(SqsMessageAspect.class, "amazonS3", amazonS3, true); + writeStaticField(SqsLargeMessageAspect.class, "amazonS3", amazonS3, true); requestHandler = new SqsMessageHandler(); } From 97e286093fce6395399102970881ab03b2b1a826 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 3 Oct 2020 12:05:32 +0200 Subject: [PATCH 03/12] Initial Test cases setup and some refactorings --- .../lambda/powertools/sqs/PowertoolsSqs.java | 21 ++++- .../sqs/SQSBatchProcessingException.java | 40 +++++++++- .../powertools/sqs/internal/BatchContext.java | 5 +- .../SqsMessageBatchProcessorAspect.java | 2 +- .../sqs/PowertoolsSqsBatchProcessorTest.java | 76 +++++++++++++++++++ ...ava => PowertoolsSqsLargeMessageTest.java} | 2 +- .../powertools/sqs/SampleSqsHandler.java | 12 +++ .../test/resources/sampleSqsBatchEvent.json | 36 +++++++++ 8 files changed, 185 insertions(+), 9 deletions(-) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java rename powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/{PowertoolsSqsTest.java => PowertoolsSqsLargeMessageTest.java} (99%) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java create mode 100644 powertools-sqs/src/test/resources/sampleSqsBatchEvent.json diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 2eb813ba5..4738f065a 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -13,6 +13,8 @@ */ package software.amazon.lambda.powertools.sqs; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -100,8 +102,16 @@ public static List partialBatchProcessor(final SQSEvent event, final Class> handler) { try { - return partialBatchProcessor(event, suppressException, handler.newInstance()); - } catch (IllegalAccessException | InstantiationException e) { + SqsMessageHandler handlerInstance; + if (null == handler.getDeclaringClass()) { + handlerInstance = handler.newInstance(); + } else { + Constructor> constructor = handler.getDeclaredConstructor(handler.getDeclaringClass()); + constructor.setAccessible(true); + handlerInstance = constructor.newInstance(handler.getDeclaringClass().newInstance()); + } + return partialBatchProcessor(event, suppressException, handlerInstance); + } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { LOG.error("Failed invoking process method on handler", e); throw new RuntimeException("Unexpected error occurred. Please raise issue at " + "https://github.com/awslabs/aws-lambda-powertools-java/issues", e); @@ -124,7 +134,12 @@ public static List partialBatchProcessor(final SQSEvent event, } } - batchContext.processSuccessAndReset(suppressException); + try { + batchContext.processSuccessAndReset(suppressException); + } catch (SQSBatchProcessingException e) { + e.addSuccessMessageReturnValues(handlerReturn); + throw e; + } return handlerReturn; } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java index fa5d9a971..7e17ab8ed 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java @@ -3,20 +3,56 @@ import java.util.ArrayList; import java.util.List; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + import static java.util.stream.Collectors.joining; public class SQSBatchProcessingException extends RuntimeException { private final List exceptions; + private final List failures; + private final List returnValues; - public SQSBatchProcessingException(List exceptions) { + public SQSBatchProcessingException(final List exceptions, + final List failures, + final List successReturns) { this.exceptions = new ArrayList<>(exceptions); + this.failures = new ArrayList<>(failures); + this.returnValues = new ArrayList<>(successReturns); + } + + public List getExceptions() { + return exceptions; + } + + public List successMessageReturnValues() { + return returnValues; + } + + public List getFailures() { + return failures; } @Override public String getMessage() { return exceptions.stream() - .map(Throwable::getMessage) + .map(Throwable::toString) .collect(joining("\n")); } + + @Override + public void printStackTrace() { + for (Exception exception : exceptions) { + exception.printStackTrace(); + } + } + + @Override + public String toString() { + return getMessage(); + } + + void addSuccessMessageReturnValues(final List returnValues) { + this.returnValues.addAll(returnValues); + } } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index 51da34dfb..54942f1ca 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -1,5 +1,6 @@ package software.amazon.lambda.powertools.sqs.internal; +import java.awt.geom.Area; import java.util.ArrayList; import java.util.List; @@ -36,7 +37,7 @@ public void addFailure(SQSMessage event, Exception e) { exceptions.add(e); } - public void processSuccessAndReset(final boolean suppressException) { + public void processSuccessAndReset(final boolean suppressException) { try { if (hasFailures()) { @@ -46,7 +47,7 @@ public void processSuccessAndReset(final boolean suppressException) { List messageIds = failures.stream().map(SQSMessage::getMessageId).collect(toList()); LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. Failed messages %s", failures.size(), messageIds)); } else { - throw new SQSBatchProcessingException(exceptions); + throw new SQSBatchProcessingException(exceptions, failures, new ArrayList()); } } } finally { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java index dd982b157..59396e634 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java @@ -29,7 +29,7 @@ && placedOnSqsEventRequestHandler(pjp)) { SQSEvent sqsEvent = (SQSEvent) proceedArgs[0]; - partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value().newInstance()); + partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value()); } return pjp.proceed(proceedArgs); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java new file mode 100644 index 000000000..229385dbe --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java @@ -0,0 +1,76 @@ +package software.amazon.lambda.powertools.sqs; + +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; + +import com.amazonaws.AmazonServiceException; +import com.amazonaws.SdkClientException; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.ValueSource; +import software.amazon.awssdk.services.sqs.SqsClient; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; + +class PowertoolsSqsBatchProcessorTest { + + private final SqsClient sqsClient = mock(SqsClient.class); + private final SqsClient interactionClient = mock(SqsClient.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + private SQSEvent event; + + @BeforeEach + void setUp() throws IOException { + event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class); + PowertoolsSqs.defaultSqsClient(sqsClient); + } + + @Test + void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() { + List returnValues = PowertoolsSqs.partialBatchProcessor(event, false, (message) -> { + interactionClient.listQueues(); + return "Success"; + }); + + assertThat(returnValues) + .hasSize(2) + .containsExactly("Success", "Success"); + + verify(interactionClient, times(2)).listQueues(); + verifyNoInteractions(sqsClient); + } + + @ParameterizedTest + @ValueSource(classes = {SampleInnerSqsHandler.class, SampleSqsHandler.class}) + void shouldBatchProcessViaClassAndNotDeleteMessagesWhenAllSuccess(Class> handler) { + List returnValues = PowertoolsSqs.partialBatchProcessor(event, false, handler); + + assertThat(returnValues) + .hasSize(2) + .containsExactly("0", "1"); + } + + private static Stream exception() { + return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")), + Arguments.of(new SdkClientException("Client Exception"))); + } + + public class SampleInnerSqsHandler implements SqsMessageHandler { + private int counter; + + @Override + public String process(SQSMessage message) { + return String.valueOf(counter++); + } + } +} \ No newline at end of file diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsLargeMessageTest.java similarity index 99% rename from powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java rename to powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsLargeMessageTest.java index f91e9ff59..1581e2c44 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsLargeMessageTest.java @@ -35,7 +35,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.openMocks; -class PowertoolsSqsTest { +class PowertoolsSqsLargeMessageTest { @Mock private AmazonS3 amazonS3; diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java new file mode 100644 index 000000000..517dbb686 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java @@ -0,0 +1,12 @@ +package software.amazon.lambda.powertools.sqs; + +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + +public class SampleSqsHandler implements SqsMessageHandler { + private int counter; + + @Override + public String process(SQSEvent.SQSMessage message) { + return String.valueOf(counter++); + } +} diff --git a/powertools-sqs/src/test/resources/sampleSqsBatchEvent.json b/powertools-sqs/src/test/resources/sampleSqsBatchEvent.json new file mode 100644 index 000000000..8a5fbf309 --- /dev/null +++ b/powertools-sqs/src/test/resources/sampleSqsBatchEvent.json @@ -0,0 +1,36 @@ +{ + "records": [ + { + "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", + "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082649183", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082649185" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + }, + { + "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", + "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", + "body": "Test message.", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1545082650636", + "SenderId": "AIDAIENQZJOLO23YVJ4VO", + "ApproximateFirstReceiveTimestamp": "1545082650649" + }, + "messageAttributes": {}, + "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", + "eventSource": "aws:sqs", + "eventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", + "awsRegion": "us-east-2" + } + ] +} \ No newline at end of file From e6c9a9dcec47a0194887c02192526ed4374fce9d Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 3 Oct 2020 12:40:22 +0200 Subject: [PATCH 04/12] Full Test cases coverage --- .../lambda/powertools/sqs/PowertoolsSqs.java | 10 ++ .../sqs/SQSBatchProcessingException.java | 16 +- .../sqs/PowertoolsSqsBatchProcessorTest.java | 170 ++++++++++++++++-- .../PartialBatchFailureSuppressedHandler.java | 32 ++++ .../sqs/handlers/PartialBatchHandler.java | 36 ---- .../PartialBatchPartialFailureHandler.java | 32 ++++ .../handlers/PartialBatchSuccessHandler.java | 28 +++ .../SqsMessageBatchProcessorAspectTest.java | 103 +++++++++++ 8 files changed, 367 insertions(+), 60 deletions(-) create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java delete mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java create mode 100644 powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 4738f065a..3ec9907f0 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -97,6 +97,11 @@ public static SqsClient defaultSqsClient() { return client; } + public static List partialBatchProcessor(final SQSEvent event, + final Class> handler) { + return partialBatchProcessor(event, false, handler); + } + public static List partialBatchProcessor(final SQSEvent event, final boolean suppressException, final Class> handler) { @@ -118,6 +123,11 @@ public static List partialBatchProcessor(final SQSEvent event, } } + public static List partialBatchProcessor(final SQSEvent event, + final SqsMessageHandler handler) { + return partialBatchProcessor(event, false, handler); + } + public static List partialBatchProcessor(final SQSEvent event, final boolean suppressException, final SqsMessageHandler handler) { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java index 7e17ab8ed..e1813a1c3 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java @@ -16,6 +16,10 @@ public class SQSBatchProcessingException extends RuntimeException { public SQSBatchProcessingException(final List exceptions, final List failures, final List successReturns) { + super(exceptions.stream() + .map(Throwable::toString) + .collect(joining("\n"))); + this.exceptions = new ArrayList<>(exceptions); this.failures = new ArrayList<>(failures); this.returnValues = new ArrayList<>(successReturns); @@ -33,13 +37,6 @@ public List getFailures() { return failures; } - @Override - public String getMessage() { - return exceptions.stream() - .map(Throwable::toString) - .collect(joining("\n")); - } - @Override public void printStackTrace() { for (Exception exception : exceptions) { @@ -47,11 +44,6 @@ public void printStackTrace() { } } - @Override - public String toString() { - return getMessage(); - } - void addSuccessMessageReturnValues(final List returnValues) { this.returnValues.addAll(returnValues); } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java index 229385dbe..7f174dd86 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java @@ -2,42 +2,53 @@ import java.io.IOException; import java.util.List; -import java.util.stream.Stream; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.SdkClientException; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.ValueSource; import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.partialBatchProcessor; class PowertoolsSqsBatchProcessorTest { - private final SqsClient sqsClient = mock(SqsClient.class); - private final SqsClient interactionClient = mock(SqsClient.class); + private static final SqsClient sqsClient = mock(SqsClient.class); + private static final SqsClient interactionClient = mock(SqsClient.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private SQSEvent event; @BeforeEach void setUp() throws IOException { + reset(sqsClient, interactionClient); event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class); - PowertoolsSqs.defaultSqsClient(sqsClient); + + when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder() + .queueUrl("test") + .build()); + + defaultSqsClient(sqsClient); } @Test void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() { - List returnValues = PowertoolsSqs.partialBatchProcessor(event, false, (message) -> { + List returnValues = partialBatchProcessor(event, false, (message) -> { interactionClient.listQueues(); return "Success"; }); @@ -53,16 +64,138 @@ void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() { @ParameterizedTest @ValueSource(classes = {SampleInnerSqsHandler.class, SampleSqsHandler.class}) void shouldBatchProcessViaClassAndNotDeleteMessagesWhenAllSuccess(Class> handler) { - List returnValues = PowertoolsSqs.partialBatchProcessor(event, false, handler); + List returnValues = partialBatchProcessor(event, handler); assertThat(returnValues) .hasSize(2) .containsExactly("0", "1"); + + verifyNoInteractions(sqsClient); + } + + @Test + void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() { + String failedId = "2e1424d4-f796-459a-8184-9c92662be6da"; + + SqsMessageHandler failedHandler = (message) -> { + if (failedId.equals(message.getMessageId())) { + throw new RuntimeException("Failed processing"); + } + + interactionClient.listQueues(); + return "Success"; + }; + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> partialBatchProcessor(event, failedHandler)) + .satisfies(e -> { + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .contains(failedId); + + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .contains("Failed processing"); + }); + + verify(interactionClient).listQueues(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void shouldBatchProcessAndFullFailuresInBatch() { + SqsMessageHandler failedHandler = (message) -> { + throw new RuntimeException(message.getMessageId()); + }; + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> partialBatchProcessor(event, failedHandler)) + .satisfies(e -> { + + assertThat(e.successMessageReturnValues()) + .isEmpty(); + + assertThat(e.getFailures()) + .hasSize(2) + .extracting("messageId") + .containsExactly("059f36b4-87a3-44ab-83d2-661975830a7d", + "2e1424d4-f796-459a-8184-9c92662be6da"); + + assertThat(e.getExceptions()) + .hasSize(2) + .extracting("detailMessage") + .containsExactly("059f36b4-87a3-44ab-83d2-661975830a7d", + "2e1424d4-f796-459a-8184-9c92662be6da"); + }); + + verifyNoInteractions(sqsClient); + } + + @Test + void shouldBatchProcessViaClassAndDeleteSuccessMessageOnPartialFailures() { + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> partialBatchProcessor(event, FailureSampleInnerSqsHandler.class)) + .satisfies(e -> { + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .contains("2e1424d4-f796-459a-8184-9c92662be6da"); + + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .contains("Failed processing"); + }); + + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + + @Test + void shouldBatchProcessAndSuppressExceptions() { + String failedId = "2e1424d4-f796-459a-8184-9c92662be6da"; + + SqsMessageHandler failedHandler = (message) -> { + if (failedId.equals(message.getMessageId())) { + throw new RuntimeException("Failed processing"); + } + + interactionClient.listQueues(); + return "Success"; + }; + + List returnValues = partialBatchProcessor(event, true, failedHandler); + + assertThat(returnValues) + .hasSize(1) + .contains("Success"); + + verify(interactionClient).listQueues(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); } - private static Stream exception() { - return Stream.of(Arguments.of(new AmazonServiceException("Service Exception")), - Arguments.of(new SdkClientException("Client Exception"))); + @Test + void shouldBatchProcessViaClassAndSuppressExceptions() { + List returnValues = partialBatchProcessor(event, true, FailureSampleInnerSqsHandler.class); + + assertThat(returnValues) + .hasSize(1) + .contains("Success"); + + verify(interactionClient).listQueues(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); } public class SampleInnerSqsHandler implements SqsMessageHandler { @@ -70,7 +203,20 @@ public class SampleInnerSqsHandler implements SqsMessageHandler { @Override public String process(SQSMessage message) { + interactionClient.listQueues(); return String.valueOf(counter++); } } + + public class FailureSampleInnerSqsHandler implements SqsMessageHandler { + @Override + public String process(SQSEvent.SQSMessage message) { + if ("2e1424d4-f796-459a-8184-9c92662be6da".equals(message.getMessageId())) { + throw new RuntimeException("Failed processing"); + } + + interactionClient.listQueues(); + return "Success"; + } + } } \ No newline at end of file diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java new file mode 100644 index 000000000..ea1cd8944 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java @@ -0,0 +1,32 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; + +public class PartialBatchFailureSuppressedHandler implements RequestHandler { + @Override + @SqsBatchProcessor(value = InnerMessageHandler.class, suppressException = true) + public String handleRequest(final SQSEvent sqsEvent, + final Context context) { + return "Success"; + } + + private class InnerMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + if ("2e1424d4-f796-459a-8184-9c92662be6da".equals(message.getMessageId())) { + throw new RuntimeException("2e1424d4-f796-459a-8184-9c92662be6da"); + } + + sqsClient.listQueues(); + return "Success"; + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java deleted file mode 100644 index 83e9b6c3a..000000000 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchHandler.java +++ /dev/null @@ -1,36 +0,0 @@ -package software.amazon.lambda.powertools.sqs.handlers; - -import java.util.List; - -import com.amazonaws.services.lambda.runtime.Context; -import com.amazonaws.services.lambda.runtime.RequestHandler; -import com.amazonaws.services.lambda.runtime.events.SQSEvent; -import software.amazon.lambda.powertools.sqs.LargeMessageHandler; -import software.amazon.lambda.powertools.sqs.PowertoolsSqs; -import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; -import software.amazon.lambda.powertools.sqs.SqsMessageHandler; - -public class PartialBatchHandler implements RequestHandler> { - - @Override - @LargeMessageHandler - @SqsBatchProcessor(HandlerSqs.class) - public List handleRequest(SQSEvent sqsEvent, Context context) { - - List returnValues = - PowertoolsSqs.partialBatchProcessor(sqsEvent, false, HandlerSqs.class); - - // Do some processing on processed message - - return returnValues; - } - - private class HandlerSqs implements SqsMessageHandler { - - @Override - public String process(SQSEvent.SQSMessage message) { - // This is where you process message - return null; - } - } -} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java new file mode 100644 index 000000000..43a569ca6 --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java @@ -0,0 +1,32 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; + +public class PartialBatchPartialFailureHandler implements RequestHandler { + @Override + @SqsBatchProcessor(InnerMessageHandler.class) + public String handleRequest(final SQSEvent sqsEvent, + final Context context) { + return "Success"; + } + + private class InnerMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + if ("2e1424d4-f796-459a-8184-9c92662be6da".equals(message.getMessageId())) { + throw new RuntimeException("2e1424d4-f796-459a-8184-9c92662be6da"); + } + + sqsClient.listQueues(); + return "Success"; + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java new file mode 100644 index 000000000..d44b084da --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java @@ -0,0 +1,28 @@ +package software.amazon.lambda.powertools.sqs.handlers; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; + +public class PartialBatchSuccessHandler implements RequestHandler { + @Override + @SqsBatchProcessor(InnerMessageHandler.class) + public String handleRequest(final SQSEvent sqsEvent, + final Context context) { + return "Success"; + } + + private class InnerMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + sqsClient.listQueues(); + return "Success"; + } + } +} diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java new file mode 100644 index 000000000..65dbbc14a --- /dev/null +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -0,0 +1,103 @@ +package software.amazon.lambda.powertools.sqs.internal; + +import java.io.IOException; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; +import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; +import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; +import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler; +import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler; +import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatExceptionOfType; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.reset; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; + +public class SqsMessageBatchProcessorAspectTest { + public static final SqsClient sqsClient = mock(SqsClient.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private SQSEvent event; + private RequestHandler requestHandler; + + private Context context = mock(Context.class); + + @BeforeEach + void setUp() throws IOException { + defaultSqsClient(sqsClient); + reset(sqsClient); + setupContext(); + event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class); + + when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder() + .queueUrl("test") + .build()); + + requestHandler = new PartialBatchSuccessHandler(); + } + + @Test + void shouldBatchProcessAllMessageSuccessfullyAndNotDeleteFromSQS() { + requestHandler.handleRequest(event, context); + + verify(sqsClient, times(2)).listQueues(); + verify(sqsClient, times(0)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void shouldBatchProcessMessageWithSuccessDeletedOnFailureInBatchFromSQS() { + requestHandler = new PartialBatchPartialFailureHandler(); + + assertThatExceptionOfType(SQSBatchProcessingException.class) + .isThrownBy(() -> requestHandler.handleRequest(event, context)) + .satisfies(e -> { + assertThat(e.getExceptions()) + .hasSize(1) + .extracting("detailMessage") + .containsExactly("2e1424d4-f796-459a-8184-9c92662be6da"); + + assertThat(e.getFailures()) + .hasSize(1) + .extracting("messageId") + .containsExactly("2e1424d4-f796-459a-8184-9c92662be6da"); + + assertThat(e.successMessageReturnValues()) + .hasSize(1) + .contains("Success"); + }); + + verify(sqsClient).listQueues(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + @Test + void shouldBatchProcessMessageWithSuccessDeletedOnFailureWithSuppressionInBatchFromSQS() { + requestHandler = new PartialBatchFailureSuppressedHandler(); + + requestHandler.handleRequest(event, context); + + verify(sqsClient).listQueues(); + verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + private void setupContext() { + when(context.getFunctionName()).thenReturn("testFunction"); + when(context.getInvokedFunctionArn()).thenReturn("testArn"); + when(context.getFunctionVersion()).thenReturn("1"); + when(context.getMemoryLimitInMB()).thenReturn(10); + } +} \ No newline at end of file From 955f8299ac1cded910c8b70baf8a9ae8c02c81d3 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 3 Oct 2020 21:24:31 +0200 Subject: [PATCH 05/12] Rename API method for batch processing --- .../lambda/powertools/sqs/PowertoolsSqs.java | 99 +++++++++++++------ .../sqs/SQSBatchProcessingException.java | 3 + .../powertools/sqs/SqsBatchProcessor.java | 4 +- .../powertools/sqs/SqsMessageHandler.java | 4 + .../SqsMessageBatchProcessorAspect.java | 4 +- .../sqs/PowertoolsSqsBatchProcessorTest.java | 16 +-- .../powertools/sqs/SampleSqsHandler.java | 2 +- .../sqs/handlers/LambdaHandlerApiGateway.java | 3 + .../SqsMessageBatchProcessorAspectTest.java | 14 ++- 9 files changed, 104 insertions(+), 45 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 3ec9907f0..b179df42d 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -14,7 +14,6 @@ package software.amazon.lambda.powertools.sqs; import java.lang.reflect.Constructor; -import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; import java.util.List; import java.util.function.Function; @@ -89,48 +88,63 @@ public static R enrichedMessageFromS3(final SQSEvent sqsEvent, return returnValue; } + /** + * Provides ability to set default {@link SqsClient} to be used by utility. + * If no default configuration is provided, client is instantiated via {@link SqsClient#create()} + * + * @param client {@link SqsClient} to be used by utility + */ public static void defaultSqsClient(SqsClient client) { PowertoolsSqs.client = client; } - public static SqsClient defaultSqsClient() { - return client; - } - - public static List partialBatchProcessor(final SQSEvent event, - final Class> handler) { - return partialBatchProcessor(event, false, handler); + /** + * @param event + * @param handler + * @param + * @return + */ + public static List batchProcessor(final SQSEvent event, + final Class> handler) { + return batchProcessor(event, false, handler); } - public static List partialBatchProcessor(final SQSEvent event, - final boolean suppressException, - final Class> handler) { + /** + * @param event + * @param suppressException + * @param handler + * @param + * @return + */ + public static List batchProcessor(final SQSEvent event, + final boolean suppressException, + final Class> handler) { - try { - SqsMessageHandler handlerInstance; - if (null == handler.getDeclaringClass()) { - handlerInstance = handler.newInstance(); - } else { - Constructor> constructor = handler.getDeclaredConstructor(handler.getDeclaringClass()); - constructor.setAccessible(true); - handlerInstance = constructor.newInstance(handler.getDeclaringClass().newInstance()); - } - return partialBatchProcessor(event, suppressException, handlerInstance); - } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) { - LOG.error("Failed invoking process method on handler", e); - throw new RuntimeException("Unexpected error occurred. Please raise issue at " + - "https://github.com/awslabs/aws-lambda-powertools-java/issues", e); - } + SqsMessageHandler handlerInstance = instantiatedHandler(handler); + return batchProcessor(event, suppressException, handlerInstance); } - public static List partialBatchProcessor(final SQSEvent event, - final SqsMessageHandler handler) { - return partialBatchProcessor(event, false, handler); + /** + * @param event + * @param handler + * @param + * @return + */ + public static List batchProcessor(final SQSEvent event, + final SqsMessageHandler handler) { + return batchProcessor(event, false, handler); } - public static List partialBatchProcessor(final SQSEvent event, - final boolean suppressException, - final SqsMessageHandler handler) { + /** + * @param event + * @param suppressException + * @param handler + * @param + * @return + */ + public static List batchProcessor(final SQSEvent event, + final boolean suppressException, + final SqsMessageHandler handler) { final List handlerReturn = new ArrayList<>(); BatchContext batchContext = new BatchContext(defaultSqsClient()); @@ -154,6 +168,27 @@ public static List partialBatchProcessor(final SQSEvent event, return handlerReturn; } + private static SqsClient defaultSqsClient() { + return client; + } + + private static SqsMessageHandler instantiatedHandler(final Class> handler) { + + try { + if (null == handler.getDeclaringClass()) { + return handler.newInstance(); + } + + final Constructor> constructor = handler.getDeclaredConstructor(handler.getDeclaringClass()); + constructor.setAccessible(true); + return constructor.newInstance(handler.getDeclaringClass().newInstance()); + } catch (Exception e) { + LOG.error("Failed creating handler instance", e); + throw new RuntimeException("Unexpected error occurred. Please raise issue at " + + "https://github.com/awslabs/aws-lambda-powertools-java/issues", e); + } + } + private static SQSMessage clonedMessage(final SQSMessage sqsMessage) { try { return objectMapper diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java index e1813a1c3..984e25038 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java @@ -7,6 +7,9 @@ import static java.util.stream.Collectors.joining; +/** + * + */ public class SQSBatchProcessingException extends RuntimeException { private final List exceptions; diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java index 24ea22a15..3a8768ff4 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java @@ -5,7 +5,9 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; - +/** + * + */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) public @interface SqsBatchProcessor { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java index 8a626d233..1ceac32e6 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java @@ -2,6 +2,10 @@ import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; +/** + * + * @param + */ @FunctionalInterface public interface SqsMessageHandler { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java index 59396e634..a6085219a 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspect.java @@ -8,7 +8,7 @@ import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; import static software.amazon.lambda.powertools.core.internal.LambdaHandlerProcessor.isHandlerMethod; -import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.partialBatchProcessor; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.batchProcessor; import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.placedOnSqsEventRequestHandler; @Aspect @@ -29,7 +29,7 @@ && placedOnSqsEventRequestHandler(pjp)) { SQSEvent sqsEvent = (SQSEvent) proceedArgs[0]; - partialBatchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value()); + batchProcessor(sqsEvent, sqsBatchProcessor.suppressException(), sqsBatchProcessor.value()); } return pjp.proceed(proceedArgs); diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java index 7f174dd86..002af9636 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java @@ -24,8 +24,8 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.batchProcessor; import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; -import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.partialBatchProcessor; class PowertoolsSqsBatchProcessorTest { @@ -48,7 +48,7 @@ void setUp() throws IOException { @Test void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() { - List returnValues = partialBatchProcessor(event, false, (message) -> { + List returnValues = batchProcessor(event, false, (message) -> { interactionClient.listQueues(); return "Success"; }); @@ -64,7 +64,7 @@ void shouldBatchProcessAndNotDeleteMessagesWhenAllSuccess() { @ParameterizedTest @ValueSource(classes = {SampleInnerSqsHandler.class, SampleSqsHandler.class}) void shouldBatchProcessViaClassAndNotDeleteMessagesWhenAllSuccess(Class> handler) { - List returnValues = partialBatchProcessor(event, handler); + List returnValues = batchProcessor(event, handler); assertThat(returnValues) .hasSize(2) @@ -87,7 +87,7 @@ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() { }; assertThatExceptionOfType(SQSBatchProcessingException.class) - .isThrownBy(() -> partialBatchProcessor(event, failedHandler)) + .isThrownBy(() -> batchProcessor(event, failedHandler)) .satisfies(e -> { assertThat(e.successMessageReturnValues()) @@ -116,7 +116,7 @@ void shouldBatchProcessAndFullFailuresInBatch() { }; assertThatExceptionOfType(SQSBatchProcessingException.class) - .isThrownBy(() -> partialBatchProcessor(event, failedHandler)) + .isThrownBy(() -> batchProcessor(event, failedHandler)) .satisfies(e -> { assertThat(e.successMessageReturnValues()) @@ -141,7 +141,7 @@ void shouldBatchProcessAndFullFailuresInBatch() { @Test void shouldBatchProcessViaClassAndDeleteSuccessMessageOnPartialFailures() { assertThatExceptionOfType(SQSBatchProcessingException.class) - .isThrownBy(() -> partialBatchProcessor(event, FailureSampleInnerSqsHandler.class)) + .isThrownBy(() -> batchProcessor(event, FailureSampleInnerSqsHandler.class)) .satisfies(e -> { assertThat(e.successMessageReturnValues()) @@ -176,7 +176,7 @@ void shouldBatchProcessAndSuppressExceptions() { return "Success"; }; - List returnValues = partialBatchProcessor(event, true, failedHandler); + List returnValues = batchProcessor(event, true, failedHandler); assertThat(returnValues) .hasSize(1) @@ -188,7 +188,7 @@ void shouldBatchProcessAndSuppressExceptions() { @Test void shouldBatchProcessViaClassAndSuppressExceptions() { - List returnValues = partialBatchProcessor(event, true, FailureSampleInnerSqsHandler.class); + List returnValues = batchProcessor(event, true, FailureSampleInnerSqsHandler.class); assertThat(returnValues) .hasSize(1) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java index 517dbb686..d48cded5f 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/SampleSqsHandler.java @@ -2,7 +2,7 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; -public class SampleSqsHandler implements SqsMessageHandler { +public class SampleSqsHandler implements SqsMessageHandler { private int counter; @Override diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java index 39f216c0a..5a61c3514 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/LambdaHandlerApiGateway.java @@ -4,11 +4,14 @@ import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import software.amazon.lambda.powertools.sqs.LargeMessageHandler; +import software.amazon.lambda.powertools.sqs.SampleSqsHandler; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; public class LambdaHandlerApiGateway implements RequestHandler { @Override @LargeMessageHandler + @SqsBatchProcessor(value = SampleSqsHandler.class) public String handleRequest(APIGatewayProxyRequestEvent sqsEvent, Context context) { return sqsEvent.getBody(); } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java index 65dbbc14a..68999cc9a 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -4,6 +4,7 @@ import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.fasterxml.jackson.databind.ObjectMapper; import org.junit.jupiter.api.BeforeEach; @@ -13,8 +14,9 @@ import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse; import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; -import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler; +import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway; import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler; +import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler; import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler; import static org.assertj.core.api.Assertions.assertThat; @@ -24,6 +26,7 @@ import static org.mockito.Mockito.reset; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; @@ -94,6 +97,15 @@ void shouldBatchProcessMessageWithSuccessDeletedOnFailureWithSuppressionInBatchF verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); } + @Test + void shouldNotTakeEffectOnNonSqsEventHandler() { + LambdaHandlerApiGateway handlerApiGateway = new LambdaHandlerApiGateway(); + + handlerApiGateway.handleRequest(mock(APIGatewayProxyRequestEvent.class), context); + + verifyNoInteractions(sqsClient); + } + private void setupContext() { when(context.getFunctionName()).thenReturn("testFunction"); when(context.getInvokedFunctionArn()).thenReturn("testArn"); From 4edaf6116dac436c93662440795718d2bb69467d Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sat, 3 Oct 2020 21:51:39 +0200 Subject: [PATCH 06/12] java docs --- .../lambda/powertools/sqs/PowertoolsSqs.java | 131 ++++++++++++++---- .../sqs/SQSBatchProcessingException.java | 37 ++++- .../powertools/sqs/SqsBatchProcessor.java | 44 ++++++ .../powertools/sqs/SqsMessageHandler.java | 18 ++- .../powertools/sqs/internal/BatchContext.java | 31 ++--- 5 files changed, 207 insertions(+), 54 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index b179df42d..6bfea05a9 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -33,9 +33,7 @@ import static software.amazon.lambda.powertools.sqs.internal.SqsLargeMessageAspect.processMessages; /** - * A class of helper functions to add additional functionality to LargeMessageHandler. - *

- * {@see PowertoolsLogging} + * A class of helper functions to add additional functionality to {@link SQSEvent} processing. */ public final class PowertoolsSqs { private static final Log LOG = LogFactory.getLog(PowertoolsSqs.class); @@ -99,10 +97,32 @@ public static void defaultSqsClient(SqsClient client) { } /** - * @param event - * @param handler - * @param - * @return + * This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent} + * + *

+ * Utility will take care of calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} + * in the received {@link SQSEvent} + *

+ * + *

+ * If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, + * Utility will take care of deleting all the successful messages from SQS. When one or more single message fails + * processing due to exception thrown from {@link SqsMessageHandler#process(SQSMessage)} + * {@link SQSBatchProcessingException} is thrown with all the details of successful and failed messages. + *

+ * If all the messages are successfully processes, No SQS messages are deleted explicitly but is rather delegated to + * Lambda execution context for deletion. + *

+ * + *

+ * If you dont want to utility to throw {@link SQSBatchProcessingException} in case of failures but rather suppress + * it, Refer {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, Class)} + *

+ * + * @param event {@link SQSEvent} received by lambda function. + * @param handler Class implementing {@link SqsMessageHandler} which will be called for each message in event. + * @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message. + * @throws SQSBatchProcessingException if some messages fail during processing. */ public static List batchProcessor(final SQSEvent event, final Class> handler) { @@ -110,11 +130,31 @@ public static List batchProcessor(final SQSEvent event, } /** - * @param event - * @param suppressException - * @param handler - * @param - * @return + * This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent} + * + *

+ * Utility will take care of calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} + * in the received {@link SQSEvent} + *

+ * + *

+ * If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, + * Utility will take care of deleting all the successful messages from SQS. When one or more single message fails + * processing due to exception thrown from {@link SqsMessageHandler#process(SQSMessage)} + * {@link SQSBatchProcessingException} is thrown with all the details of successful and failed messages. + *

+ * Exception can also be suppressed if desired. + *

+ * If all the messages are successfully processes, No SQS messages are deleted explicitly but is rather delegated to + * Lambda execution context for deletion. + *

+ * + * @param event {@link SQSEvent} received by lambda function. + * @param suppressException if this is set to true, No {@link SQSBatchProcessingException} is thrown even on failed + * messages. + * @param handler Class implementing {@link SqsMessageHandler} which will be called for each message in event. + * @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message. + * @throws SQSBatchProcessingException if some messages fail during processing and no suppression enabled. */ public static List batchProcessor(final SQSEvent event, final boolean suppressException, @@ -125,10 +165,32 @@ public static List batchProcessor(final SQSEvent event, } /** - * @param event - * @param handler - * @param - * @return + * This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent} + * + *

+ * Utility will take care of calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} + * in the received {@link SQSEvent} + *

+ * + *

+ * If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, + * Utility will take care of deleting all the successful messages from SQS. When one or more single message fails + * processing due to exception thrown from {@link SqsMessageHandler#process(SQSMessage)} + * {@link SQSBatchProcessingException} is thrown with all the details of successful and failed messages. + *

+ * If all the messages are successfully processes, No SQS messages are deleted explicitly but is rather delegated to + * Lambda execution context for deletion. + *

+ * + *

+ * If you dont want to utility to throw {@link SQSBatchProcessingException} in case of failures but rather suppress + * it, Refer {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, SqsMessageHandler)} + *

+ * + * @param event {@link SQSEvent} received by lambda function. + * @param handler Instance of class implementing {@link SqsMessageHandler} which will be called for each message in event. + * @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message- + * @throws SQSBatchProcessingException if some messages fail during processing. */ public static List batchProcessor(final SQSEvent event, final SqsMessageHandler handler) { @@ -136,11 +198,31 @@ public static List batchProcessor(final SQSEvent event, } /** - * @param event - * @param suppressException - * @param handler - * @param - * @return + * This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent} + * + *

+ * Utility will take care of calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} + * in the received {@link SQSEvent} + *

+ * + *

+ * If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, + * Utility will take care of deleting all the successful messages from SQS. When one or more single message fails + * processing due to exception thrown from {@link SqsMessageHandler#process(SQSMessage)} + * {@link SQSBatchProcessingException} is thrown with all the details of successful and failed messages. + *

+ * Exception can also be suppressed if desired. + *

+ * If all the messages are successfully processes, No SQS messages are deleted explicitly but is rather delegated to + * Lambda execution context for deletion. + *

+ * + * @param event {@link SQSEvent} received by lambda function. + * @param suppressException if this is set to true, No {@link SQSBatchProcessingException} is thrown even on failed + * messages. + * @param handler Instance of class implementing {@link SqsMessageHandler} which will be called for each message in event. + * @return List of values returned by {@link SqsMessageHandler#process(SQSMessage)} while processing each message. + * @throws SQSBatchProcessingException if some messages fail during processing and no suppression enabled. */ public static List batchProcessor(final SQSEvent event, final boolean suppressException, @@ -158,12 +240,7 @@ public static List batchProcessor(final SQSEvent event, } } - try { - batchContext.processSuccessAndReset(suppressException); - } catch (SQSBatchProcessingException e) { - e.addSuccessMessageReturnValues(handlerReturn); - throw e; - } + batchContext.processSuccessAndHandleFailed(handlerReturn, suppressException); return handlerReturn; } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java index 984e25038..38a9c943d 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SQSBatchProcessingException.java @@ -5,19 +5,34 @@ import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; import static java.util.stream.Collectors.joining; /** + *

+ * When one or more {@link SQSMessage} fails and if any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} + * during processing of a messages, this exception is with all the details of successful and failed messages. + *

* + *

+ * This exception can be thrown form: + *

    + *
  • {@link SqsBatchProcessor}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, Class)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, Class)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, SqsMessageHandler)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, SqsMessageHandler)}
  • + *
+ *

*/ public class SQSBatchProcessingException extends RuntimeException { private final List exceptions; - private final List failures; + private final List failures; private final List returnValues; public SQSBatchProcessingException(final List exceptions, - final List failures, + final List failures, final List successReturns) { super(exceptions.stream() .map(Throwable::toString) @@ -28,15 +43,27 @@ public SQSBatchProcessingException(final List exceptions, this.returnValues = new ArrayList<>(successReturns); } + /** + * Details for exceptions that occurred while processing messages in {@link SqsMessageHandler#process(SQSMessage)} + * @return List of exceptions that occurred while processing messages + */ public List getExceptions() { return exceptions; } + /** + * List of returns from {@link SqsMessageHandler#process(SQSMessage)} that were successfully processed. + * @return List of returns from successfully processed messages + */ public List successMessageReturnValues() { return returnValues; } - public List getFailures() { + /** + * Details of {@link SQSMessage} that failed in {@link SqsMessageHandler#process(SQSMessage)} + * @return List of failed messages + */ + public List getFailures() { return failures; } @@ -46,8 +73,4 @@ public void printStackTrace() { exception.printStackTrace(); } } - - void addSuccessMessageReturnValues(final List returnValues) { - this.returnValues.addAll(returnValues); - } } diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java index 3a8768ff4..342765052 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsBatchProcessor.java @@ -5,8 +5,52 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.*; + /** + * {@link SqsBatchProcessor} is used to process batch messages in {@link SQSEvent} + * + *

+ * When using the annotation, implementation of {@link SqsMessageHandler} is required. Annotation will take care of + * calling {@link SqsMessageHandler#process(SQSMessage)} method for each {@link SQSMessage} in the received {@link SQSEvent} + *

+ * + *

+ * If any exception is thrown from {@link SqsMessageHandler#process(SQSMessage)} during processing of a messages, Utility + * will take care of deleting all the successful messages from SQS. When one or more single message fails processing due + * to exception thrown from {@link SqsMessageHandler#process(SQSMessage)}, Lambda execution will fail + * with {@link SQSBatchProcessingException}. + * + * If all the messages are successfully processes, No SQS messages are deleted explicitly but is rather delegated to + * Lambda execution context for deletion. + *

+ * + *

+ * If you want to suppress the exception even if any message in batch fails, set + * {@link SqsBatchProcessor#suppressException()} to true. By default its value is false + *

+ * + *
+ * public class SqsMessageHandler implements RequestHandler {
+ *
+ *    {@literal @}Override
+ *    {@literal @}{@link SqsBatchProcessor(SqsMessageHandler)}
+ *     public String handleRequest(SQSEvent sqsEvent, Context context) {
+ *
+ *         return "ok";
+ *     }
+ *
+ *     public class DummySqsMessageHandler implements SqsMessageHandler{
+ *     @Override
+ *     public Object process(SQSEvent.SQSMessage message) {
+ *         throw new UnsupportedOperationException();
+ *     }
+ * }
  *
+ *     ...
+ * 
  */
 @Retention(RetentionPolicy.RUNTIME)
 @Target(ElementType.METHOD)
diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java
index 1ceac32e6..5839939b8 100644
--- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java
+++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsMessageHandler.java
@@ -1,10 +1,26 @@
 package software.amazon.lambda.powertools.sqs;
 
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+
 import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
 
 /**
+ * 

+ * This interface should be implemented for processing {@link SQSMessage} inside {@link SQSEvent} received by lambda + * function. + *

* - * @param + *

+ * It is required by utilities: + *

    + *
  • {@link SqsBatchProcessor}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, Class)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, Class)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, SqsMessageHandler)}
  • + *
  • {@link PowertoolsSqs#batchProcessor(SQSEvent, boolean, SqsMessageHandler)}
  • + *
+ *

+ * @param Return value type from {@link SqsMessageHandler#process(SQSMessage)} */ @FunctionalInterface public interface SqsMessageHandler { diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index 54942f1ca..eca4be3b8 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -1,6 +1,5 @@ package software.amazon.lambda.powertools.sqs.internal; -import java.awt.geom.Area; import java.util.ArrayList; import java.util.List; @@ -37,21 +36,21 @@ public void addFailure(SQSMessage event, Exception e) { exceptions.add(e); } - public void processSuccessAndReset(final boolean suppressException) { - try { - if (hasFailures()) { + public void processSuccessAndHandleFailed(final List successReturns, + final boolean suppressException) { + if (hasFailures()) { + deleteSuccessMessage(); - deleteSuccessMessage(); + if (suppressException) { + List messageIds = failures.stream(). + map(SQSMessage::getMessageId) + .collect(toList()); - if (suppressException) { - List messageIds = failures.stream().map(SQSMessage::getMessageId).collect(toList()); - LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. Failed messages %s", failures.size(), messageIds)); - } else { - throw new SQSBatchProcessingException(exceptions, failures, new ArrayList()); - } + LOG.debug(format("[%s] records failed processing, but exceptions are suppressed. " + + "Failed messages %s", failures.size(), messageIds)); + } else { + throw new SQSBatchProcessingException(exceptions, failures, successReturns); } - } finally { - reset(); } } @@ -81,10 +80,4 @@ private String url() { .build()) .queueUrl(); } - - private void reset() { - success.clear(); - failures.clear(); - exceptions.clear(); - } } From 145174b676eecf877d45d826b9369655cfab0bf5 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sun, 4 Oct 2020 12:12:25 +0200 Subject: [PATCH 07/12] public docs update --- docs/content/utilities/batch.mdx | 251 ++++++++++++++++++ .../utilities/sqs_large_message_handling.mdx | 6 +- docs/gatsby-config.js | 3 +- 3 files changed, 256 insertions(+), 4 deletions(-) create mode 100644 docs/content/utilities/batch.mdx diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx new file mode 100644 index 000000000..babe709e9 --- /dev/null +++ b/docs/content/utilities/batch.mdx @@ -0,0 +1,251 @@ +--- +title: SQS Batch Processing +description: Utility +--- + +import Note from "../../src/components/Note" + +The SQS batch processing utility provides a way to handle partial failures when processing batches of messages from SQS. + +**Key Features** + +* Prevent successfully processed messages from being returned to SQS +* A simple interface for individually processing messages from a batch + +**Background** + +When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS. + +If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time. + +With this utility, messages within a batch are handled individually - only messages that were not successfully processed +are returned to the queue. + + + While this utility lowers the chance of processing messages more than once, it is not guaranteed. We recommend implementing processing logic in an idempotent manner wherever possible. +

+ More details on how Lambda works with SQS can be found in the AWS documentation +
+ +## Install + +To install this utility, add the following dependency to your project. + +```xml + + software.amazon.lambda + powertools-sqs + 0.4.0-beta + +``` + +And configure the aspectj-maven-plugin to compile-time weave (CTW) the +aws-lambda-powertools-java aspects into your project. You may already have this +plugin in your pom. In that case add the dependency to the `aspectLibraries` +section. + +```xml + + + ... + + org.codehaus.mojo + aspectj-maven-plugin + 1.11 + + 1.8 + 1.8 + 1.8 + + + + software.amazon.lambda + powertools-sqs + + + + + + + + compile + + + + + ... + + +``` + +**IAM Permissions** + +This utility requires additional permissions to work as expected. Lambda functions using this utility require the `sqs:GetQueueUrl` and `sqs:DeleteMessageBatch` permission. + +## Processing messages from SQS + +You can use either **[SqsBatchProcessor annotation](#SqsBatchProcessor annotation)**, or **[PowertoolsSqs Utility API](#PowertoolsSqs Utility API)** as a fluent API. + +Both have nearly the same behaviour when it comes to processing messages from the batch: + +* **Entire batch has been successfully processed**, where your Lambda handler returned successfully, we will let SQS delete the batch to optimize your cost +* **Entire Batch has been partially processed successfully**, where exceptions were raised within your `SqsMessageHandler` interface implementation, we will: + - **1)** Delete successfully processed messages from the queue by directly calling `sqs:DeleteMessageBatch` + - **2)** Raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue + +The only difference is that **PowertoolsSqs Utility API** will give you access to return from the processed messages if you need. Exception `SQSBatchProcessingException` thrown from the +utility will have access to both successful and failed messaged along with failure exceptions. + +## Functional Interface SqsMessageHandler + +Both [annotation](#SqsBatchProcessor annotation) and [PowertoolsSqs Utility API](#PowertoolsSqs Utility API) requires an implementation of functional interface `SqsMessageHandler`. + +This implementation is responsible for processing each individual message from the batch, and to raise an exception if unable to process any of the messages sent. + +**Any non-exception/successful return from your record handler function** will instruct utility to queue up each individual message for deletion. + +### SqsBatchProcessor annotation + +When using this annotation, you need provide a class implementation of `SqsMessageHandler` that will process individual messages from the batch - It should raise an exception if it is unable to process the record. + +All records in the batch will be passed to this handler for processing, even if exceptions are thrown - Here's the behaviour after completing the batch: + +* **Any successfully processed messages**, we will delete them from the queue via `sqs:DeleteMessageBatch` +* **Any unprocessed messages detected**, we will raise `SQSBatchProcessingException` to ensure failed messages return to your SQS queue + + + You will not have accessed to the processed messages within the Lambda Handler - all processing logic will and should be performed by the implemented SqsMessageHandler#process() function. + +
+ +```java:title=App.java +public class AppSqsEvent implements RequestHandler { + @Override + @SqsBatchProcessor(SampleMessageHandler.class) // highlight-line + public String handleRequest(SQSEvent input, Context context) { + return "{\"statusCode\": 200}"; + } + + public class SampleMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + // This will be called for each individual message from a batch + // It should raise an exception if the message was not processed successfully + String returnVal = doSomething(message.getBody()); + return returnVal; + } + } +} +``` + +### PowertoolsSqs Utility API + +If you require access to the result of processed messages, you can use this utility. + +The result from calling PowertoolsSqs#batchProcessor() on the context manager will be a list of all the return values from your SqsMessageHandler#process() function. + +```java:title=App.java +public class AppSqsEvent implements RequestHandler> { + @Override + public List handleRequest(SQSEvent input, Context context) { + List returnValues = PowertoolsSqs.batchProcessor(input, SampleMessageHandler.class); // highlight-line + + return returnValues; + } + + public class SampleMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + // This will be called for each individual message from a batch + // It should raise an exception if the message was not processed successfully + String returnVal = doSomething(message.getBody()); + return returnVal; + } + } +} +``` + +You can also use the utility in a more functional way` by providing inline implementation of functional interface SqsMessageHandler#process() + +```java:title=App.java +public class AppSqsEvent implements RequestHandler> { + + @Override + public List handleRequest(SQSEvent input, Context context) { + // highlight-start + List returnValues = PowertoolsSqs.batchProcessor(input, (message) -> { + // This will be called for each individual message from a batch + // It should raise an exception if the message was not processed successfully + String returnVal = doSomething(message.getBody()); + return returnVal; + }); + // highlight-end + + return returnValues; + } +} +``` + +## Passing custom SqsClient + +If you need to pass custom SqsClient such as region to the SDK, you can pass your own `SqsClient` to be used by utility either for +**[SqsBatchProcessor annotation](#SqsBatchProcessor annotation)**, or **[PowertoolsSqs Utility API](#PowertoolsSqs Utility API)**. + +```java:title=App.java + +public class AppSqsEvent implements RequestHandler> { + // highlight-start + static { + PowertoolsSqs.defaultSqsClient(SqsClient.builder() + .build()); + } + // highlight-end + + @Override + public List handleRequest(SQSEvent input, Context context) { + List returnValues = PowertoolsSqs.batchProcessor(input, SampleMessageHandler.class); + + return returnValues; + } + + public class SampleMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + // This will be called for each individual message from a batch + // It should raise an exception if the message was not processed successfully + String returnVal = doSomething(message.getBody()); + return returnVal; + } + } +} + +``` + +## Suppressing exceptions + +If you want to disable the default behavior where `SQSBatchProcessingException` is raised if there are any exception, you can pass the `suppressException` boolean argument. + +**Within SqsBatchProcessor annotation** + +```java:title=App.java +... + @Override + @SqsBatchProcessor(value = SampleMessageHandler.class, suppressException = true) // highlight-line + public String handleRequest(SQSEvent input, Context context) { + return "{\"statusCode\": 200}"; + } +``` + +**Within PowertoolsSqs Utility API** + +```java:title=App.java + @Override + public List handleRequest(SQSEvent input, Context context) { + List returnValues = PowertoolsSqs.batchProcessor(input, true, SampleMessageHandler.class); // highlight-line + + return returnValues; + } +``` diff --git a/docs/content/utilities/sqs_large_message_handling.mdx b/docs/content/utilities/sqs_large_message_handling.mdx index cb273f38c..e2e4a77ad 100644 --- a/docs/content/utilities/sqs_large_message_handling.mdx +++ b/docs/content/utilities/sqs_large_message_handling.mdx @@ -35,7 +35,7 @@ To install this utility, add the following dependency to your project. And configure the aspectj-maven-plugin to compile-time weave (CTW) the aws-lambda-powertools-java aspects into your project. You may already have this -plugin in your pom. In that case add the depenedency to the `aspectLibraries` +plugin in your pom. In that case add the dependency to the `aspectLibraries` section. ```xml @@ -51,12 +51,12 @@ section. 1.8 1.8 - ... + software.amazon.lambda powertools-sqs - ... + diff --git a/docs/gatsby-config.js b/docs/gatsby-config.js index d3cd330a1..b79126b44 100644 --- a/docs/gatsby-config.js +++ b/docs/gatsby-config.js @@ -29,7 +29,8 @@ module.exports = { ], 'Utilities': [ 'utilities/sqs_large_message_handling', - 'utilities/parameters' + 'utilities/batch', + 'utilities/parameters', ], }, navConfig: { From 7cce85f1290a1859f96444dc63da7b5571756ab2 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sun, 4 Oct 2020 12:53:37 +0200 Subject: [PATCH 08/12] Fix correct place holder for queuename and account --- .../lambda/powertools/sqs/internal/BatchContext.java | 8 +++++--- .../powertools/sqs/PowertoolsSqsBatchProcessorTest.java | 8 ++++++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java index eca4be3b8..d1d8da066 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/internal/BatchContext.java @@ -8,6 +8,7 @@ import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry; +import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; @@ -68,15 +69,16 @@ private void deleteSuccessMessage() { .build()).collect(toList())) .build(); - client.deleteMessageBatch(request); + DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request); + LOG.debug(format("Response from delete request %s", deleteMessageBatchResponse)); } } private String url() { String[] arnArray = success.get(0).getEventSourceArn().split(":"); return client.getQueueUrl(GetQueueUrlRequest.builder() - .queueOwnerAWSAccountId(arnArray[1]) - .queueName(arnArray[2]) + .queueOwnerAWSAccountId(arnArray[4]) + .queueName(arnArray[5]) .build()) .queueUrl(); } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java index 002af9636..8020d5bba 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java @@ -9,6 +9,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import org.mockito.ArgumentCaptor; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest; import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest; @@ -107,6 +108,13 @@ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() { verify(interactionClient).listQueues(); verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + ArgumentCaptor captor = ArgumentCaptor.forClass(GetQueueUrlRequest.class); + verify(sqsClient).getQueueUrl(captor.capture()); + + assertThat(captor.getValue()) + .hasFieldOrPropertyWithValue("queueName", "my-queue") + .hasFieldOrPropertyWithValue("queueOwnerAWSAccountId", "123456789012"); } @Test From 33eb76d81acc2f708ac8f0e68dabd2860ab9ad94 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Sun, 4 Oct 2020 13:32:02 +0200 Subject: [PATCH 09/12] Example usage with relevant permissions --- example/HelloWorldFunction/pom.xml | 9 ++++ .../src/main/java/helloworld/AppSqsEvent.java | 35 +++++++++++++ .../main/java/helloworld/AppSqsEventUtil.java | 39 ++++++++++++++ example/events/eventSqs.json | 36 +++++++++++++ example/template.yaml | 51 +++++++++++++++++++ 5 files changed, 170 insertions(+) create mode 100644 example/HelloWorldFunction/src/main/java/helloworld/AppSqsEvent.java create mode 100644 example/HelloWorldFunction/src/main/java/helloworld/AppSqsEventUtil.java create mode 100644 example/events/eventSqs.json diff --git a/example/HelloWorldFunction/pom.xml b/example/HelloWorldFunction/pom.xml index 9ad3559f9..c23351de5 100644 --- a/example/HelloWorldFunction/pom.xml +++ b/example/HelloWorldFunction/pom.xml @@ -33,6 +33,11 @@ powertools-parameters 0.4.0-beta + + software.amazon.lambda + powertools-sqs + 0.4.0-beta + com.amazonaws aws-lambda-java-core @@ -90,6 +95,10 @@ software.amazon.lambda powertools-metrics + + software.amazon.lambda + powertools-sqs + diff --git a/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEvent.java b/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEvent.java new file mode 100644 index 000000000..ff9d050af --- /dev/null +++ b/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEvent.java @@ -0,0 +1,35 @@ +package helloworld; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.lambda.powertools.logging.PowertoolsLogging; +import software.amazon.lambda.powertools.sqs.SqsBatchProcessor; +import software.amazon.lambda.powertools.sqs.SqsMessageHandler; + +import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; + +public class AppSqsEvent implements RequestHandler { + private static final Logger LOG = LogManager.getLogger(AppSqsEvent.class); + + @Override + @SqsBatchProcessor(SampleMessageHandler.class) + @PowertoolsLogging(logEvent = true) + public String handleRequest(SQSEvent input, Context context) { + return "{\"statusCode\": 200}"; + } + + public class SampleMessageHandler implements SqsMessageHandler { + + @Override + public String process(SQSMessage message) { + if("19dd0b57-b21e-4ac1-bd88-01bbb068cb99".equals(message.getMessageId())) { + throw new RuntimeException(message.getMessageId()); + } + LOG.info("Processing message with details {}", message); + return message.getMessageId(); + } + } +} diff --git a/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEventUtil.java b/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEventUtil.java new file mode 100644 index 000000000..a1300defc --- /dev/null +++ b/example/HelloWorldFunction/src/main/java/helloworld/AppSqsEventUtil.java @@ -0,0 +1,39 @@ +package helloworld; + +import java.util.List; + +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import software.amazon.lambda.powertools.sqs.PowertoolsSqs; +import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException; + +import static java.util.Collections.emptyList; + +public class AppSqsEventUtil implements RequestHandler> { + private static final Logger LOG = LogManager.getLogger(AppSqsEventUtil.class); + + @Override + public List handleRequest(SQSEvent input, Context context) { + try { + + return PowertoolsSqs.batchProcessor(input, (message) -> { + if ("19dd0b57-b21e-4ac1-bd88-01bbb068cb99".equals(message.getMessageId())) { + throw new RuntimeException(message.getMessageId()); + } + + LOG.info("Processing message with details {}", message); + return message.getMessageId(); + }); + + } catch (SQSBatchProcessingException e) { + LOG.info("Exception details {}", e.getMessage(), e); + LOG.info("Success message Returns{}", e.successMessageReturnValues()); + LOG.info("Failed messages {}", e.getFailures()); + LOG.info("Failed messages Reasons {}", e.getExceptions()); + return emptyList(); + } + } +} diff --git a/example/events/eventSqs.json b/example/events/eventSqs.json new file mode 100644 index 000000000..37a29c4dd --- /dev/null +++ b/example/events/eventSqs.json @@ -0,0 +1,36 @@ +{ + "Records": [ + { + "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb99", + "receiptHandle": "MessageReceiptHandle", + "body": "Hello from SQS!", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1523232000000", + "SenderId": "123456789012", + "ApproximateFirstReceiveTimestamp": "1523232000001" + }, + "messageAttributes": {}, + "md5OfBody": "7b270e59b47ff90a553787216d55d999", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-west-1:123456789:powertools-example-TestSqsQueue-1JW5W8N9", + "awsRegion": "eu-west-1" + }, + { + "messageId": "19dd0b57-b21e-4ac1-bd88-01bbb068cb78", + "receiptHandle": "MessageReceiptHandle", + "body": "Hello from SQS!", + "attributes": { + "ApproximateReceiveCount": "1", + "SentTimestamp": "1523232000000", + "SenderId": "123456789012", + "ApproximateFirstReceiveTimestamp": "1523232000001" + }, + "messageAttributes": {}, + "md5OfBody": "7b270e59b47ff90a553787216d55d91d", + "eventSource": "aws:sqs", + "eventSourceARN": "arn:aws:sqs:eu-west-1:123456789:powertools-example-TestSqsQueue-1JW5W8N9", + "awsRegion": "eu-west-1" + } + ] +} \ No newline at end of file diff --git a/example/template.yaml b/example/template.yaml index a7dd1b8be..9f279c2bf 100644 --- a/example/template.yaml +++ b/example/template.yaml @@ -125,6 +125,57 @@ Resources: Value: aGVsbG8gd29ybGQ= Description: Base64 SSM Parameter for lambda-powertools-java powertools-parameters module + TestSqsQueue: + Type: AWS::SQS::Queue + + HelloWorldSqsEventFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: HelloWorldFunction + Handler: helloworld.AppSqsEvent::handleRequest + Runtime: java8 + MemorySize: 512 + Tracing: Active + Policies: + - Statement: + - Sid: AdditionalPermisssionForPowertoolsSQSUtils + Effect: Allow + Action: + - sqs:GetQueueUrl + - sqs:DeleteMessageBatch + Resource: !GetAtt TestSqsQueue.Arn + Events: + TestSQSEvent: + Type: SQS + Properties: + Queue: !GetAtt TestSqsQueue.Arn + BatchSize: 10 + + TestAnotherSqsQueue: + Type: AWS::SQS::Queue + + HelloWorldSqsEventUtilFunction: + Type: AWS::Serverless::Function + Properties: + CodeUri: HelloWorldFunction + Handler: helloworld.AppSqsEventUtil::handleRequest + Runtime: java8 + MemorySize: 512 + Tracing: Active + Policies: + - Statement: + - Sid: AdditionalPermisssionForPowertoolsSQSUtils + Effect: Allow + Action: + - sqs:GetQueueUrl + - sqs:DeleteMessageBatch + Resource: !GetAtt TestAnotherSqsQueue.Arn + Events: + TestSQSEvent: + Type: SQS + Properties: + Queue: !GetAtt TestAnotherSqsQueue.Arn + BatchSize: 10 Outputs: # ServerlessRestApi is an implicit API created out of Events key under Serverless::Function From e4d2b1b4b5c21baac1dc92c5676900f5e1ed231a Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 5 Oct 2020 09:49:53 +0200 Subject: [PATCH 10/12] Minor doc updates --- docs/content/utilities/batch.mdx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index babe709e9..61022161b 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -14,11 +14,11 @@ The SQS batch processing utility provides a way to handle partial failures when **Background** -When using SQS as a Lambda event source mapping, Lambda functions are triggered with a batch of messages from SQS. +When using SQS as a Lambda event source mapping, Lambda functions can be triggered with a batch of messages from SQS. -If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function is triggered with the same batch one more time. +If your function fails to process any message from the batch, the entire batch returns to your SQS queue, and your Lambda function will be triggered with the same batch again. -With this utility, messages within a batch are handled individually - only messages that were not successfully processed +With this utility, messages within a batch will be handled individually - only messages that were not successfully processed are returned to the queue. From 1b0eec334513e4f4c4086595bd7035c061a04376 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 5 Oct 2020 10:19:36 +0200 Subject: [PATCH 11/12] Ranme method to set custom sqs client --- docs/content/utilities/batch.mdx | 2 +- .../amazon/lambda/powertools/sqs/PowertoolsSqs.java | 8 ++------ .../powertools/sqs/PowertoolsSqsBatchProcessorTest.java | 4 ++-- .../sqs/internal/SqsMessageBatchProcessorAspectTest.java | 4 ++-- 4 files changed, 7 insertions(+), 11 deletions(-) diff --git a/docs/content/utilities/batch.mdx b/docs/content/utilities/batch.mdx index 61022161b..74401a726 100644 --- a/docs/content/utilities/batch.mdx +++ b/docs/content/utilities/batch.mdx @@ -198,7 +198,7 @@ If you need to pass custom SqsClient such as region to the SDK, you can pass you public class AppSqsEvent implements RequestHandler> { // highlight-start static { - PowertoolsSqs.defaultSqsClient(SqsClient.builder() + PowertoolsSqs.overrideSqsClient(SqsClient.builder() .build()); } // highlight-end diff --git a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java index 6bfea05a9..01ded6410 100644 --- a/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java +++ b/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/PowertoolsSqs.java @@ -92,7 +92,7 @@ public static R enrichedMessageFromS3(final SQSEvent sqsEvent, * * @param client {@link SqsClient} to be used by utility */ - public static void defaultSqsClient(SqsClient client) { + public static void overrideSqsClient(SqsClient client) { PowertoolsSqs.client = client; } @@ -229,7 +229,7 @@ public static List batchProcessor(final SQSEvent event, final SqsMessageHandler handler) { final List handlerReturn = new ArrayList<>(); - BatchContext batchContext = new BatchContext(defaultSqsClient()); + BatchContext batchContext = new BatchContext(client); for (SQSMessage message : event.getRecords()) { try { @@ -245,10 +245,6 @@ public static List batchProcessor(final SQSEvent event, return handlerReturn; } - private static SqsClient defaultSqsClient() { - return client; - } - private static SqsMessageHandler instantiatedHandler(final Class> handler) { try { diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java index 8020d5bba..c894081d4 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/PowertoolsSqsBatchProcessorTest.java @@ -26,7 +26,7 @@ import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.batchProcessor; -import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.overrideSqsClient; class PowertoolsSqsBatchProcessorTest { @@ -44,7 +44,7 @@ void setUp() throws IOException { .queueUrl("test") .build()); - defaultSqsClient(sqsClient); + overrideSqsClient(sqsClient); } @Test diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java index 68999cc9a..59460e665 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -28,7 +28,7 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; -import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.defaultSqsClient; +import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.overrideSqsClient; public class SqsMessageBatchProcessorAspectTest { public static final SqsClient sqsClient = mock(SqsClient.class); @@ -41,7 +41,7 @@ public class SqsMessageBatchProcessorAspectTest { @BeforeEach void setUp() throws IOException { - defaultSqsClient(sqsClient); + overrideSqsClient(sqsClient); reset(sqsClient); setupContext(); event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class); From 583544055ed8d46d52ecc7bfc413b27674175104 Mon Sep 17 00:00:00 2001 From: Pankaj Agrawal Date: Mon, 5 Oct 2020 12:23:03 +0200 Subject: [PATCH 12/12] Make test less confusing --- .../PartialBatchFailureSuppressedHandler.java | 4 ++-- .../handlers/PartialBatchPartialFailureHandler.java | 4 ++-- .../sqs/handlers/PartialBatchSuccessHandler.java | 4 ++-- .../SqsMessageBatchProcessorAspectTest.java | 13 ++++++++----- 4 files changed, 14 insertions(+), 11 deletions(-) diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java index ea1cd8944..7bec0e091 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchFailureSuppressedHandler.java @@ -7,7 +7,7 @@ import software.amazon.lambda.powertools.sqs.SqsMessageHandler; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom; public class PartialBatchFailureSuppressedHandler implements RequestHandler { @Override @@ -25,7 +25,7 @@ public String process(SQSMessage message) { throw new RuntimeException("2e1424d4-f796-459a-8184-9c92662be6da"); } - sqsClient.listQueues(); + mockedRandom.nextInt(); return "Success"; } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java index 43a569ca6..6301f84ef 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchPartialFailureHandler.java @@ -7,7 +7,7 @@ import software.amazon.lambda.powertools.sqs.SqsMessageHandler; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom; public class PartialBatchPartialFailureHandler implements RequestHandler { @Override @@ -25,7 +25,7 @@ public String process(SQSMessage message) { throw new RuntimeException("2e1424d4-f796-459a-8184-9c92662be6da"); } - sqsClient.listQueues(); + mockedRandom.nextInt(); return "Success"; } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java index d44b084da..009db08b9 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/handlers/PartialBatchSuccessHandler.java @@ -7,7 +7,7 @@ import software.amazon.lambda.powertools.sqs.SqsMessageHandler; import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage; -import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.sqsClient; +import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom; public class PartialBatchSuccessHandler implements RequestHandler { @Override @@ -21,7 +21,7 @@ private class InnerMessageHandler implements SqsMessageHandler { @Override public String process(SQSMessage message) { - sqsClient.listQueues(); + mockedRandom.nextInt(); return "Success"; } } diff --git a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java index 59460e665..7d7d3d023 100644 --- a/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java +++ b/powertools-sqs/src/test/java/software/amazon/lambda/powertools/sqs/internal/SqsMessageBatchProcessorAspectTest.java @@ -1,6 +1,7 @@ package software.amazon.lambda.powertools.sqs.internal; import java.io.IOException; +import java.util.Random; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; @@ -31,17 +32,19 @@ import static software.amazon.lambda.powertools.sqs.PowertoolsSqs.overrideSqsClient; public class SqsMessageBatchProcessorAspectTest { - public static final SqsClient sqsClient = mock(SqsClient.class); + public static final Random mockedRandom = mock(Random.class); + private static final SqsClient sqsClient = mock(SqsClient.class); private static final ObjectMapper MAPPER = new ObjectMapper(); private SQSEvent event; private RequestHandler requestHandler; - private Context context = mock(Context.class); + private final Context context = mock(Context.class); @BeforeEach void setUp() throws IOException { overrideSqsClient(sqsClient); + reset(mockedRandom); reset(sqsClient); setupContext(); event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class); @@ -57,7 +60,7 @@ void setUp() throws IOException { void shouldBatchProcessAllMessageSuccessfullyAndNotDeleteFromSQS() { requestHandler.handleRequest(event, context); - verify(sqsClient, times(2)).listQueues(); + verify(mockedRandom, times(2)).nextInt(); verify(sqsClient, times(0)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); } @@ -83,7 +86,7 @@ void shouldBatchProcessMessageWithSuccessDeletedOnFailureInBatchFromSQS() { .contains("Success"); }); - verify(sqsClient).listQueues(); + verify(mockedRandom).nextInt(); verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); } @@ -93,7 +96,7 @@ void shouldBatchProcessMessageWithSuccessDeletedOnFailureWithSuppressionInBatchF requestHandler.handleRequest(event, context); - verify(sqsClient).listQueues(); + verify(mockedRandom).nextInt(); verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); }