Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixed: an invalid event order detected when completing backward strea…

…m read

does not fault a projection
  • Loading branch information...
commit 6455c6ec445b2cfa19a5195de3de2b62c30d8ac9 1 parent be60ac5
@ysw ysw authored
View
7 ...entStore/EventStore.Projections.Core.Tests/ClientAPI/specification_with_standard_projections_runnning.cs
@@ -79,6 +79,13 @@ public override void TestFixtureSetUp()
When();
}
+ [TearDown]
+ public void PostTestAsserts()
+ {
+ var all = _manager.ListAll(_admin);
+ Assert.That(!all.Contains("Faulted"), "Projections faulted while running the test");
+ }
+
protected void EnableStandardProjections()
{
EnableProjection(ProjectionNamesBuilder.StandardProjections.EventByCategoryStandardProjection);
View
6 ...dling_delete/recovery/with_from_all_foreach_projection_running_and_events_get_indexed_before_recovery.cs
@@ -43,8 +43,8 @@ protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
- PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
+ PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
PostProjection(@"
@@ -56,10 +56,10 @@ protected override void Given()
}).outputState();
");
WaitIdle();
- _manager.Abort("test-projection", _admin);
- WaitIdle();
HardDeleteStream("stream1");
WaitIdle();
+ _manager.Abort("test-projection", _admin);
+ WaitIdle();
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
View
22 src/EventStore/EventStore.Projections.Core/Services/Processing/EmittedStream.cs
@@ -339,7 +339,14 @@ private void ReadStreamEventsBackwardCompleted(ClientMessage.ReadStreamEventsBac
var stop = CollectAlreadyCommittedEvents(message, upTo);
if (stop)
- SubmitWriteEventsInRecovery();
+ try
+ {
+ SubmitWriteEventsInRecovery();
+ }
+ catch (InvalidEmittedEventSequenceExceptioin ex)
+ {
+ Failed(ex.Message);
+ }
else
SubmitListEvents(upTo, message.NextEventNumber);
@@ -641,8 +648,9 @@ private void SubmitWriteEventsInRecovery()
private static void ValidateEmittedEventInRecoveryMode(Tuple<CheckpointTag, string, int> topAlreadyCommitted, EmittedEvent eventsToWrite)
{
- if (topAlreadyCommitted.Item1 != eventsToWrite.CausedByTag || topAlreadyCommitted.Item2 != eventsToWrite.EventType)
- throw new InvalidOperationException(
+ var failed = topAlreadyCommitted.Item1 != eventsToWrite.CausedByTag || topAlreadyCommitted.Item2 != eventsToWrite.EventType;
+ if (failed)
+ throw new InvalidEmittedEventSequenceExceptioin(
string.Format(
"An event emitted in recovery differ from the originally emitted event. Existing('{0}', '{1}'). New('{2}', '{3}')",
topAlreadyCommitted.Item2, topAlreadyCommitted.Item1, eventsToWrite.EventType, eventsToWrite.CausedByTag));
@@ -678,4 +686,12 @@ public void Handle(CoreProjectionProcessingMessage.EmittedStreamWriteCompleted m
ProcessWrites();
}
}
+
+ class InvalidEmittedEventSequenceExceptioin : Exception
+ {
+ public InvalidEmittedEventSequenceExceptioin(string message)
+ : base(message)
+ {
+ }
+ }
}
View
5 src/EventStore/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs
@@ -191,7 +191,10 @@ protected void DeliverEvent(float progress, ResolvedEvent resolvedEvent, TFPos p
bool isDeletedStreamEvent;
- if (resolvedEvent.IsLinkToDeletedStream)
+ if (resolvedEvent.IsLinkToDeletedStream && !resolvedEvent.IsLinkToDeletedStreamTombstone)
+ return;
+
+ if (resolvedEvent.IsLinkToDeletedStreamTombstone)
{
isDeletedStreamEvent = true;
partitionStreamId = resolvedEvent.EventStreamId;
View
7 src/EventStore/EventStore.Projections.Core/Services/Processing/ResolvedEvent.cs
@@ -59,6 +59,7 @@ public class ResolvedEvent
public readonly string PositionMetadata;
public readonly string StreamMetadata;
public readonly bool IsLinkToDeletedStream;
+ public readonly bool IsLinkToDeletedStreamTombstone;
public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] streamMetadata)
{
@@ -111,9 +112,9 @@ public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] st
originalPosition = new TFPos(-1, resolvedEvent.OriginalEvent.LogPosition);
JToken deletedValue;
- if (resolvedEvent.ResolveResult == ReadEventResult.StreamDeleted
- || resolvedEvent.ResolveResult == ReadEventResult.NoStream
- || extraMetadata != null && extraMetadata.TryGetValue("$deleted", out deletedValue))
+ IsLinkToDeletedStreamTombstone = extraMetadata != null && extraMetadata.TryGetValue("$deleted", out deletedValue);
+ if (resolvedEvent.ResolveResult == ReadEventResult.NoStream
+ || resolvedEvent.ResolveResult == ReadEventResult.StreamDeleted || IsLinkToDeletedStreamTombstone)
{
IsLinkToDeletedStream = true;
var streamId = SystemEventTypes.StreamReferenceEventToStreamId(
Please sign in to comment.
Something went wrong with that request. Please try again.