Skip to content
Browse files

Fixed: event-by-type-index checkpoint writing when starting standard

projections
Added: more tests on reading with deletes in event-by-type-index reader
  • Loading branch information...
1 parent 1fd6ca2 commit efddca19d3b18896363e859e6d189e1e6f63c232 @ysw ysw committed Feb 14, 2014
View
10 ...I/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed.cs
@@ -24,7 +24,15 @@ protected override void Given()
WaitIdle();
DisableStandardProjections();
WaitIdle();
-
+
+ // required to flush index checkpoint
+ {
+ EnableStandardProjections();
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+ }
+
}
protected override void When()
View
60 ..._from_all_foreach_projection_running_and_events_are_indexed_but_a_stream_and_tombstone.cs
@@ -0,0 +1,60 @@
+using NUnit.Framework;
+
+namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete
+{
+ [TestFixture]
+ public class with_from_all_foreach_projection_running_and_events_are_indexed_but_a_stream_and_tombstone :
+ specification_with_standard_projections_runnning
+ {
+ protected override bool GivenStandardProjectionsRunning()
+ {
+ return false;
+ }
+
+ protected override void Given()
+ {
+ base.Given();
+ PostEvent("stream2", "type1", "{}");
+ PostEvent("stream2", "type2", "{}");
+ WaitIdle();
+ EnableStandardProjections();
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+
+ // required to flush index checkpoint
+ {
+ EnableStandardProjections();
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+ }
+
+ PostEvent("stream1", "type1", "{}");
+ PostEvent("stream1", "type2", "{}");
+ HardDeleteStream("stream1");
+ WaitIdle();
+ }
+
+ protected override void When()
+ {
+ base.When();
+ PostProjection(@"
+fromAll().foreachStream().when({
+ $init: function(){return {a:0}},
+ type1: function(s,e){s.a++},
+ type2: function(s,e){s.a++},
+ $deleted: function(s,e){s.deleted=1},
+}).outputState();
+");
+ WaitIdle();
+ }
+
+ [Test, Category("Network")]
+ public void receives_deleted_notification()
+ {
+ AssertStreamTail(
+ "$projections-test-projection-stream1-result", "Result:{\"a\":2,\"deleted\":1}");
+ }
+ }
+}
View
19 ...om_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs
@@ -43,14 +43,23 @@ protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
- PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
WaitIdle();
+
+ // required to flush index checkpoint
+ {
+ EnableStandardProjections();
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+ }
+
+ PostEvent("stream1", "type2", "{}");
HardDeleteStream("stream1");
WaitIdle();
}
@@ -60,9 +69,9 @@ protected override void When()
base.When();
PostProjection(@"
fromAll().foreachStream().when({
- $init: function(){return {}},
- type1: function(s,e){s.a=1},
- type2: function(s,e){s.a=1},
+ $init: function(){return {a:0}},
+ type1: function(s,e){s.a++},
+ type2: function(s,e){},
$deleted: function(s,e){s.deleted=1},
}).outputState();
");
@@ -73,7 +82,7 @@ protected override void When()
public void receives_deleted_notification()
{
AssertStreamTail(
- "$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}");
+ "$projections-test-projection-stream1-result", "Result:{\"a\":0,\"deleted\":1}");
}
}
}
View
20 ...g_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs
@@ -43,14 +43,24 @@ protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
+ PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
WaitIdle();
- PostEvent("stream1", "type2", "{}");
+
+ // required to flush index checkpoint
+ {
+ EnableStandardProjections();
+ WaitIdle();
+ DisableStandardProjections();
+ WaitIdle();
+ }
+
+
HardDeleteStream("stream1");
WaitIdle();
}
@@ -60,9 +70,9 @@ protected override void When()
base.When();
PostProjection(@"
fromAll().foreachStream().when({
- $init: function(){return {a:0}},
- type1: function(s,e){s.a++},
- type2: function(s,e){},
+ $init: function(){return {}},
+ type1: function(s,e){s.a=1},
+ type2: function(s,e){s.a=1},
$deleted: function(s,e){s.deleted=1},
}).outputState();
");
@@ -73,7 +83,7 @@ protected override void When()
public void receives_deleted_notification()
{
AssertStreamTail(
- "$projections-test-projection-stream1-result", "Result:{\"a\":0,\"deleted\":1}");
+ "$projections-test-projection-stream1-result", "Result:{\"deleted\":1}");
}
}
}
View
1 src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj
@@ -110,6 +110,7 @@
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed.cs" />
+ <Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_a_stream_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs" />
<Compile Include="ClientAPI\with_standard_projections_running.cs" />
View
11 src/EventStore/EventStore.Projections.Core/Services/Processing/CoreProjection.cs
@@ -149,9 +149,10 @@ private enum State : uint
private void BeginPhase(IProjectionProcessingPhase processingPhase, CheckpointTag startFrom)
{
_projectionProcessingPhase = processingPhase;
+ _projectionProcessingPhase.SetProjectionState(PhaseState.Starting);
_checkpointManager = processingPhase.CheckpointManager;
- _projectionProcessingPhase.InitializeFromCheckpoint(startFrom);
+ _projectionProcessingPhase.InitializeFromCheckpoint(startFrom);
_checkpointManager.Start(startFrom);
}
@@ -392,6 +393,8 @@ private void GoToState(State state)
var wasStopped = _state == State.Stopped || _state == State.Faulted || _state == State.PhaseCompleted;
var wasStopping = _state == State.Stopping || _state == State.FaultedStopping
|| _state == State.CompletingPhase;
+ var wasStarting = _state == State.LoadStateRequested || _state == State.StateLoaded
+ || _state == State.Subscribed;
var wasStarted = _state == State.Subscribed || _state == State.Running || _state == State.Stopping
|| _state == State.FaultedStopping || _state == State.CompletingPhase;
var wasRunning = _state == State.Running;
@@ -418,6 +421,12 @@ private void GoToState(State state)
if (_projectionProcessingPhase != null) // null while loading state
switch (state)
{
+ case State.LoadStateRequested:
+ case State.StateLoaded:
+ case State.Subscribed:
+ if (!wasStarting)
+ _projectionProcessingPhase.SetProjectionState(PhaseState.Starting);
+ break;
case State.Running:
if (!wasRunning)
_projectionProcessingPhase.SetProjectionState(PhaseState.Running);
View
4 src/EventStore/EventStore.Projections.Core/Services/Processing/EmittedStream.cs
@@ -287,8 +287,8 @@ private void Failed(string reason)
private void ReadStreamEventsBackwardCompleted(ClientMessage.ReadStreamEventsBackwardCompleted message, CheckpointTag upTo)
{
- if (upTo == _zeroPosition)
- throw new ArgumentException("upTo cannot be equal to zero position");
+// if (upTo == _zeroPosition)
+// throw new ArgumentException("upTo cannot be equal to zero position");
if (!_awaitingListEventsCompleted)
throw new InvalidOperationException("ReadStreamEventsBackward has not been requested");
View
2 ...EventStore/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs
@@ -191,7 +191,7 @@ protected void DeliverEvent(float progress, ResolvedEvent resolvedEvent, TFPos p
bool isDeletedStreamEvent;
- if (resolvedEvent.IsLinkToStreamDeleted)
+ if (resolvedEvent.IsLinkToDeletedStream)
{
isDeletedStreamEvent = true;
partitionStreamId = resolvedEvent.EventStreamId;
View
7 ...entStore.Projections.Core/Services/Processing/EventProcessingProjectionProcessingPhase.cs
@@ -407,6 +407,11 @@ private void SetHandlerState(string partition)
public override void NewCheckpointStarted(CheckpointTag at)
{
+ if (!(_state == PhaseState.Running || _state == PhaseState.Starting))
+ {
+ Console.WriteLine("Starting a checkpoint in non-runnable state");
+ return;
+ }
var checkpointHandler = _projectionStateHandler as IProjectionCheckpointHandler;
if (checkpointHandler != null)
{
@@ -429,7 +434,7 @@ public override void NewCheckpointStarted(CheckpointTag at)
if (!ValidateEmittedEvents(emittedEvents))
return;
- if (_state == PhaseState.Running)
+ if (_state == PhaseState.Running || _state == PhaseState.Starting)
_resultWriter.EventsEmitted(emittedEvents, Guid.Empty, correlationId: null);
}
}
View
5 ...e.Projections.Core/Services/Processing/EventSubscriptionBasedProjectionProcessingPhase.cs
@@ -403,7 +403,6 @@ public void InitializeFromCheckpoint(CheckpointTag checkpointTag)
// this can be old checkpoint
var adjustedCheckpointTag = _readerStrategy.PositionTagger.AdjustTag(checkpointTag);
_processingQueue.InitializeQueue(adjustedCheckpointTag);
- NewCheckpointStarted(adjustedCheckpointTag);
}
public int GetBufferedEventCount()
@@ -618,8 +617,12 @@ public void Handle(EventReaderSubscriptionMessage.ReaderAssignedReader message)
public void SetProjectionState(PhaseState state)
{
+ var starting = _state == PhaseState.Starting && state == PhaseState.Running;
+
_state = state;
_processingQueue.SetIsRunning(state == PhaseState.Running);
+ if (starting)
+ NewCheckpointStarted(LastProcessedEventPosition);
}
}
}
View
3 src/EventStore/EventStore.Projections.Core/Services/Processing/IProjectionProcessingPhase.cs
@@ -9,7 +9,8 @@ public enum PhaseState
{
Unknown,
Stopped,
- Running
+ Starting,
+ Running,
}
View
23 src/EventStore/EventStore.Projections.Core/Services/Processing/ResolvedEvent.cs
@@ -58,7 +58,7 @@ public class ResolvedEvent
public readonly string Metadata;
public readonly string PositionMetadata;
public readonly string StreamMetadata;
- public readonly bool IsLinkToStreamDeleted;
+ public readonly bool IsLinkToDeletedStream;
public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] streamMetadata)
{
@@ -86,10 +86,10 @@ public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] st
TFPos originalPosition;
if (_resolvedLinkTo)
{
+ Dictionary<string, JToken> extraMetadata = null;
if (positionEvent.Metadata != null && positionEvent.Metadata.Length > 0)
{
//TODO: parse JSON only when unresolved link and just tag otherwise
- Dictionary<string, JToken> extraMetadata = null;
CheckpointTag tag;
if (resolvedEvent.Link != null && resolvedEvent.Event == null)
{
@@ -106,17 +106,20 @@ public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] st
originalPosition = parsedPosition != new TFPos(long.MinValue, long.MinValue)
? parsedPosition
: new TFPos(-1, resolvedEvent.OriginalEvent.LogPosition);
- JToken deletedValue;
- if (extraMetadata != null && extraMetadata.TryGetValue("$deleted", out deletedValue))
- {
- IsLinkToStreamDeleted = true;
- var streamId= SystemEventTypes.StreamReferenceEventToStreamId(
- SystemEventTypes.LinkTo, resolvedEvent.Link.Data);
- _eventStreamId = streamId;
- }
}
else
originalPosition = new TFPos(-1, resolvedEvent.OriginalEvent.LogPosition);
+
+ JToken deletedValue;
+ if (resolvedEvent.ResolveResult == ReadEventResult.StreamDeleted
+ || resolvedEvent.ResolveResult == ReadEventResult.NoStream
+ || extraMetadata != null && extraMetadata.TryGetValue("$deleted", out deletedValue))
+ {
+ IsLinkToDeletedStream = true;
+ var streamId = SystemEventTypes.StreamReferenceEventToStreamId(
+ SystemEventTypes.LinkTo, resolvedEvent.Link.Data);
+ _eventStreamId = streamId;
+ }
}
else
{

0 comments on commit efddca1

Please sign in to comment.
Something went wrong with that request. Please try again.