Skip to content

Commit

Permalink
Merge pull request #1356 from EventStore/retry_projection_registratio…
Browse files Browse the repository at this point in the history
…n_with_same_id

Make use of the idempotency checks when attempting rewrites
  • Loading branch information
hayley-jean committed Jul 31, 2017
1 parent ffe2e99 commit 2dc8c4a
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 7 deletions.
Expand Up @@ -8,12 +8,29 @@
using EventStore.Projections.Core.Services.Management;
using NUnit.Framework;
using EventStore.Projections.Core.Services.Processing;
using System.Collections;

namespace EventStore.Projections.Core.Tests.Services.projections_manager
{
[TestFixture]
public class FailureConditions : IEnumerable
{
public IEnumerator GetEnumerator()
{
yield return OperationResult.CommitTimeout;
yield return OperationResult.ForwardTimeout;
yield return OperationResult.PrepareTimeout;
}
}

[TestFixture, TestFixtureSource(typeof(FailureConditions))]
public class when_posting_a_persistent_projection_and_registration_write_fails : TestFixtureWithProjectionCoreAndManagementServices
{
private OperationResult _failureCondition;
public when_posting_a_persistent_projection_and_registration_write_fails(OperationResult failureCondition)
{
_failureCondition = failureCondition;
}

protected override void Given()
{
NoStream("$projections-test-projection-order");
Expand All @@ -38,13 +55,15 @@ yield return
}

[Test, Category("v8")]
public void retries_creating_the_projection_only_the_specified_number_of_times()
public void retries_creating_the_projection_only_the_specified_number_of_times_and_the_same_event_id()
{
int retryCount = 0;
var projectionRegistrationWrite = _consumer.HandledMessages.OfType<ClientMessage.WriteEvents>().Where(x => x.EventStreamId == ProjectionNamesBuilder.ProjectionsRegistrationStream).Last();
var eventId = projectionRegistrationWrite.Events[0].EventId;
while (projectionRegistrationWrite != null)
{
projectionRegistrationWrite.Envelope.ReplyWith(new ClientMessage.WriteEventsCompleted(projectionRegistrationWrite.CorrelationId, OperationResult.CommitTimeout, "Commit Timeout"));
Assert.AreEqual(eventId, projectionRegistrationWrite.Events[0].EventId);
projectionRegistrationWrite.Envelope.ReplyWith(new ClientMessage.WriteEventsCompleted(projectionRegistrationWrite.CorrelationId, _failureCondition, Enum.GetName(typeof(OperationResult), _failureCondition)));
_queue.Process();
projectionRegistrationWrite = _consumer.HandledMessages.OfType<ClientMessage.WriteEvents>().Where(x => x.EventStreamId == ProjectionNamesBuilder.ProjectionsRegistrationStream).LastOrDefault();
if(projectionRegistrationWrite != null)
Expand Down
Expand Up @@ -847,8 +847,10 @@ private void CreateSystemProjection(string name, Type handlerType, string config
}
if (message.Mode >= ProjectionMode.OneTime)
{
var eventId = Guid.NewGuid();
BeginWriteProjectionRegistration(
message.Name,
eventId,
projectionId =>
{
InitializeNewProjection(projectionId, message, version, replyEnvelope);
Expand Down Expand Up @@ -1031,7 +1033,7 @@ private int GetNextWorkerIndex()
return queueIndex;
}

private void BeginWriteProjectionRegistration(string name, Action<long> completed, IEnvelope envelope, int retryCount)
private void BeginWriteProjectionRegistration(string name, Guid eventId, Action<long> completed, IEnvelope envelope, int retryCount)
{
var corrId = Guid.NewGuid();
_writeDispatcher.Publish(
Expand All @@ -1043,17 +1045,18 @@ private void BeginWriteProjectionRegistration(string name, Action<long> complete
ProjectionNamesBuilder.ProjectionsRegistrationStream,
ExpectedVersion.Any,
new Event(
Guid.NewGuid(),
eventId,
EventTypes.ProjectionCreated,
false,
Helper.UTF8NoBom.GetBytes(name),
Empty.ByteArray),
SystemAccount.Principal),
m => WriteProjectionRegistrationCompleted(m, completed, name, ProjectionNamesBuilder.ProjectionsRegistrationStream, envelope, retryCount));
m => WriteProjectionRegistrationCompleted(m, eventId, completed, name, ProjectionNamesBuilder.ProjectionsRegistrationStream, envelope, retryCount));
}

private void WriteProjectionRegistrationCompleted(
ClientMessage.WriteEventsCompleted message,
Guid eventId,
Action<long> completed,
string name,
string eventStreamId,
Expand All @@ -1077,7 +1080,7 @@ private void BeginWriteProjectionRegistration(string name, Action<long> complete
if (retryCount > 0)
{
_logger.Info("Retrying write projection registration for {0}", name);
BeginWriteProjectionRegistration(name, completed, replyEnvelope, --retryCount);
BeginWriteProjectionRegistration(name, eventId, completed, replyEnvelope, --retryCount);
return;
}
}
Expand Down

0 comments on commit 2dc8c4a

Please sign in to comment.