From 2dc8c4a88931210b6e0060dba57e5924f0b111de Mon Sep 17 00:00:00 2001 From: Hayley Campbell Date: Thu, 13 Jul 2017 15:39:01 +0200 Subject: [PATCH] Merge pull request #1356 from EventStore/retry_projection_registration_with_same_id Make use of the idempotency checks when attempting rewrites --- ...projection_and_registration_write_fails.cs | 25 ++++++++++++++++--- .../Services/Management/ProjectionManager.cs | 11 +++++--- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/src/EventStore.Projections.Core.Tests/Services/projections_manager/when_posting_a_persistent_projection_and_registration_write_fails.cs b/src/EventStore.Projections.Core.Tests/Services/projections_manager/when_posting_a_persistent_projection_and_registration_write_fails.cs index cb5cfb063dc..2817b415be2 100644 --- a/src/EventStore.Projections.Core.Tests/Services/projections_manager/when_posting_a_persistent_projection_and_registration_write_fails.cs +++ b/src/EventStore.Projections.Core.Tests/Services/projections_manager/when_posting_a_persistent_projection_and_registration_write_fails.cs @@ -8,12 +8,29 @@ using EventStore.Projections.Core.Services.Management; using NUnit.Framework; using EventStore.Projections.Core.Services.Processing; +using System.Collections; namespace EventStore.Projections.Core.Tests.Services.projections_manager { - [TestFixture] + public class FailureConditions : IEnumerable + { + public IEnumerator GetEnumerator() + { + yield return OperationResult.CommitTimeout; + yield return OperationResult.ForwardTimeout; + yield return OperationResult.PrepareTimeout; + } + } + + [TestFixture, TestFixtureSource(typeof(FailureConditions))] public class when_posting_a_persistent_projection_and_registration_write_fails : TestFixtureWithProjectionCoreAndManagementServices { + private OperationResult _failureCondition; + public when_posting_a_persistent_projection_and_registration_write_fails(OperationResult failureCondition) + { + _failureCondition = failureCondition; + } + protected override void Given() { NoStream("$projections-test-projection-order"); @@ -38,13 +55,15 @@ yield return } [Test, Category("v8")] - public void retries_creating_the_projection_only_the_specified_number_of_times() + public void retries_creating_the_projection_only_the_specified_number_of_times_and_the_same_event_id() { int retryCount = 0; var projectionRegistrationWrite = _consumer.HandledMessages.OfType().Where(x => x.EventStreamId == ProjectionNamesBuilder.ProjectionsRegistrationStream).Last(); + var eventId = projectionRegistrationWrite.Events[0].EventId; while (projectionRegistrationWrite != null) { - projectionRegistrationWrite.Envelope.ReplyWith(new ClientMessage.WriteEventsCompleted(projectionRegistrationWrite.CorrelationId, OperationResult.CommitTimeout, "Commit Timeout")); + Assert.AreEqual(eventId, projectionRegistrationWrite.Events[0].EventId); + projectionRegistrationWrite.Envelope.ReplyWith(new ClientMessage.WriteEventsCompleted(projectionRegistrationWrite.CorrelationId, _failureCondition, Enum.GetName(typeof(OperationResult), _failureCondition))); _queue.Process(); projectionRegistrationWrite = _consumer.HandledMessages.OfType().Where(x => x.EventStreamId == ProjectionNamesBuilder.ProjectionsRegistrationStream).LastOrDefault(); if(projectionRegistrationWrite != null) diff --git a/src/EventStore.Projections.Core/Services/Management/ProjectionManager.cs b/src/EventStore.Projections.Core/Services/Management/ProjectionManager.cs index 0cdd8a17bbf..f15a7a29a44 100644 --- a/src/EventStore.Projections.Core/Services/Management/ProjectionManager.cs +++ b/src/EventStore.Projections.Core/Services/Management/ProjectionManager.cs @@ -847,8 +847,10 @@ private void CreateSystemProjection(string name, Type handlerType, string config } if (message.Mode >= ProjectionMode.OneTime) { + var eventId = Guid.NewGuid(); BeginWriteProjectionRegistration( message.Name, + eventId, projectionId => { InitializeNewProjection(projectionId, message, version, replyEnvelope); @@ -1031,7 +1033,7 @@ private int GetNextWorkerIndex() return queueIndex; } - private void BeginWriteProjectionRegistration(string name, Action completed, IEnvelope envelope, int retryCount) + private void BeginWriteProjectionRegistration(string name, Guid eventId, Action completed, IEnvelope envelope, int retryCount) { var corrId = Guid.NewGuid(); _writeDispatcher.Publish( @@ -1043,17 +1045,18 @@ private void BeginWriteProjectionRegistration(string name, Action complete ProjectionNamesBuilder.ProjectionsRegistrationStream, ExpectedVersion.Any, new Event( - Guid.NewGuid(), + eventId, EventTypes.ProjectionCreated, false, Helper.UTF8NoBom.GetBytes(name), Empty.ByteArray), SystemAccount.Principal), - m => WriteProjectionRegistrationCompleted(m, completed, name, ProjectionNamesBuilder.ProjectionsRegistrationStream, envelope, retryCount)); + m => WriteProjectionRegistrationCompleted(m, eventId, completed, name, ProjectionNamesBuilder.ProjectionsRegistrationStream, envelope, retryCount)); } private void WriteProjectionRegistrationCompleted( ClientMessage.WriteEventsCompleted message, + Guid eventId, Action completed, string name, string eventStreamId, @@ -1077,7 +1080,7 @@ private void BeginWriteProjectionRegistration(string name, Action complete if (retryCount > 0) { _logger.Info("Retrying write projection registration for {0}", name); - BeginWriteProjectionRegistration(name, completed, replyEnvelope, --retryCount); + BeginWriteProjectionRegistration(name, eventId, completed, replyEnvelope, --retryCount); return; } }