Skip to content

Commit

Permalink
Added: test showing the problem with missing events when reading
Browse files Browse the repository at this point in the history
event-by-type-index and a stream gets deleted
  • Loading branch information
ysw committed Feb 14, 2014
1 parent bc0720f commit 1fd6ca2
Show file tree
Hide file tree
Showing 7 changed files with 153 additions and 23 deletions.
14 changes: 12 additions & 2 deletions src/EventStore/EventStore.Core/Data/ResolvedEvent.cs
Expand Up @@ -39,35 +39,45 @@ public struct ResolvedEvent
/// Position of the OriginalEvent (unresolved link or event) if available
/// </summary>
public readonly TFPos? OriginalPosition;

public readonly ReadEventResult ResolveResult;

public string OriginalStreamId { get { return OriginalEvent.EventStreamId; } }
public int OriginalEventNumber { get { return OriginalEvent.EventNumber; } }

public ResolvedEvent(EventRecord @event, EventRecord link)
public ResolvedEvent(
EventRecord @event, EventRecord link, ReadEventResult resolveResult = default(ReadEventResult))
{
Event = @event;
Link = link;
OriginalPosition = null;
ResolveResult = resolveResult;
}

public ResolvedEvent(EventRecord @event, EventRecord link, long commitPosition)
public ResolvedEvent(
EventRecord @event, EventRecord link, long commitPosition,
ReadEventResult resolveResult = default(ReadEventResult))
{
Event = @event;
Link = link;
OriginalPosition = new TFPos(commitPosition, (link ?? @event).LogPosition);
ResolveResult = resolveResult;
}

public ResolvedEvent(EventRecord @event)
{
Event = @event;
Link = null;
OriginalPosition = null;
ResolveResult = default(ReadEventResult);
}

public ResolvedEvent(EventRecord @event, long commitPosition)
{
Event = @event;
Link = null;
OriginalPosition = new TFPos(commitPosition, @event.LogPosition);
ResolveResult = default(ReadEventResult);
}
}
}
Expand Up @@ -437,14 +437,15 @@ private ResolvedEvent[] ResolveLinkToEvents(EventRecord[] records, bool resolveL

var res = _readIndex.ReadEvent(streamId, eventNumber);
if (res.Result == ReadEventResult.Success)
return new ResolvedEvent(res.Record, eventRecord);
return new ResolvedEvent(res.Record, eventRecord, ReadEventResult.Success);
return new ResolvedEvent(null, eventRecord, res.Result);
}
catch (Exception exc)
{
Log.ErrorException(exc, "Error while resolving link for event record: {0}", eventRecord.ToString());
}
// return unresolved link
return new ResolvedEvent(null, eventRecord);
return new ResolvedEvent(null, eventRecord, ReadEventResult.Error);
}
return new ResolvedEvent(eventRecord);
}
Expand All @@ -460,14 +461,17 @@ private ResolvedEvent[] ResolveReadAllResult(IList<CommitEventRecord> records, b
var resolvedPair = ResolveLinkToEvent(record.Event, user);
if (resolvedPair == null)
return null;
result[i] = new ResolvedEvent(resolvedPair.Value.Event, resolvedPair.Value.Link, record.CommitPosition);
result[i] = new ResolvedEvent(
resolvedPair.Value.Event, resolvedPair.Value.Link, record.CommitPosition,
resolvedPair.Value.ResolveResult);
}
}
else
{
for (int i = 0; i < result.Length; ++i)
{
result[i] = new ResolvedEvent(records[i].Event, null, records[i].CommitPosition);
result[i] = new ResolvedEvent(
records[i].Event, null, records[i].CommitPosition, default(ReadEventResult));
}
}
return result;
Expand Down
15 changes: 9 additions & 6 deletions src/EventStore/EventStore.Core/Services/SubscriptionsService.cs
Expand Up @@ -291,19 +291,21 @@ public void Handle(StorageMessage.EventCommited message)
ReissueReadsFor(message.Event.EventStreamId, message.CommitPosition, message.Event.EventNumber);
}

private ResolvedEvent? ProcessEventCommited(string eventStreamId, long commitPosition, EventRecord evnt, ResolvedEvent? resolvedEvent)
private ResolvedEvent? ProcessEventCommited(
string eventStreamId, long commitPosition, EventRecord evnt, ResolvedEvent? resolvedEvent)
{
List<Subscription> subscriptions;
if (!_subscriptionTopics.TryGetValue(eventStreamId, out subscriptions))
if (!_subscriptionTopics.TryGetValue(eventStreamId, out subscriptions))
return resolvedEvent;
for (int i = 0, n = subscriptions.Count; i < n; i++)
{
var subscr = subscriptions[i];
if (commitPosition <= subscr.LastCommitPosition || evnt.EventNumber <= subscr.LastEventNumber)
continue;

var pair = new ResolvedEvent(evnt, null, commitPosition);
var pair = new ResolvedEvent(evnt, null, commitPosition, default(ReadEventResult));
if (subscr.ResolveLinkTos)
// resolve event if has not been previously resolved
resolvedEvent = pair = resolvedEvent ?? ResolveLinkToEvent(evnt, commitPosition);

subscr.Envelope.ReplyWith(new ClientMessage.StreamEventAppeared(subscr.CorrelationId, pair));
Expand All @@ -323,16 +325,17 @@ private ResolvedEvent ResolveLinkToEvent(EventRecord eventRecord, long commitPos

var res = _readIndex.ReadEvent(streamId, eventNumber);
if (res.Result == ReadEventResult.Success)
return new ResolvedEvent(res.Record, eventRecord, commitPosition);
return new ResolvedEvent(res.Record, eventRecord, commitPosition, ReadEventResult.Success);
return new ResolvedEvent(null, eventRecord, res.Result);
}
catch (Exception exc)
{
Log.ErrorException(exc, "Error while resolving link for event record: {0}", eventRecord.ToString());
}
// return unresolved link
return new ResolvedEvent(null, eventRecord, commitPosition);
return new ResolvedEvent(null, eventRecord, commitPosition, ReadEventResult.Error);
}
return new ResolvedEvent(eventRecord, null, commitPosition);
return new ResolvedEvent(eventRecord, null, commitPosition, default(ReadEventResult));
}

