Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions .autover/changes/0529a49e-6b48-4eea-874b-1a5c2be8b029.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"Projects": [
{
"Name": "AWS.Messaging",
"Type": "Patch",
"ChangelogMessages": [
"Fix issue with fifo when a message is failed to process the later messages are not retried"
]
},
{
"Name": "AWS.Messaging.Lambda",
"Type": "Patch",
"ChangelogMessages": [
"Update AWS.Messaging dependency"
]
}
]
}
4 changes: 4 additions & 0 deletions .github/workflows/BuildandTest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ jobs:
uses: actions/setup-dotnet@4d6c8fcf3c8f7a60068d26b594648e99df24cee3 # pinning V4
with:
dotnet-version: 6.0.x
- name: Setup .NET Core 8.0
uses: actions/setup-dotnet@4d6c8fcf3c8f7a60068d26b594648e99df24cee3 # pinning V4
with:
dotnet-version: 8.0.x
- name: Restore dependencies
run: dotnet restore
- name: Build
Expand Down
11 changes: 9 additions & 2 deletions src/AWS.Messaging/Services/DefaultMessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -133,12 +133,19 @@ public async Task ProcessMessageGroupAsync(List<ConvertToEnvelopeResult> message
var remaining = messageGroup.Count;

// Sequentially process each message within the group
foreach (var message in messageGroup)
for(var i = 0; i < messageGroup.Count; i++)
{
var message = messageGroup[i];

var isSuccessful = await InvokeHandler(message.Envelope, message.Mapping, token);
if (!isSuccessful)
{
// If the handler invocation fails for any message, skip processing subsequent messages in the group.
// If the handler invocation fails for any message, report failure for the rest in the group so they are reported back to Lambda
// as not failure that should be retried. Otherwise Lambda will think the messages were successfully processed and delete the messages.
for (var unprocessedIndex = i + 1; unprocessedIndex < messageGroup.Count; unprocessedIndex++)
{
await _sqsMessageCommunication.ReportMessageFailureAsync(messageGroup[unprocessedIndex].Envelope, token);
}
_logger.LogError("Handler invocation failed for a message belonging to message group '{GroupdId}' having message ID '{MessageID}'. Skipping processing of {Remaining} messages from the same group.", groupId, message.Envelope.Id, remaining);
break;
}
Expand Down
38 changes: 34 additions & 4 deletions test/AWS.Messaging.UnitTests/LambdaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,15 +105,44 @@ public async Task MessageHandlerReturnsFailedStatusBatchResponse()
Id = "failed-message-1",
ReturnFailedStatus = true
};
var secondMessage = new SimulatedMessage
{
Id = "second-message-1",
ReturnFailedStatus = false
};

var tuple = await ExecuteWithBatchResponse(new SimulatedMessage[] { failedMessage });
var tuple = await ExecuteWithBatchResponse(new SimulatedMessage[] { failedMessage, secondMessage });
_mockSqs!.Verify(
expression: x => x.ChangeMessageVisibilityAsync(It.IsAny<ChangeMessageVisibilityRequest>(), It.IsAny<CancellationToken>()),
times: Times.Never);
Assert.Single(tuple.batchResponse.BatchItemFailures);
Assert.Equal("1", tuple.batchResponse.BatchItemFailures[0].ItemIdentifier);
}

[Fact]
public async Task MessageHandlerReturnsFailedStatusBatchResponseFifo()
{
var failedMessage = new SimulatedMessage
{
Id = "failed-message-1",
ReturnFailedStatus = true,
MessageGroupId = "group1"
};
var secondMessage = new SimulatedMessage
{
Id = "second-message-1",
ReturnFailedStatus = false,
MessageGroupId = "group1"
};

var tuple = await ExecuteWithBatchResponse(new SimulatedMessage[] { failedMessage, secondMessage }, isFifoQueue: true);
_mockSqs!.Verify(
expression: x => x.ChangeMessageVisibilityAsync(It.IsAny<ChangeMessageVisibilityRequest>(), It.IsAny<CancellationToken>()),
times: Times.Never);
Assert.Equal(2, tuple.batchResponse.BatchItemFailures.Count);
Assert.Equal("1", tuple.batchResponse.BatchItemFailures[0].ItemIdentifier);
}

[Fact]
public async Task MessageHandlerResetsVisibilityWhenFailedStatusBatchResponse()
{
Expand Down Expand Up @@ -304,13 +333,14 @@ private async Task<string> Execute(SimulatedMessage[] messages, int maxNumberOfC
SimulatedMessage[] messages,
int maxNumberOfConcurrentMessages = 1,
bool deleteMessagesWhenCompleted = false,
int? visibilityTimeoutForBatchFailures = default)
int? visibilityTimeoutForBatchFailures = default,
bool isFifoQueue = false)
{
var provider = CreateServiceProvider(
maxNumberOfConcurrentMessages: maxNumberOfConcurrentMessages,
deleteMessagesWhenCompleted: deleteMessagesWhenCompleted,
visibilityTimeoutForBatchFailures: visibilityTimeoutForBatchFailures);
var sqsEvent = await CreateLambdaEvent(provider, messages);
visibilityTimeoutForBatchFailures: visibilityTimeoutForBatchFailures, isFifoQueue: isFifoQueue);
var sqsEvent = await CreateLambdaEvent(provider, messages, isFifoQueue: isFifoQueue);

var logger = new TestLambdaLogger();
var context = new TestLambdaContext()
Expand Down
Loading