Skip to content

Commit

Permalink
chore(performance): Build queue url from queue arn instead of API call
Browse files Browse the repository at this point in the history
  • Loading branch information
Pankaj Agrawal committed Aug 12, 2021
1 parent 242736b commit 2a2f1c0
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
Expand All @@ -28,12 +27,12 @@

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static java.lang.String.format;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;

public final class BatchContext {
private static final Logger LOG = LoggerFactory.getLogger(BatchContext.class);
private static final Map<String, String> queueArnToQueueUrlMapping = new HashMap<>();
private static final Map<String, String> queueArnToDlqUrlMapping = new HashMap<>();
private static final Map<String, String> QUEUE_ARN_TO_DLQ_URL_MAPPING = new HashMap<>();

private final Map<SQSMessage, Exception> messageToException = new HashMap<>();
private final List<SQSMessage> success = new ArrayList<>();
Expand Down Expand Up @@ -91,6 +90,10 @@ public final <T> void processSuccessAndHandleFailed(final List<T> successReturns

deleteMessagesFromQueue(messagesToBeDeleted);

if (failedMessages.isEmpty()) {
return;
}

if (suppressException) {
List<String> messageIds = failedMessages.stream().
map(SQSMessage::getMessageId)
Expand Down Expand Up @@ -148,21 +151,27 @@ private boolean moveNonRetryableMessagesToDlqIfConfigured(Map<SQSMessage, Except
private Optional<String> fetchDlqUrl(Map<SQSMessage, Exception> nonRetryableMessageToException) {
return nonRetryableMessageToException.keySet().stream()
.findFirst()
.map(sqsMessage -> queueArnToDlqUrlMapping.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
.map(sqsMessage -> QUEUE_ARN_TO_DLQ_URL_MAPPING.computeIfAbsent(sqsMessage.getEventSourceArn(), sourceArn -> {
String queueUrl = url(sourceArn);

GetQueueAttributesResponse queueAttributes = client.getQueueAttributes(GetQueueAttributesRequest.builder()
.attributeNames(QueueAttributeName.REDRIVE_POLICY)
.queueUrl(queueUrl)
.build());

try {
JsonNode jsonNode = SqsUtils.objectMapper().readTree(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY));
return url(jsonNode.get("deadLetterTargetArn").asText());
} catch (JsonProcessingException e) {
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
return null;
}
return ofNullable(queueAttributes.attributes().get(QueueAttributeName.REDRIVE_POLICY))
.map(policy -> {
try {
return SqsUtils.objectMapper().readTree(policy);
} catch (JsonProcessingException e) {
LOG.debug("Unable to parse Re drive policy for queue {}. Even if DLQ exists, failed messages will be send back to main queue.", queueUrl, e);
return null;
}
})
.map(node -> node.get("deadLetterTargetArn"))
.map(JsonNode::asText)
.map(this::url)
.orElse(null);
}));
}

Expand All @@ -186,14 +195,7 @@ private void deleteMessagesFromQueue(final List<SQSMessage> messages) {
}

private String url(String queueArn) {
return queueArnToQueueUrlMapping.computeIfAbsent(queueArn, s -> {
String[] arnArray = queueArn.split(":");

return client.getQueueUrl(GetQueueUrlRequest.builder()
.queueOwnerAWSAccountId(arnArray[4])
.queueName(arnArray[5])
.build())
.queueUrl();
});
String[] arnArray = queueArn.split(":");
return String.format("https://sqs.%s.amazonaws.com/%s/%s", arnArray[3], arnArray[4], arnArray[5]);
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
package software.amazon.lambda.powertools.sqs;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.function.Consumer;

import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;

import static com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -39,11 +43,6 @@ class SqsUtilsBatchProcessorTest {
void setUp() throws IOException {
reset(sqsClient, interactionClient);
event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class);

when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder()
.queueUrl("test")
.build());

overrideSqsClient(sqsClient);
}

Expand Down Expand Up @@ -107,14 +106,12 @@ void shouldBatchProcessAndDeleteSuccessMessageOnPartialFailures() {
});

verify(interactionClient).listQueues();
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));

ArgumentCaptor<GetQueueUrlRequest> captor = ArgumentCaptor.forClass(GetQueueUrlRequest.class);
verify(sqsClient).getQueueUrl(captor.capture());
ArgumentCaptor<DeleteMessageBatchRequest> captor = ArgumentCaptor.forClass(DeleteMessageBatchRequest.class);
verify(sqsClient).deleteMessageBatch(captor.capture());

assertThat(captor.getValue())
.hasFieldOrPropertyWithValue("queueName", "my-queue")
.hasFieldOrPropertyWithValue("queueOwnerAWSAccountId", "123456789012");
.hasFieldOrPropertyWithValue("queueUrl", "https://sqs.us-east-2.amazonaws.com/123456789012/my-queue");
}

