Permalink
Browse files

Progress: skipping events by emitted stream if previous position

  • Loading branch information...
1 parent 6455c6e commit 853d237020f444329e0c4fc8a888c5b171970b68 @ysw ysw committed Feb 16, 2014
@@ -32,12 +32,15 @@ public sealed class EmittedEventEnvelope
{
public readonly EmittedEvent Event;
public readonly EmittedStream.WriterConfiguration.StreamMetadata StreamMetadata;
+ public readonly bool CausedByStreamDeletedNotification;
public EmittedEventEnvelope(
- EmittedEvent @event, EmittedStream.WriterConfiguration.StreamMetadata streamMetadata = null)
+ EmittedEvent @event, EmittedStream.WriterConfiguration.StreamMetadata streamMetadata = null,
+ bool causedByStreamDeletedNotification = false)
{
Event = @event;
StreamMetadata = streamMetadata;
+ CausedByStreamDeletedNotification = causedByStreamDeletedNotification;
}
}
@@ -181,7 +181,7 @@ public ILogger Logger
_noCheckpoints = noCheckpoints;
}
- public void EmitEvents(EmittedEvent[] events)
+ public void EmitEvents(EmittedEventEnvelope[] events)
{
if (events == null) throw new ArgumentNullException("events");
CheckpointTag groupCausedBy = null;
@@ -637,23 +637,28 @@ private void SubmitWriteEventsInRecovery()
SubmitWriteEvents();
return;
}
- var topAlreadyCommitted = _alreadyCommittedEvents.Pop();
- ValidateEmittedEventInRecoveryMode(topAlreadyCommitted, eventToWrite);
+ var topAlreadyCommitted = ValidateEmittedEventInRecoveryMode(eventToWrite);
+ if (topAlreadyCommitted == null)
+ continue; // means skipped one already comitted item due to deleted stream handling
anyFound = true;
NotifyEventCommitted(eventToWrite, topAlreadyCommitted.Item3);
_pendingWrites.Dequeue(); // drop already committed event
}
OnWriteCompleted();
}
- private static void ValidateEmittedEventInRecoveryMode(Tuple<CheckpointTag, string, int> topAlreadyCommitted, EmittedEvent eventsToWrite)
+ private Tuple<CheckpointTag, string, int> ValidateEmittedEventInRecoveryMode(EmittedEvent eventsToWrite)
{
+ var topAlreadyCommitted = _alreadyCommittedEvents.Pop();
+ if (topAlreadyCommitted.Item1 < eventsToWrite.CausedByTag)
+ return null;
var failed = topAlreadyCommitted.Item1 != eventsToWrite.CausedByTag || topAlreadyCommitted.Item2 != eventsToWrite.EventType;
if (failed)
throw new InvalidEmittedEventSequenceExceptioin(
string.Format(
"An event emitted in recovery differ from the originally emitted event. Existing('{0}', '{1}'). New('{2}', '{3}')",
topAlreadyCommitted.Item2, topAlreadyCommitted.Item1, eventsToWrite.EventType, eventsToWrite.CausedByTag));
+ return topAlreadyCommitted;
}
private void RecoveryCompleted()

0 comments on commit 853d237

Please sign in to comment.