Permalink
Browse files

Progress: cleanup in meaning or OriginalEventPosition in resolved events

  • Loading branch information...
ysw committed Feb 17, 2014
1 parent 1afe6a3 commit 4d7d5b76cdfb37641bb6fa3de167949049e12c20
Showing with 212 additions and 263 deletions.
  1. +35 −0 ...e/EventStore.Projections.Core.Tests/ClientAPI/specification_with_standard_projections_runnning.cs
  2. +1 −13 ...when_handling_delete/with_from_category_foreach_projection/when_running_and_events_are_indexed.cs
  3. +16 −19 ...m_and_tombstone.cs → when_running_and_events_are_indexed_but_a_stream_and_tombstone_postponed.cs}
  4. +1 −1 src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj
  5. +4 −4 ...rojections.Core.Tests/Services/event_reader/stream_reader/when_handling_read_completed_and_eof.cs
  6. +4 −4 ...ore.Tests/Services/event_reader/stream_reader/when_handling_read_completed_stream_event_reader.cs
  7. +4 −4 ...ore.Tests/Services/event_reader/stream_reader/when_handling_read_completed_then_pause_then_eof.cs
  8. +0 −2 src/EventStore/EventStore.Projections.Core/EventStore.Projections.Core.csproj
  9. +14 −6 src/EventStore/EventStore.Projections.Core/Messages/ReaderSubscriptionMessage.cs
  10. +2 −2 src/EventStore/EventStore.Projections.Core/Services/Processing/ByStreamCatalogEventReader.cs
  11. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/BypassingEventFilter.cs
  12. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/CategoryEventFilter.cs
  13. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventFilter.cs
  14. +8 −16 src/EventStore/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs
  15. +10 −10 src/EventStore/EventStore.Projections.Core/Services/Processing/EventByTypeIndexPositionTagger.cs
  16. +1 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/EventFilter.cs
  17. +3 −3 src/EventStore/EventStore.Projections.Core/Services/Processing/EventReader.cs
  18. +2 −2 src/EventStore/EventStore.Projections.Core/Services/Processing/ExternallyFedByStreamEventReader.cs
  19. +5 −5 src/EventStore/EventStore.Projections.Core/Services/Processing/HeadingEventReader.cs
  20. +0 −58 src/EventStore/EventStore.Projections.Core/Services/Processing/IndexedEventTypeEventFilter.cs
  21. +0 −65 src/EventStore/EventStore.Projections.Core/Services/Processing/IndexedEventTypesEventFilter.cs
  22. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/MultiStreamEventFilter.cs
  23. +2 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscription.cs
  24. +1 −1 src/EventStore/EventStore.Projections.Core/Services/Processing/ReaderSubscriptionBase.cs
  25. +22 −13 src/EventStore/EventStore.Projections.Core/Services/Processing/ResolvedEvent.cs
  26. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/StreamEventFilter.cs
  27. +7 −17 src/EventStore/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs
  28. +5 −0 src/EventStore/EventStore.Projections.Core/Services/Processing/TransactionFileEventFilter.cs
  29. +12 −8 src/EventStore/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs
  30. +3 −3 src/EventStore/EventStore.Projections.Core/Services/Processing/TransactionFilePositionTagger.cs
  31. +25 −7 src/EventStore/EventStore.Projections.Core/Standard/StreamDeletedHelper.cs
