Skip to content
This repository has been archived by the owner on Dec 13, 2023. It is now read-only.

Issue#241 fix AMQPObservableQueue behaviour to return failed message… #242

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,16 +187,16 @@ public Address[] getAddresses() {
}

public List<String> ack(List<Message> messages) {
final List<String> processedDeliveryTags = new ArrayList<>();
final List<String> failedMessages = new ArrayList<>();
for (final Message message : messages) {
try {
ackMsg(message);
processedDeliveryTags.add(message.getReceipt());
} catch (final Exception e) {
LOGGER.error("Cannot ACK message with delivery tag {}", message.getReceipt(), e);
failedMessages.add(message.getReceipt());
}
}
return processedDeliveryTags;
return failedMessages;
}

public void ackMsg(Message message) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,12 +334,12 @@ void runObserve(
}

@Test
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration() throws IOException, TimeoutException {
public void testGetMessagesFromExistingExchangeWithDefaultConfiguration()
throws IOException, TimeoutException {
// Mock channel and connection
Channel channel = mockBaseChannel();
Connection connection = mockGoodConnection(channel);
testGetMessagesFromExchangeAndDefaultConfiguration(
channel, connection, true, true);
testGetMessagesFromExchangeAndDefaultConfiguration(channel, connection, true, true);
}

@Test
Expand Down Expand Up @@ -386,8 +386,9 @@ public void testAck() throws IOException, TimeoutException {
msg.setPayload("Payload");
msg.setReceipt("1");
messages.add(msg);
List<String> deliveredTags = observableQueue.ack(messages);
assertNotNull(deliveredTags);
List<String> failedMessages = observableQueue.ack(messages);
assertNotNull(failedMessages);
assertTrue(failedMessages.isEmpty());
}

private void testGetMessagesFromExchangeAndDefaultConfiguration(
Expand Down