Skip to content

Commit

Permalink
feat(batch-processing): test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
Pankaj Agrawal committed Aug 12, 2021
1 parent 2a2f1c0 commit b494fa2
Show file tree
Hide file tree
Showing 7 changed files with 293 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,17 @@ public static <R> List<R> batchProcessor(final SQSEvent event,
return batchProcessor(event, suppressException, handlerInstance, false, nonRetryableExceptions);
}

@SafeVarargs
public static <R> List<R> batchProcessor(final SQSEvent event,
final boolean suppressException,
final Class<? extends SqsMessageHandler<R>> handler,
final boolean deleteNonRetryableMessageFromQueue,
final Class<? extends Exception>... nonRetryableExceptions) {

SqsMessageHandler<R> handlerInstance = instantiatedHandler(handler);
return batchProcessor(event, suppressException, handlerInstance, deleteNonRetryableMessageFromQueue, nonRetryableExceptions);
}

/**
* This utility method is used to processes each {@link SQSMessage} inside received {@link SQSEvent}
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ && placedOnSqsEventRequestHandler(pjp)) {

SQSEvent sqsEvent = (SQSEvent) proceedArgs[0];

batchProcessor(sqsEvent, sqsBatch.suppressException(), sqsBatch.value(), sqsBatch.nonRetryableExceptions());
batchProcessor(sqsEvent,
sqsBatch.suppressException(),
sqsBatch.value(),
sqsBatch.deleteNonRetryableMessageFromQueue(),
sqsBatch.nonRetryableExceptions());
}

return pjp.proceed(proceedArgs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,36 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
verify(sqsClient).sendMessageBatch(any(Consumer.class));
}

@Test
void shouldBatchProcessAndDeleteNonRetryableException() {
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da";
HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
" \"maxReceiveCount\": 2\n" +
"}");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

List<String> batchProcessor = batchProcessor(event, false, (message) -> {
if (failedId.equals(message.getMessageId())) {
throw new IllegalStateException("Failed processing");
}

interactionClient.listQueues();
return "Success";
}, true, IllegalStateException.class, IllegalArgumentException.class);

Assertions.assertThat(batchProcessor)
.hasSize(1);

verify(sqsClient, times(0)).sendMessageBatch(any(Consumer.class));
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
}

public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {
@Override
public String process(SQSEvent.SQSMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public String process(SQSMessage message) {
throw new IllegalArgumentException("Invalid message and was moved to DLQ");
}

if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) {
throw new RuntimeException("Invalid message and should be reprocessed");
}

mockedRandom.nextInt();
return "Success";
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package software.amazon.lambda.powertools.sqs.handlers;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import software.amazon.lambda.powertools.sqs.SqsBatch;
import software.amazon.lambda.powertools.sqs.SqsMessageHandler;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static software.amazon.lambda.powertools.sqs.internal.SqsMessageBatchProcessorAspectTest.mockedRandom;

public class SqsMessageHandlerWithNonRetryableHandlerWithDelete implements RequestHandler<SQSEvent, String> {

@Override
@SqsBatch(value = InnerMessageHandler.class,
nonRetryableExceptions = {IllegalStateException.class, IllegalArgumentException.class},
deleteNonRetryableMessageFromQueue = true)
public String handleRequest(final SQSEvent sqsEvent,
final Context context) {
return "Success";
}

private class InnerMessageHandler implements SqsMessageHandler<Object> {

@Override
public String process(SQSMessage message) {
if(message.getMessageId().isEmpty()) {
throw new IllegalArgumentException("Invalid message and was moved to DLQ");
}

if("2e1424d4-f796-459a-9696-9c92662ba5da".equals(message.getMessageId())) {
throw new RuntimeException("Invalid message and should be reprocessed");
}

mockedRandom.nextInt();
return "Success";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
Expand All @@ -23,11 +24,13 @@
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler;
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandler;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandlerWithDelete;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand Down Expand Up @@ -131,6 +134,137 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
verify(sqsClient).sendMessageBatch(any(Consumer.class));
}

@Test
void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete();
event.getRecords().get(0).setMessageId("");

requestHandler.handleRequest(event, context);

verify(mockedRandom).nextInt();
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
verify(sqsClient).deleteMessageBatch(captor.capture());
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
verify(sqsClient, never()).getQueueAttributes(any(GetQueueAttributesRequest.class));

assertThat(captor.getValue())
.satisfies(deleteMessageBatchRequest -> assertThat(deleteMessageBatchRequest.entries())
.hasSize(2)
.extracting("id")
.contains("", "2e1424d4-f796-459a-8184-9c92662be6da"));
}

@Test
void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionAndNoDlq() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();

event.getRecords().get(0).setMessageId("");
event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp"));

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.build());

assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> requestHandler.handleRequest(event, context))
.satisfies(e -> {
assertThat(e.getExceptions())
.hasSize(1)
.extracting("detailMessage")
.containsExactly("Invalid message and was moved to DLQ");

assertThat(e.getFailures())
.hasSize(1)
.extracting("messageId")
.containsExactly("");

assertThat(e.successMessageReturnValues())
.hasSize(1)
.contains("Success");
});

verify(mockedRandom).nextInt();
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
}

@Test
void shouldBatchProcessAndFailWithExceptionForNonRetryableExceptionWhenFailedParsingPolicy() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
event.getRecords().get(0).setMessageId("");
event.getRecords().forEach(sqsMessage -> sqsMessage.setEventSourceArn(sqsMessage.getEventSourceArn() + "-temp-queue"));

HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "MalFormedRedrivePolicy");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> requestHandler.handleRequest(event, context))
.satisfies(e -> {
assertThat(e.getExceptions())
.hasSize(1)
.extracting("detailMessage")
.containsExactly("Invalid message and was moved to DLQ");

assertThat(e.getFailures())
.hasSize(1)
.extracting("messageId")
.containsExactly("");

assertThat(e.successMessageReturnValues())
.hasSize(1)
.contains("Success");
});

verify(mockedRandom).nextInt();
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
verify(sqsClient, never()).sendMessageBatch(any(Consumer.class));
}

@Test
void shouldBatchProcessAndMoveNonRetryableExceptionToDlqAndThrowException() throws IOException {
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
event = MAPPER.readValue(this.getClass().getResource("/threeMessageSqsBatchEvent.json"), SQSEvent.class);

event.getRecords().get(1).setMessageId("");

HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
" \"maxReceiveCount\": 2\n" +
"}");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

assertThatExceptionOfType(SQSBatchProcessingException.class)
.isThrownBy(() -> requestHandler.handleRequest(event, context))
.satisfies(e -> {
assertThat(e.getExceptions())
.hasSize(1)
.extracting("detailMessage")
.containsExactly("Invalid message and should be reprocessed");

assertThat(e.getFailures())
.hasSize(1)
.extracting("messageId")
.containsExactly("2e1424d4-f796-459a-9696-9c92662ba5da");

assertThat(e.successMessageReturnValues())
.hasSize(1)
.contains("Success");
});

verify(mockedRandom).nextInt();
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
verify(sqsClient).sendMessageBatch(any(Consumer.class));
}

private void setupContext() {
when(context.getFunctionName()).thenReturn("testFunction");
when(context.getInvokedFunctionArn()).thenReturn("testArn");
Expand Down
70 changes: 70 additions & 0 deletions powertools-sqs/src/test/resources/threeMessageSqsBatchEvent.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
{
"records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-8184-9c92662be6da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649"
},
"messageAttributes": {
"Attribute3" : {
"binaryValue" : "MTEwMA==",
"dataType" : "Binary"
},
"Attribute2" : {
"stringValue" : "123",
"dataType" : "Number"
},
"Attribute1" : {
"stringValue" : "AttributeValue1",
"dataType" : "String"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
},
{
"messageId": "2e1424d4-f796-459a-9696-9c92662ba5da",
"receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...",
"body": "Test message.",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082650636",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082650649"
},
"messageAttributes": {
"Attribute2" : {
"stringValue" : "123",
"dataType" : "Number"
}
},
"md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3",
"eventSource": "aws:sqs",
"eventSourceArn": "arn:aws:sqs:us-east-1:906126917321:sqs-lambda-MySqsQueue-4DYWWJIE97N5",
"awsRegion": "us-east-2"
}
]
}

0 comments on commit b494fa2

Please sign in to comment.