@@ -199,6 +199,26 @@ protected void AssertStreamTail(string streamId, params string[] events)
#endif
}
+ [Conditional("DEBUG")]
+ protected void DumpStream(string streamId)
+ {
+#if DEBUG
+ var result = _conn.ReadStreamEventsBackward(streamId, -1, 100, true, _admin);
+ switch (result.Status)
+ {
+ case SliceReadStatus.StreamDeleted:
+ Assert.Fail("Stream '{0}' is deleted", streamId);
+ break;
+ case SliceReadStatus.StreamNotFound:
+ Assert.Fail("Stream '{0}' does not exist", streamId);
+ break;
+ case SliceReadStatus.Success:
+ Dump("Dumping..", streamId, result.Events.Reverse().ToArray());
+ break;
+ }
+#endif
+ }
+
#if DEBUG
private void DumpFailed(string message, string streamId, string[] events, ResolvedEvent[] resultEvents)
{
@@ -213,6 +233,21 @@ private void DumpFailed(string message, string streamId, string[] events, Resolv
Assert.Fail(
"Stream: '{0}'\r\n{1}\r\n\r\nExisting events: \r\n{2}\r\n Expected events: \r\n{3}\r\n\r\nActual metas:{4}", streamId,
message, actual, expected, actualMeta);
+
+ }
+
+ private void Dump(string message, string streamId, ResolvedEvent[] resultEvents)
+ {
+ var actual = resultEvents.Aggregate(
+ "", (a, v) => a + ", " + v.OriginalEvent.EventType + ":" + v.OriginalEvent.DebugDataView);
+
+ var actualMeta = resultEvents.Aggregate(
+ "", (a, v) => a + "\r\n" + v.OriginalEvent.EventType + ":" + v.OriginalEvent.DebugMetadataView);
+
+
+ Debug.WriteLine(
+ "Stream: '{0}'\r\n{1}\r\n\r\nExisting events: \r\n{2}\r\n \r\nActual metas:{3}", streamId,
+ message, actual, actualMeta);
}
#endif
@@ -45,21 +45,9 @@ protected override void Given()
PostEvent("stream-1", "type2", "{}");
PostEvent("stream-2", "type1", "{}");
PostEvent("stream-2", "type2", "{}");
- WaitIdle();
- EnableStandardProjections();
- WaitIdle();
HardDeleteStream("stream-1");
WaitIdle();
- DisableStandardProjections();
- WaitIdle();
-
- // required to flush index checkpoint
- {
- EnableStandardProjections();
- WaitIdle();
- DisableStandardProjections();
- WaitIdle();
- }
+ EnableStandardProjections();
}
protected override void When()
@@ -31,7 +31,7 @@
namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete.with_from_category_foreach_projection
{
[TestFixture]
- public class when_running_and_events_are_indexed_but_a_stream_and_tombstone :
+ public class when_running_and_events_are_indexed_but_a_stream_and_tombstone_postponed :
specification_with_standard_projections_runnning
{
protected override bool GivenStandardProjectionsRunning()
@@ -49,24 +49,7 @@ protected override void Given()
WaitIdle();
DisableStandardProjections();
WaitIdle();
-
- // required to flush index checkpoint
- {
- EnableStandardProjections();
- WaitIdle();
- DisableStandardProjections();
- WaitIdle();
- }
-
- PostEvent("stream-1", "type1", "{}");
- PostEvent("stream-1", "type2", "{}");
- HardDeleteStream("stream-1");
- WaitIdle();
- }
-
- protected override void When()
- {
- base.When();
+
PostProjection(@"
fromCategory('stream').foreachStream().when({
$init: function(){return {a:0}},
@@ -76,11 +59,25 @@ protected override void When()
}).outputState();
");
WaitIdle();
+ // SUT projection must have been joined heading reader
+ EnableStandardProjections();
+ WaitIdle();
+ }
+
+ protected override void When()
+ {
+ base.When();
+ PostEvent("stream-1", "type1", "{}");
+ PostEvent("stream-1", "type2", "{}");
+ WaitIdle();
+ HardDeleteStream("stream-1");
+ WaitIdle();
}
[Test, Category("Network")]
public void receives_deleted_notification()
{
+ DumpStream("$ce-stream");
AssertStreamTail("$projections-test-projection-stream-1-result", "Result:{\"a\":2,\"deleted\":1}");
}
}
@@ -112,7 +112,7 @@
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\recovery\when_running_and_events_are_indexed.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\recovery\when_running_and_events_get_indexed_before_recovery.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_events_are_indexed.cs" />
- <Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_events_are_indexed_but_a_stream_and_tombstone.cs" />
+ <Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_events_are_indexed_but_a_stream_and_tombstone_postponed.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_events_are_indexed_but_more_events_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_events_are_indexed_but_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_category_foreach_projection\when_running_and_no_indexing.cs" />
@@ -132,10 +132,10 @@ public void publishes_correct_committed_event_received_messages()
Assert.AreEqual(100, second.Data.Position.PreparePosition);
Assert.AreEqual(-1, first.Data.Position.CommitPosition);
Assert.AreEqual(-1, second.Data.Position.CommitPosition);
- Assert.AreEqual(50, first.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(100, second.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(-1, first.Data.OriginalPosition.CommitPosition);
- Assert.AreEqual(-1, second.Data.OriginalPosition.CommitPosition);
+ Assert.AreEqual(50, first.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(100, second.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(-1, first.Data.EventOrLinkTargetPosition.CommitPosition);
+ Assert.AreEqual(-1, second.Data.EventOrLinkTargetPosition.CommitPosition);
Assert.AreEqual(50, first.SafeTransactionFileReaderJoinPosition);
Assert.AreEqual(100, second.SafeTransactionFileReaderJoinPosition);
}
@@ -122,10 +122,10 @@ public void publishes_correct_committed_event_received_messages()
Assert.AreEqual(100, second.Data.Position.PreparePosition);
Assert.AreEqual(-1, first.Data.Position.CommitPosition);
Assert.AreEqual(-1, second.Data.Position.CommitPosition);
- Assert.AreEqual(50, first.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(100, second.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(-1, first.Data.OriginalPosition.CommitPosition);
- Assert.AreEqual(-1, second.Data.OriginalPosition.CommitPosition);
+ Assert.AreEqual(50, first.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(100, second.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(-1, first.Data.EventOrLinkTargetPosition.CommitPosition);
+ Assert.AreEqual(-1, second.Data.EventOrLinkTargetPosition.CommitPosition);
Assert.AreEqual(50, first.SafeTransactionFileReaderJoinPosition);
Assert.AreEqual(100, second.SafeTransactionFileReaderJoinPosition);
Assert.IsFalse(first.Data.IsJson);
@@ -133,10 +133,10 @@ public void publishes_correct_committed_event_received_messages()
Assert.AreEqual(100, second.Data.Position.PreparePosition);
Assert.AreEqual(-1, first.Data.Position.CommitPosition);
Assert.AreEqual(-1, second.Data.Position.CommitPosition);
- Assert.AreEqual(50, first.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(100, second.Data.OriginalPosition.PreparePosition);
- Assert.AreEqual(-1, first.Data.OriginalPosition.CommitPosition);
- Assert.AreEqual(-1, second.Data.OriginalPosition.CommitPosition);
+ Assert.AreEqual(50, first.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(100, second.Data.EventOrLinkTargetPosition.PreparePosition);
+ Assert.AreEqual(-1, first.Data.EventOrLinkTargetPosition.CommitPosition);
+ Assert.AreEqual(-1, second.Data.EventOrLinkTargetPosition.CommitPosition);
Assert.AreEqual(50, first.SafeTransactionFileReaderJoinPosition);
Assert.AreEqual(100, second.SafeTransactionFileReaderJoinPosition);
}
@@ -221,8 +221,6 @@
<Compile Include="Services\Processing\ICoreProjectionForProcessingPhase.cs" />
<Compile Include="Services\Processing\IEventReader.cs" />
<Compile Include="Services\Processing\IEventWriter.cs" />
- <Compile Include="Services\Processing\IndexedEventTypeEventFilter.cs" />
- <Compile Include="Services\Processing\IndexedEventTypesEventFilter.cs" />
<Compile Include="Services\Processing\IEventProcessingPhase.cs" />
<Compile Include="Services\Processing\IProjectionProcessingPhase.cs" />
<Compile Include="Services\Processing\IReaderStrategy.cs" />
@@ -159,18 +159,21 @@ public override int MsgTypeId
private readonly string _partition;
private readonly int? _lastEventNumber;
- private readonly TFPos? _deleteEventPosition;
+ private readonly TFPos? _deleteLinkOrEventPosition;
+ private readonly TFPos? _deleteEventOrLinkTargetPosition;
private readonly string _positionStreamId;
private readonly int? _positionEventNumber;
public EventReaderPartitionDeleted(
- Guid correlationId, string partition, int? lastEventNumber, TFPos? deleteEventPosition,
- string positionStreamId, int? positionEventNumber, CheckpointTag preTagged = null, object source = null)
+ Guid correlationId, string partition, int? lastEventNumber, TFPos? deleteLinkOrEventPosition,
+ TFPos? deleteEventOrLinkTargetPosition, string positionStreamId, int? positionEventNumber,
+ CheckpointTag preTagged = null, object source = null)
: base(correlationId, preTagged, source)
{
_partition = partition;
_lastEventNumber = lastEventNumber;
- _deleteEventPosition = deleteEventPosition;
+ _deleteLinkOrEventPosition = deleteLinkOrEventPosition;
+ _deleteEventOrLinkTargetPosition = deleteEventOrLinkTargetPosition;
_positionStreamId = positionStreamId;
_positionEventNumber = positionEventNumber;
}
@@ -185,9 +188,9 @@ public string Partition
get { return _lastEventNumber; }
}
- public TFPos? DeleteEventPosition
+ public TFPos? DeleteEventOrLinkTargetPosition
{
- get { return _deleteEventPosition; }
+ get { return _deleteEventOrLinkTargetPosition; }
}
public string PositionStreamId
@@ -199,6 +202,11 @@ public string PositionStreamId
{
get { return _positionEventNumber; }
}
+
+ public TFPos? DeleteLinkOrEventPosition
+ {
+ get { return _deleteLinkOrEventPosition; }
+ }
}
public class EventReaderPartitionMeasured : SubscriptionMessage
@@ -163,12 +163,12 @@ private void ReadDataStreamCompleted(ClientMessage.ReadStreamEventsForwardComple
case ReadStreamResult.NoStream:
_dataNextSequenceNumber = int.MaxValue;
if (completed.LastEventNumber >= 0)
- SendPartitionDeleted(_dataStreamName, -1, null, null, null);
+ SendPartitionDeleted(_dataStreamName, -1, null, null, null, null);
PauseOrContinueProcessing();
break;
case ReadStreamResult.StreamDeleted:
_dataNextSequenceNumber = int.MaxValue;
- SendPartitionDeleted(_dataStreamName, -1, null, null, null);
+ SendPartitionDeleted(_dataStreamName, -1, null, null, null, null);
PauseOrContinueProcessing();
break;
case ReadStreamResult.Success:
@@ -35,6 +35,11 @@ public BypassingEventFilter()
{
}
+ public override bool DeletedNotificationPasses(string positionStreamId)
+ {
+ return true;
+ }
+
public override bool PassesSource(bool resolvedFromLinkTo, string positionStreamId, string eventType)
{
return true;
@@ -43,6 +43,11 @@ public CategoryEventFilter(string category, bool allEvents, HashSet<string> even
_categoryStream = "$ce-" + category;
}
+ public override bool DeletedNotificationPasses(string positionStreamId)
+ {
+ return _categoryStream == positionStreamId;
+ }
+
public override bool PassesSource(bool resolvedFromLinkTo, string positionStreamId, string eventType)
{
return resolvedFromLinkTo && _categoryStream == positionStreamId;
@@ -46,6 +46,11 @@ public EventByTypeIndexEventFilter(HashSet<string> events)
_streams = new HashSet<string>(from eventType in events select "$et-" + eventType);
}
+ public override bool DeletedNotificationPasses(string positionStreamId)
+ {
+ return true;
+ }
+
public override bool PassesSource(bool resolvedFromLinkTo, string positionStreamId, string eventType)
{
//TODO: add tests to assure that resolved by link events are not passed twice into the subscription?!!
@@ -181,39 +181,31 @@ protected State(EventByTypeIndexEventReader reader, IPrincipal readAs)
protected void DeliverEvent(float progress, ResolvedEvent resolvedEvent, TFPos position)
{
- if (resolvedEvent.OriginalPosition <= _reader._lastEventPosition)
+ if (resolvedEvent.EventOrLinkTargetPosition <= _reader._lastEventPosition)
return;
- _reader._lastEventPosition = resolvedEvent.OriginalPosition;
+ _reader._lastEventPosition = resolvedEvent.EventOrLinkTargetPosition;
_reader._deliveredEvents ++;
//TODO: this is incomplete. where reading from TF we need to handle actual deletes
- string partitionStreamId;
+ string deletedPartitionStreamId;
- bool isDeletedStreamEvent;
if (resolvedEvent.IsLinkToDeletedStream && !resolvedEvent.IsLinkToDeletedStreamTombstone)
return;
- if (resolvedEvent.IsLinkToDeletedStreamTombstone)
- {
- isDeletedStreamEvent = true;
- partitionStreamId = resolvedEvent.EventStreamId;
- }
- else
- {
- isDeletedStreamEvent = StreamDeletedHelper.IsStreamDeletedEvent(
- resolvedEvent.EventStreamId, resolvedEvent.EventType, resolvedEvent.Data, out partitionStreamId);
- }
+ bool isDeletedStreamEvent = StreamDeletedHelper.IsStreamDeletedEvent(
+ resolvedEvent, out deletedPartitionStreamId);
if (isDeletedStreamEvent)
{
- var deletedPartition = partitionStreamId;
+ var deletedPartition = deletedPartitionStreamId;
if (_reader._includeDeletedStreamNotification)
_reader._publisher.Publish(
//TODO: publish both link and event data
new ReaderSubscriptionMessage.EventReaderPartitionDeleted(
_reader.EventReaderCorrelationId, deletedPartition, source: this.GetType(),
- lastEventNumber: -1, deleteEventPosition: position,
+ lastEventNumber: -1, deleteEventOrLinkTargetPosition: position,
+ deleteLinkOrEventPosition: resolvedEvent.EventOrLinkTargetPosition,
positionStreamId: resolvedEvent.PositionStreamId,
positionEventNumber: resolvedEvent.PositionSequenceNumber));
}
Oops, something went wrong.

0 comments on commit 4d7d5b7

Please sign in to comment.