private void ReissueReadsFor(string streamId, long commitPosition, int eventNumber)
Expand Down
Expand Up @@ -131,7 +131,10 @@ public static FeedElement ToAllEventsForwardFeed(ClientMessage.ReadAllEventsForw
feed.AddLink("metadata", HostName.Combine(requestedUrl, "/streams/{0}/metadata", AllEscaped));
for (int i = msg.Events.Length - 1; i >= 0; --i)
{
feed.AddEntry(ToEntry(new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link), requestedUrl, embedContent));
feed.AddEntry(
ToEntry(
new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link, msg.Events[i].ResolveResult),
requestedUrl, embedContent));
}
return feed;
}
Expand All @@ -156,7 +159,10 @@ public static FeedElement ToAllEventsBackwardFeed(ClientMessage.ReadAllEventsBac
feed.AddLink("metadata", HostName.Combine(requestedUrl, "/streams/{0}/metadata", AllEscaped));
for (int i = 0; i < msg.Events.Length; ++i)
{
feed.AddEntry(ToEntry(new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link), requestedUrl, embedContent));
feed.AddEntry(
ToEntry(
new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link, msg.Events[i].ResolveResult),
requestedUrl, embedContent));
}
return feed;
}
Expand Down
@@ -0,0 +1,79 @@
// Copyright (c) 2012, Event Store LLP
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// Neither the name of the Event Store LLP nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

using NUnit.Framework;

namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete
{
[TestFixture]
public class with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone :
specification_with_standard_projections_runnning
{
protected override bool GivenStandardProjectionsRunning()
{
return false;
}

protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
WaitIdle();
HardDeleteStream("stream1");
WaitIdle();
}

protected override void When()
{
base.When();
PostProjection(@"
fromAll().foreachStream().when({
$init: function(){return {}},
type1: function(s,e){s.a=1},
type2: function(s,e){s.a=1},
$deleted: function(s,e){s.deleted=1},
}).outputState();
");
WaitIdle();
}

[Test, Category("Network")]
public void receives_deleted_notification()
{
AssertStreamTail(
"$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}");
}
}
}
@@ -1,10 +1,38 @@
using System.Threading;
// Copyright (c) 2012, Event Store LLP
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// Neither the name of the Event Store LLP nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//

using NUnit.Framework;

namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete
{
[TestFixture]
public class with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone : specification_with_standard_projections_runnning
public class with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone :
specification_with_standard_projections_runnning
{
protected override bool GivenStandardProjectionsRunning()
{
Expand All @@ -15,27 +43,26 @@ protected override void Given()
{
base.Given();
PostEvent("stream1", "type1", "{}");
PostEvent("stream1", "type2", "{}");
PostEvent("stream2", "type1", "{}");
PostEvent("stream2", "type2", "{}");
WaitIdle();
EnableStandardProjections();
WaitIdle();
DisableStandardProjections();
WaitIdle();
PostEvent("stream1", "type2", "{}");
HardDeleteStream("stream1");
WaitIdle();

}

protected override void When()
{
base.When();
PostProjection(@"
fromAll().foreachStream().when({
$init: function(){return {}},
type1: function(s,e){s.a=1},
type2: function(s,e){s.a=1},
$init: function(){return {a:0}},
type1: function(s,e){s.a++},
type2: function(s,e){},
$deleted: function(s,e){s.deleted=1},
}).outputState();
");
Expand All @@ -46,7 +73,7 @@ protected override void When()
public void receives_deleted_notification()
{
AssertStreamTail(
"$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}");
"$projections-test-projection-stream1-result", "Result:{\"a\":0,\"deleted\":1}");
}
}
}
}
Expand Up @@ -110,6 +110,7 @@
<Compile Include="ClientAPI\specification_with_standard_projections_runnning.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs" />
<Compile Include="ClientAPI\when_handling_delete\with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs" />
<Compile Include="ClientAPI\with_standard_projections_running.cs" />
<Compile Include="Integration\from_streams_matching\when_running_without_stream_metadata.cs" />
Expand Down

0 comments on commit 1fd6ca2

Please sign in to comment.