From 605963d3b5e59d7e0dcc423c8619eb4f764f22b2 Mon Sep 17 00:00:00 2001 From: Leonie Adis Date: Thu, 20 May 2021 16:50:18 +0200 Subject: [PATCH] Execute all received sqs messages --- .../SimpleMessageListenerContainer.java | 13 ++----- .../SimpleMessageListenerContainerTest.java | 39 +++++++++++++++++++ 2 files changed, 43 insertions(+), 9 deletions(-) diff --git a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java index 3c22bce35..a710cfdc8 100644 --- a/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java +++ b/spring-cloud-aws-messaging/src/main/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainer.java @@ -338,15 +338,10 @@ public void run() { ? groupByMessageGroupId(receiveMessageResult) : groupByMessage(receiveMessageResult); CountDownLatch messageBatchLatch = new CountDownLatch(messageGroups.size()); for (MessageGroup messageGroup : messageGroups) { - if (isQueueRunning(this.logicalQueueName)) { - MessageGroupExecutor messageGroupExecutor = new MessageGroupExecutor(this.logicalQueueName, - messageGroup, this.queueAttributes); - getTaskExecutor() - .execute(new SignalExecutingRunnable(messageBatchLatch, messageGroupExecutor)); - } - else { - messageBatchLatch.countDown(); - } + MessageGroupExecutor messageGroupExecutor = new MessageGroupExecutor(this.logicalQueueName, + messageGroup, this.queueAttributes); + getTaskExecutor() + .execute(new SignalExecutingRunnable(messageBatchLatch, messageGroupExecutor)); } try { messageBatchLatch.await(); diff --git a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java index e976371fc..1a602b96f 100644 --- a/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java +++ b/spring-cloud-aws-messaging/src/test/java/io/awspring/cloud/messaging/listener/SimpleMessageListenerContainerTest.java @@ -367,6 +367,45 @@ public void handleMessage(org.springframework.messaging.Message message) thro container.start(); } + @Test + void testAllMessageReceivedAreConsumed() throws Exception { + SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); + + AmazonSQSAsync sqs = mock(AmazonSQSAsync.class, withSettings().stubOnly()); + container.setAmazonSqs(sqs); + + CountDownLatch countDownLatch = new CountDownLatch(1); + QueueMessageHandler messageHandler = new QueueMessageHandler() { + @Override + public void handleMessage(org.springframework.messaging.Message message) throws MessagingException { + countDownLatch.countDown(); + } + }; + container.setMessageHandler(messageHandler); + StaticApplicationContext applicationContext = new StaticApplicationContext(); + applicationContext.registerSingleton("testMessageListener", TestMessageListener.class); + messageHandler.setApplicationContext(applicationContext); + container.setBeanName("testContainerName"); + messageHandler.afterPropertiesSet(); + + mockGetQueueUrl(sqs, "testQueue", "http://testSimpleReceiveMessage.amazonaws.com"); + mockGetQueueAttributesWithEmptyResult(sqs, "http://testSimpleReceiveMessage.amazonaws.com"); + + container.afterPropertiesSet(); + + when(sqs.receiveMessage( + new ReceiveMessageRequest("http://testSimpleReceiveMessage.amazonaws.com").withAttributeNames("All") + .withMessageAttributeNames("All").withMaxNumberOfMessages(10).withWaitTimeSeconds(20))) + .thenReturn(new ReceiveMessageResult().withMessages( + new Message().withBody("messageContent"))); + when(sqs.getQueueAttributes(any(GetQueueAttributesRequest.class))).thenReturn(new GetQueueAttributesResult()); + + container.start(); + container.stop(); + + assertThat(countDownLatch.await(1, TimeUnit.SECONDS)).isTrue(); + } + @Test void listener_withMultipleMessageHandlers_shouldBeCalled() throws Exception { CountDownLatch countDownLatch = new CountDownLatch(2);