Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Prevent message to be marked as success if failed sending to DLQ #731

Merged
merged 2 commits into from Feb 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion docs/utilities/batch.md
Expand Up @@ -93,7 +93,11 @@ This utility requires additional permissions to work as expected. Lambda functio
If you are also using [nonRetryableExceptions](#move-non-retryable-messages-to-a-dead-letter-queue) attribute, utility will need additional permission of `sqs:GetQueueAttributes` on source SQS.
It also needs `sqs:SendMessage` and `sqs:SendMessageBatch` on configured dead letter queue.

Refer [example project](https://github.com/aws-samples/aws-lambda-powertools-examples/blob/main/java/SqsBatchProcessing/template.yaml#L67) for policy details example.
If source or dead letter queue is configured to use encryption at rest using [AWS Key Management Service (KMS)](https://aws.amazon.com/kms/), function will need additional permissions of
`kms:GenerateDataKey` and `kms:Decrypt` on the KMS key being used for encryption. Refer [docs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-key-management.html#compatibility-with-aws-services) for more details.

Refer [example project](https://github.com/aws-samples/aws-lambda-powertools-examples/blob/main/java/SqsBatchProcessing/template.yaml#L105) for policy details example.


## Processing messages from SQS

Expand Down
Expand Up @@ -6,7 +6,8 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -158,17 +159,23 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
})
.collect(toList());

batchRequest(dlqMessages, 10, entriesToSend -> {
List<SendMessageBatchResponse> sendMessageBatchResponses = batchRequest(dlqMessages, 10, entriesToSend -> {

SendMessageBatchResponse sendMessageBatchResponse = client.sendMessageBatch(SendMessageBatchRequest.builder()
.entries(entriesToSend)
.queueUrl(dlqUrl.get())
.build());
.entries(entriesToSend)
.queueUrl(dlqUrl.get())
.build());


LOG.debug("Response from send batch message to DLQ request {}", sendMessageBatchResponse);

return sendMessageBatchResponse;
});

return true;
return sendMessageBatchResponses.stream()
.filter(response -> null != response && response.hasFailed())
.peek(sendMessageBatchResponse -> LOG.error("Failed sending message to the DLQ. Entire batch will be re processed. Check if needed permissions are configured for the function. Response: {}", sendMessageBatchResponse))
.count() == 0;
}


Expand Down Expand Up @@ -220,17 +227,21 @@ private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
DeleteMessageBatchResponse deleteMessageBatchResponse = client.deleteMessageBatch(request);

LOG.debug("Response from delete request {}", deleteMessageBatchResponse);

return deleteMessageBatchResponse;
});
}
}

private <T> void batchRequest(final List<T> listOFEntries,
final int size,
final Consumer<List<T>> batchLogic) {
IntStream.range(0, listOFEntries.size())
private <T, R> List<R> batchRequest(final List<T> listOFEntries,
final int size,
final Function<List<T>, R> batchLogic) {

return IntStream.range(0, listOFEntries.size())
.filter(index -> index % size == 0)
.mapToObj(index -> listOFEntries.subList(index, Math.min(index + size, listOFEntries.size())))
.forEach(batchLogic);
.map(batchLogic)
.collect(Collectors.toList());
}

private String url(String queueArn) {
Expand Down
@@ -1,10 +1,7 @@
package software.amazon.lambda.powertools.sqs.internal;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.function.Consumer;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
Expand All @@ -15,11 +12,13 @@
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.BatchResultErrorEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler;
Expand All @@ -30,7 +29,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.in;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
Expand Down Expand Up @@ -137,6 +135,38 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
}

@Test
void shouldBatchProcessAndThrowExceptionForNonRetryableExceptionWhenMoveToDlqReturnFailedResponse() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
event.getRecords().get(0).setMessageId("");

when(sqsClient.sendMessageBatch(any(SendMessageBatchRequest.class))).thenReturn(SendMessageBatchResponse.builder()
.failed(BatchResultErrorEntry.builder()
.message("Permission Error")
.code("KMS.AccessDeniedException")
.senderFault(true)
.build())
.build());

HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
" \"maxReceiveCount\": 2\n" +
"}");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

Assertions.assertThatExceptionOfType(SQSBatchProcessingException.class).
isThrownBy(() -> requestHandler.handleRequest(event, context));

verify(interactionClient).listQueues();
verify(sqsClient).sendMessageBatch(any(SendMessageBatchRequest.class));
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
}

@Test
void shouldBatchProcessAndDeleteNonRetryableExceptionMessage() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandlerWithDelete();
Expand Down