@Test
Expand Down Expand Up @@ -219,6 +216,16 @@ public String process(SQSMessage message) {
@Test
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
String failedId = "2e1424d4-f796-459a-8184-9c92662be6da";
HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
" \"maxReceiveCount\": 2\n" +
"}");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

List<String> batchProcessor = batchProcessor(event, (message) -> {
if (failedId.equals(message.getMessageId())) {
Expand All @@ -228,6 +235,11 @@ void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
interactionClient.listQueues();
return "Success";
}, IllegalStateException.class, IllegalArgumentException.class);

Assertions.assertThat(batchProcessor)
.hasSize(1);

verify(sqsClient).sendMessageBatch(any(Consumer.class));
}

public class FailureSampleInnerSqsHandler implements SqsMessageHandler<String> {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package software.amazon.lambda.powertools.sqs.internal;

import java.io.IOException;
import java.util.HashMap;
import java.util.Random;
import java.util.function.Consumer;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
Expand All @@ -12,13 +14,15 @@
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlResponse;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.lambda.powertools.sqs.SQSBatchProcessingException;
import software.amazon.lambda.powertools.sqs.handlers.LambdaHandlerApiGateway;
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchFailureSuppressedHandler;
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchPartialFailureHandler;
import software.amazon.lambda.powertools.sqs.handlers.PartialBatchSuccessHandler;
import software.amazon.lambda.powertools.sqs.handlers.SqsMessageHandlerWithNonRetryableHandler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
Expand Down Expand Up @@ -48,11 +52,6 @@ void setUp() throws IOException {
reset(sqsClient);
setupContext();
event = MAPPER.readValue(this.getClass().getResource("/sampleSqsBatchEvent.json"), SQSEvent.class);

when(sqsClient.getQueueUrl(any(GetQueueUrlRequest.class))).thenReturn(GetQueueUrlResponse.builder()
.queueUrl("test")
.build());

requestHandler = new PartialBatchSuccessHandler();
}

Expand Down Expand Up @@ -109,6 +108,29 @@ void shouldNotTakeEffectOnNonSqsEventHandler() {
verifyNoInteractions(sqsClient);
}

@Test
void shouldBatchProcessAndMoveNonRetryableExceptionToDlq() {
requestHandler = new SqsMessageHandlerWithNonRetryableHandler();
event.getRecords().get(0).setMessageId("");

HashMap<QueueAttributeName, String> attributes = new HashMap<>();

attributes.put(QueueAttributeName.REDRIVE_POLICY, "{\n" +
" \"deadLetterTargetArn\": \"arn:aws:sqs:us-east-2:123456789012:retry-queue\",\n" +
" \"maxReceiveCount\": 2\n" +
"}");

when(sqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(GetQueueAttributesResponse.builder()
.attributes(attributes)
.build());

requestHandler.handleRequest(event, context);

verify(mockedRandom).nextInt();
verify(sqsClient).deleteMessageBatch(any(DeleteMessageBatchRequest.class));
verify(sqsClient).sendMessageBatch(any(Consumer.class));
}

private void setupContext() {
when(context.getFunctionName()).thenReturn("testFunction");
when(context.getInvokedFunctionArn()).thenReturn("testArn");
Expand Down

0 comments on commit 2a2f1c0

Please sign in to comment.