Skip to content

Commit

Permalink
Progress: partially correct deleted notification reading
Browse files Browse the repository at this point in the history
in EventByTypeindexEventReader (indexed part)
  • Loading branch information
ysw committed Feb 13, 2014
1 parent d7acfd2 commit 2e6704b
Show file tree
Hide file tree
Showing 9 changed files with 152 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,24 +79,37 @@ public override void TestFixtureSetUp()
When();
}

private void EnableStandardProjections()
protected void EnableStandardProjections()
{
EnableProjection(ProjectionNamesBuilder.StandardProjections.EventByCategoryStandardProjection);
EnableProjection(ProjectionNamesBuilder.StandardProjections.EventByTypeStandardProjection);
EnableProjection(ProjectionNamesBuilder.StandardProjections.StreamByCategoryStandardProjection);
EnableProjection(ProjectionNamesBuilder.StandardProjections.StreamsStandardProjection);
}

protected void DisableStandardProjections()
{
DisableProjection(ProjectionNamesBuilder.StandardProjections.EventByCategoryStandardProjection);
DisableProjection(ProjectionNamesBuilder.StandardProjections.EventByTypeStandardProjection);
DisableProjection(ProjectionNamesBuilder.StandardProjections.StreamByCategoryStandardProjection);
DisableProjection(ProjectionNamesBuilder.StandardProjections.StreamsStandardProjection);
}

protected virtual bool GivenStandardProjectionsRunning()
{
return true;
}

private void EnableProjection(string name)
protected void EnableProjection(string name)
{
_manager.Enable(name, _admin);
}

protected void DisableProjection(string name)
{
_manager.Disable(name, _admin);
}

[TestFixtureTearDown]
public override void TestFixtureTearDown()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
using NUnit.Framework;

namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete
{
[TestFixture]
public class with_from_all_foreach_projection_running_and_events_are_indexed : specification_with_standard_projections_runnning
{
protected override bool GivenStandardProjectionsRunning()
{
return false;
}

protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
EnableStandardProjections();
WaitIdle();
HardDeleteStream("stream1");
WaitIdle();
DisableStandardProjections();
WaitIdle();

}

protected override void When()
{
base.When();
PostProjection(@"
fromAll().foreachStream().when({
$init: function(){return {}},
type1: function(s,e){s.a=1},
type2: function(s,e){s.a=1},
$deleted: function(s,e){s.deleted=1},
}).outputState();
");
WaitIdle();
}

[Test, Category("Network")]
public void receives_deleted_notification()
{
AssertStreamTail(
"$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
<Compile Include="ClientAPI\event_by_type_index.cs" />
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed.cs" />
<Compile Include="ClientAPI\with_standard_projections_running.cs" />
<Compile Include="Integration\from_streams_matching\when_running_without_stream_metadata.cs" />
<Compile Include="Integration\from_streams_matching\when_running_wit_stream_metadata.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
using EventStore.Projections.Core.Messages;
using EventStore.Projections.Core.Standard;
using EventStore.Projections.Core.Utils;
using Newtonsoft.Json.Linq;

namespace EventStore.Projections.Core.Services.Processing
{
Expand All @@ -48,6 +49,7 @@ public class EventByTypeIndexEventReader : EventReader
private const int _maxReadCount = 111;
private readonly HashSet<string> _eventTypes;
private readonly bool _resolveLinkTos;
private readonly bool _includeDeletedStreamNotification;
private readonly ITimeProvider _timeProvider;

private class PendingEvent
Expand All @@ -73,16 +75,20 @@ public PendingEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, TFPos tfPo

public EventByTypeIndexEventReader(
IODispatcher ioDispatcher, IPublisher publisher, Guid eventReaderCorrelationId, IPrincipal readAs,
string[] eventTypes, TFPos fromTfPosition, Dictionary<string, int> fromPositions, bool resolveLinkTos,
ITimeProvider timeProvider, bool stopOnEof = false, int? stopAfterNEvents = null)
string[] eventTypes, bool includeDeletedStreamNotification, TFPos fromTfPosition,
Dictionary<string, int> fromPositions, bool resolveLinkTos, ITimeProvider timeProvider,
bool stopOnEof = false, int? stopAfterNEvents = null)
: base(ioDispatcher, publisher, eventReaderCorrelationId, readAs, stopOnEof, stopAfterNEvents)
{
if (eventTypes == null) throw new ArgumentNullException("eventTypes");
if (timeProvider == null) throw new ArgumentNullException("timeProvider");
if (eventTypes.Length == 0) throw new ArgumentException("empty", "eventTypes");

_includeDeletedStreamNotification = includeDeletedStreamNotification;
_timeProvider = timeProvider;
_eventTypes = new HashSet<string>(eventTypes);
if (includeDeletedStreamNotification)
_eventTypes.Add("$deleted");
_streamToEventType = eventTypes.ToDictionary(v => "$et-" + v, v => v);
_lastEventPosition = fromTfPosition;
_resolveLinkTos = resolveLinkTos;
Expand Down Expand Up @@ -180,20 +186,33 @@ protected void DeliverEvent(float progress, ResolvedEvent resolvedEvent, TFPos p
_reader._lastEventPosition = resolvedEvent.OriginalPosition;
_reader._deliveredEvents ++;
//TODO: this is incomplete. where reading from TF we need to handle actual deletes

string partitionStreamId;
var isDeletedStreamEvent = StreamDeletedHelper.IsStreamDeletedEvent(
resolvedEvent.EventStreamId, resolvedEvent.EventType, resolvedEvent.Data, out partitionStreamId);


bool isDeletedStreamEvent;
if (resolvedEvent.IsLinkToStreamDeleted)
{
isDeletedStreamEvent = true;
partitionStreamId = resolvedEvent.EventStreamId;
}
else
{
isDeletedStreamEvent = StreamDeletedHelper.IsStreamDeletedEvent(
resolvedEvent.EventStreamId, resolvedEvent.EventType, resolvedEvent.Data, out partitionStreamId);
}
if (isDeletedStreamEvent)
{
var deletedPartition = partitionStreamId;

_reader._publisher.Publish(
//TODO: publish both link and event data
new ReaderSubscriptionMessage.EventReaderPartitionDeleted(
_reader.EventReaderCorrelationId, deletedPartition, source: this.GetType(),
lastEventNumber: -1, deleteEventPosition: position,
positionStreamId: resolvedEvent.PositionStreamId,
positionEventNumber: resolvedEvent.PositionSequenceNumber));
if (_reader._includeDeletedStreamNotification)
_reader._publisher.Publish(
//TODO: publish both link and event data
new ReaderSubscriptionMessage.EventReaderPartitionDeleted(
_reader.EventReaderCorrelationId, deletedPartition, source: this.GetType(),
lastEventNumber: -1, deleteEventPosition: position,
positionStreamId: resolvedEvent.PositionStreamId,
positionEventNumber: resolvedEvent.PositionSequenceNumber));
}
else
_reader._publisher.Publish(
Expand Down Expand Up @@ -641,6 +660,8 @@ public void Handle(ClientMessage.ReadAllEventsForwardCompleted message)
StreamDeletedHelper.IsStreamDeletedEvent(
@event.OriginalStreamId, @event.OriginalEvent.EventType,
@event.OriginalEvent.Data, out adjustedPositionStreamId);
if (data == null)
continue;
var eventType = isDeleteStreamEvent ? "$deleted" : data.EventType;
var byEvent = link == null && _eventTypes.Contains(eventType);
var originalTfPosition = @event.OriginalPosition.Value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,15 @@ public class EventByTypeIndexPositionTagger : PositionTagger
private readonly HashSet<string> _eventTypes;
private readonly Dictionary<string, string> _streamToEventType;

public EventByTypeIndexPositionTagger(int phase, string[] eventTypes): base(phase)
public EventByTypeIndexPositionTagger(
int phase, string[] eventTypes, bool includeStreamDeletedNotification = false)
: base(phase)
{
if (eventTypes == null) throw new ArgumentNullException("eventTypes");
if (eventTypes.Length == 0) throw new ArgumentException("eventTypes");
_eventTypes = new HashSet<string>(eventTypes);
if (includeStreamDeletedNotification)
_eventTypes.Add("$deleted");
_streams = new HashSet<string>(from eventType in eventTypes select "$et-" + eventType);
_streamToEventType = eventTypes.ToDictionary(v => "$et-" + v, v => v);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class ReaderStrategy : IReaderStrategy
private readonly bool _allEvents;
private readonly bool _includeLinks;
private readonly HashSet<string> _events;
private readonly bool _includeStreamDeletedNotification;
private readonly string _catalogStream;
private readonly bool _reorderEvents;
private readonly IPrincipal _runAs;
Expand Down Expand Up @@ -113,14 +114,14 @@ public static IReaderStrategy Create(int phase, IQuerySources sources, ITimeProv

var readerStrategy = new ReaderStrategy(
phase, sources.AllStreams, sources.Categories, sources.Streams, sources.AllEvents,
sources.IncludeLinksOption, sources.Events, sources.CatalogStream, sources.ProcessingLagOption,
sources.ReorderEventsOption, runAs, timeProvider);
sources.IncludeLinksOption, sources.Events, sources.HandlesDeletedNotifications, sources.CatalogStream,
sources.ProcessingLagOption, sources.ReorderEventsOption, runAs, timeProvider);
return readerStrategy;
}

private ReaderStrategy(
int phase, bool allStreams, string[] categories, string[] streams, bool allEvents, bool includeLinks,
string[] events, string catalogStream, int? processingLag, bool reorderEvents, IPrincipal runAs,
string[] events, bool includeStreamDeletedNotification, string catalogStream, int? processingLag, bool reorderEvents, IPrincipal runAs,
ITimeProvider timeProvider)
{
_phase = phase;
Expand All @@ -130,6 +131,7 @@ private ReaderStrategy(
_allEvents = allEvents;
_includeLinks = includeLinks;
_events = events != null && events.Length > 0 ? new HashSet<string>(events) : null;
_includeStreamDeletedNotification = includeStreamDeletedNotification;
_catalogStream = catalogStream;
_processingLag = processingLag.GetValueOrDefault();
_reorderEvents = reorderEvents;
Expand Down Expand Up @@ -187,7 +189,8 @@ public IEventReader CreatePausedEventReader(
{
//IEnumerable<string> streams = GetEventIndexStreams();
return CreatePausedEventIndexEventReader(
eventReaderId, ioDispatcher, publisher, checkpointTag, stopOnEof, stopAfterNEvents, true, _events);
eventReaderId, ioDispatcher, publisher, checkpointTag, stopOnEof, stopAfterNEvents, true, _events,
_includeStreamDeletedNotification);
}
if (_allStreams)
{
Expand Down Expand Up @@ -249,7 +252,7 @@ private EventFilter CreateEventFilter()
private PositionTagger CreatePositionTagger()
{
if (_allStreams && _events != null && _events.Count >= 1)
return new EventByTypeIndexPositionTagger(_phase, _events.ToArray());
return new EventByTypeIndexPositionTagger(_phase, _events.ToArray(), _includeStreamDeletedNotification);
if (_allStreams && _reorderEvents)
return new PreparePositionTagger(_phase);
if (_allStreams)
Expand Down Expand Up @@ -285,16 +288,20 @@ private IEventReader CreatePausedStreamEventReader(

private IEventReader CreatePausedEventIndexEventReader(
Guid eventReaderId, IODispatcher ioDispatcher, IPublisher publisher, CheckpointTag checkpointTag,
bool stopOnEof, int? stopAfterNEvents, bool resolveLinkTos, IEnumerable<string> eventTypes)
bool stopOnEof, int? stopAfterNEvents, bool resolveLinkTos, IEnumerable<string> eventTypes,
bool includeStreamDeletedNotification)
{
//NOTE: just optimization - anyway if reading from TF events may reappear
int p;
var nextPositions = eventTypes.ToDictionary(
v => "$et-" + v, v => checkpointTag.Streams.TryGetValue(v, out p) ? p + 1 : 0);

if (includeStreamDeletedNotification)
nextPositions.Add("$et-$deleted", checkpointTag.Streams.TryGetValue("$deleted", out p) ? p + 1 : 0);

return new EventByTypeIndexEventReader(
ioDispatcher, publisher, eventReaderId, _runAs, eventTypes.ToArray(), checkpointTag.Position,
nextPositions, resolveLinkTos, _timeProvider, stopOnEof, stopAfterNEvents);
ioDispatcher, publisher, eventReaderId, _runAs, eventTypes.ToArray(), includeStreamDeletedNotification,
checkpointTag.Position, nextPositions, resolveLinkTos, _timeProvider, stopOnEof, stopAfterNEvents);
}

private IEventReader CreatePausedMultiStreamEventReader(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
//

using System;
using System.Collections.Generic;
using System.Text;
using EventStore.Common.Utils;
using EventStore.Core.Data;
using EventStore.Core.Services;
using EventStore.Core.TransactionLog.LogRecords;
using Newtonsoft.Json.Linq;

namespace EventStore.Projections.Core.Services.Processing
{
Expand All @@ -55,6 +58,7 @@ public class ResolvedEvent
public readonly string Metadata;
public readonly string PositionMetadata;
public readonly string StreamMetadata;
public readonly bool IsLinkToStreamDeleted;

public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] streamMetadata)
{
Expand Down Expand Up @@ -84,11 +88,32 @@ public ResolvedEvent(EventStore.Core.Data.ResolvedEvent resolvedEvent, byte[] st
{
if (positionEvent.Metadata != null && positionEvent.Metadata.Length > 0)
{
var parsedPosition =
positionEvent.Metadata.ParseCheckpointTagJson().Position;
//TODO: parse JSON only when unresolved link and just tag otherwise
Dictionary<string, JToken> extraMetadata = null;
CheckpointTag tag;
if (resolvedEvent.Link != null && resolvedEvent.Event == null)
{
var checkpointTagJson =
positionEvent.Metadata.ParseCheckpointTagVersionExtraJson(default(ProjectionVersion));
tag = checkpointTagJson.Tag;
extraMetadata = checkpointTagJson.ExtraMetadata;
}
else
{
tag = positionEvent.Metadata.ParseCheckpointTagJson();
}
var parsedPosition = tag.Position;
originalPosition = parsedPosition != new TFPos(long.MinValue, long.MinValue)
? parsedPosition
: new TFPos(-1, resolvedEvent.OriginalEvent.LogPosition);
JToken deletedValue;
if (extraMetadata != null && extraMetadata.TryGetValue("$deleted", out deletedValue))
{
IsLinkToStreamDeleted = true;
var streamId= SystemEventTypes.StreamReferenceEventToStreamId(
SystemEventTypes.LinkTo, resolvedEvent.Link.Data);
_eventStreamId = streamId;
}
}
else
originalPosition = new TFPos(-1, resolvedEvent.OriginalEvent.LogPosition);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ private void DeliverEvent(
var isStreamDeletedEvent = StreamDeletedHelper.IsStreamDeletedEvent(
resolvedEvent.PositionStreamId, resolvedEvent.EventType, resolvedEvent.Data, out positionStreamId);
if (isStreamDeletedEvent)
_publisher.Publish(
_publisher.Publish(
new ReaderSubscriptionMessage.EventReaderPartitionDeleted(
EventReaderCorrelationId, positionStreamId, source: this.GetType(), lastEventNumber: -1,
deleteEventPosition: resolvedEvent.OriginalPosition, positionStreamId: null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,11 @@ public static class StreamDeletedHelper
public static bool IsStreamDeletedEvent(
string streamOrMetaStreamId, string eventType, string eventData, out string streamId)
{
if (string.IsNullOrEmpty(streamOrMetaStreamId))
{
streamId = null;
return false;
}
bool isMetaStream;
if (SystemStreams.IsMetastream(streamOrMetaStreamId))
{
Expand Down

0 comments on commit 2e6704b

Please sign in to comment.