Skip to content

Commit

Permalink
Merge pull request #1508 from Particular/handle-poison-messages-in-st…
Browse files Browse the repository at this point in the history
…aging

If a message consistently fails in the staging phase of retries, ServiceControl is unable to retry any messages
  • Loading branch information
SzymonPobiega committed Dec 5, 2018
2 parents da49bba + b874d04 commit bbe3c63
Show file tree
Hide file tree
Showing 7 changed files with 222 additions and 54 deletions.
Expand Up @@ -782,6 +782,7 @@ namespace ServiceControl.Recoverability
public string FailedMessageId { get; set; }
public string Id { get; set; }
public string RetryBatchId { get; set; }
public int StageAttempts { get; set; }
public static string MakeDocumentId(string messageUniqueId) { }
}
public class FailedMessages_ByGroup : Raven.Client.Indexes.AbstractIndexCreationTask<ServiceControl.MessageFailures.FailedMessage, ServiceControl.Recoverability.FailureGroupMessageView>
Expand Down Expand Up @@ -835,6 +836,11 @@ namespace ServiceControl.Recoverability
string Name { get; }
string ClassifyFailure(ServiceControl.Recoverability.ClassifiableMessageDetails failureDetails);
}
public class MessageFailedInStaging
{
public MessageFailedInStaging() { }
public string UniqueMessageId { get; set; }
}
public class MessagesSubmittedForRetry
{
public MessagesSubmittedForRetry() { }
Expand Down
120 changes: 91 additions & 29 deletions src/ServiceControl.UnitTests/Recoverability/Retry_State_Tests.cs
Expand Up @@ -82,7 +82,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte
DocumentStore = documentStore
};

var processor = new RetryProcessor(sender, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint"), retryManager);
var processor = new RetryProcessor(documentStore, sender, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint"), retryManager);

documentStore.WaitForIndexing();

Expand All @@ -101,7 +101,7 @@ public async Task When_a_group_is_prepared_with_three_batches_and_SC_is_restarte
};
await documentManager.RebuildRetryOperationState(session);

processor = new RetryProcessor(sender, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint"), retryManager);
processor = new RetryProcessor(documentStore, sender, domainEvents, new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint"), retryManager);

await processor.ProcessBatches(session, CancellationToken.None);
await session.SaveChangesAsync();
Expand Down Expand Up @@ -130,7 +130,7 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed()
};

var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint");
var processor = new RetryProcessor(sender, domainEvents, returnToSender, retryManager);
var processor = new RetryProcessor(documentStore, sender, domainEvents, returnToSender, retryManager);

using (var session = documentStore.OpenAsyncSession())
{
Expand All @@ -146,6 +146,62 @@ public async Task When_a_group_is_forwarded_the_status_is_Completed()
}
}

/// <summary>
/// Stages a retry batch of three messages ("A", "B", "C") in group "Test-group" where
/// dispatching message "B" always throws, then keeps running the retry processor until it
/// reports that no work is left. Expects the operation to end as Completed with three
/// messages prepared, two forwarded and one skipped.
/// </summary>
[Test]
public async Task When_there_is_one_poison_message_it_is_removed_from_batch_and_the_status_is_Complete()
{
    // Upper bound on processing rounds: if the poison message is never removed from the
    // batch, the test must fail with a clear message instead of looping forever.
    const int maxProcessingRounds = 100;

    var domainEvents = new FakeDomainEvents();
    var retryManager = new RetryingManager(domainEvents);

    using (var documentStore = InMemoryStoreBuilder.GetInMemoryStore())
    {
        await CreateAFailedMessageAndMarkAsPartOfRetryBatch(documentStore, retryManager, "Test-group", true, "A", "B", "C");

        var sender = new TestSender
        {
            Callback = operation =>
            {
                //Always fails staging message B
                if (operation.Message.MessageId == "FailedMessages/B")
                {
                    throw new Exception("Simulated");
                }
            }
        };

        var bodyStorage = new RavenAttachmentsBodyStorage
        {
            DocumentStore = documentStore
        };

        var returnToSender = new TestReturnToSenderDequeuer(new ReturnToSender(bodyStorage), documentStore, domainEvents, "TestEndpoint");
        var processor = new RetryProcessor(documentStore, sender, domainEvents, returnToSender, retryManager);

        // Starts as true so that at least one round is always executed.
        var moreWorkRemaining = true;
        for (var round = 0; moreWorkRemaining && round < maxProcessingRounds; round++)
        {
            try
            {
                using (var session = documentStore.OpenAsyncSession())
                {
                    moreWorkRemaining = await processor.ProcessBatches(session, CancellationToken.None);
                    await session.SaveChangesAsync();
                }
            }
            catch (Exception)
            {
                //Continue trying until there is no exception -> poison message is removed from the batch
                moreWorkRemaining = true;
            }
        }

        Assert.IsFalse(moreWorkRemaining, $"Retry processing did not finish within {maxProcessingRounds} rounds; the poison message was probably never removed from the batch.");

        var status = retryManager.GetStatusForRetryOperation("Test-group", RetryType.FailureGroup);

        Assert.AreEqual(RetryState.Completed, status.RetryState);
        Assert.AreEqual(3, status.NumberOfMessagesPrepared);
        Assert.AreEqual(2, status.NumberOfMessagesForwarded);
        Assert.AreEqual(1, status.NumberOfMessagesSkipped);
    }
}

[Test]
public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_Forwarding()
{
Expand All @@ -165,7 +221,7 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_

var sender = new TestSender();

var processor = new RetryProcessor(sender, domainEvents, new TestReturnToSenderDequeuer(returnToSender, documentStore, domainEvents, "TestEndpoint"), retryManager);
var processor = new RetryProcessor(documentStore, sender, domainEvents, new TestReturnToSenderDequeuer(returnToSender, documentStore, domainEvents, "TestEndpoint"), retryManager);

documentStore.WaitForIndexing();

Expand All @@ -183,37 +239,37 @@ public async Task When_a_group_has_one_batch_out_of_two_forwarded_the_status_is_
}
}

async Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(IDocumentStore documentStore, RetryingManager retryManager, string groupId, bool progressToStaged, int numberOfMessages)
Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(IDocumentStore documentStore, RetryingManager retryManager, string groupId, bool progressToStaged, int numberOfMessages)
{
var messages = Enumerable.Range(0, numberOfMessages).Select(i =>
{
var id = Guid.NewGuid().ToString();
return CreateAFailedMessageAndMarkAsPartOfRetryBatch(documentStore, retryManager, groupId, progressToStaged, Enumerable.Range(0, numberOfMessages).Select(i => Guid.NewGuid().ToString()).ToArray());
}

return new FailedMessage
async Task CreateAFailedMessageAndMarkAsPartOfRetryBatch(IDocumentStore documentStore, RetryingManager retryManager, string groupId, bool progressToStaged, params string[] messageIds)
{
var messages = messageIds.Select(id => new FailedMessage
{
Id = FailedMessage.MakeDocumentId(id),
UniqueMessageId = id,
FailureGroups = new List<FailedMessage.FailureGroup>
{
Id = FailedMessage.MakeDocumentId(id),
UniqueMessageId = id,
FailureGroups = new List<FailedMessage.FailureGroup>
new FailedMessage.FailureGroup
{
new FailedMessage.FailureGroup
{
Id = groupId,
Title = groupId,
Type = groupId
}
},
Status = FailedMessageStatus.Unresolved,
ProcessingAttempts = new List<FailedMessage.ProcessingAttempt>
Id = groupId,
Title = groupId,
Type = groupId
}
},
Status = FailedMessageStatus.Unresolved,
ProcessingAttempts = new List<FailedMessage.ProcessingAttempt>
{
new FailedMessage.ProcessingAttempt
{
new FailedMessage.ProcessingAttempt
{
AttemptedAt = DateTime.UtcNow,
MessageMetadata = new Dictionary<string, object>(),
FailureDetails = new FailureDetails(),
Headers = new Dictionary<string, string>()
}
AttemptedAt = DateTime.UtcNow,
MessageMetadata = new Dictionary<string, object>(),
FailureDetails = new FailureDetails(),
Headers = new Dictionary<string, string>()
}
};
}
});

using (var session = documentStore.OpenAsyncSession())
Expand Down Expand Up @@ -282,8 +338,14 @@ public override Task Run(Predicate<MessageContext> filter, CancellationToken can

/// <summary>
/// Test double for <see cref="IDispatchMessages"/>: instead of sending anything it hands every
/// unicast transport operation to <see cref="Callback"/>, letting a test observe the operation
/// or throw to simulate a dispatch failure.
/// </summary>
public class TestSender : IDispatchMessages
{
    /// <summary>
    /// Invoked once per unicast transport operation passed to <see cref="Dispatch"/>.
    /// Does nothing unless a test replaces it.
    /// </summary>
    public Action<UnicastTransportOperation> Callback { get; set; } = _ => { };

    /// <summary>
    /// Runs <see cref="Callback"/> for each unicast operation in <paramref name="outgoingMessages"/>
    /// and returns an already completed task. An exception thrown by the callback propagates
    /// synchronously out of this method.
    /// </summary>
    public Task Dispatch(TransportOperations outgoingMessages, TransportTransaction transaction, ContextBag context)
    {
        var unicastOperations = outgoingMessages.UnicastTransportOperations;

        foreach (var unicastOperation in unicastOperations)
        {
            Callback.Invoke(unicastOperation);
        }

        return Task.FromResult(0);
    }
}
Expand Down
@@ -0,0 +1,14 @@
namespace ServiceControl.EventLog.Definitions
{
    using Recoverability;

    /// <summary>
    /// Event log mapping for <see cref="MessageFailedInStaging"/>: the entry is flagged via
    /// <c>TreatAsError</c>, linked to the message through its unique message id, and given a
    /// description telling the user that the message was removed from the retry batch.
    /// </summary>
    class MessageFailedInStagingDefinition : EventLogMappingDefinition<MessageFailedInStaging>
    {
        public MessageFailedInStagingDefinition()
        {
            TreatAsError();
            RelatesToMessage(failure => failure.UniqueMessageId);
            Description(failure => $"All attempts to stage message {failure.UniqueMessageId} failed. The message has been removed from the retry batch. Please contact Particular Software support and provide failure details from ServiceControl log files.");
        }
    }
}
@@ -0,0 +1,9 @@
namespace ServiceControl.Recoverability
{
using Infrastructure.DomainEvents;

/// <summary>
/// Domain event that identifies a single message by its unique message id.
/// NOTE(review): the name suggests it is raised when staging a message for retry fails — confirm against the publisher.
/// </summary>
public class MessageFailedInStaging : IDomainEvent
{
/// <summary>Unique id of the message this event refers to.</summary>
public string UniqueMessageId { get; set; }
}
}
Expand Up @@ -5,6 +5,7 @@ public class FailedMessageRetry
public string Id { get; set; }
public string FailedMessageId { get; set; }
public string RetryBatchId { get; set; }
public int StageAttempts { get; set; }

public static string MakeDocumentId(string messageUniqueId)
{
Expand Down

0 comments on commit bbe3c63

Please sign in to comment.