From 1fd6ca227dab767d9fb5e3a0c368b4cff64d8098 Mon Sep 17 00:00:00 2001 From: Yuriy Solodkyy Date: Fri, 14 Feb 2014 14:55:02 +0200 Subject: [PATCH] Added: test showing the problem with missing events when reading event-by-type-index and a stream gets deleted --- .../EventStore.Core/Data/ResolvedEvent.cs | 14 +++- .../Services/Storage/StorageReaderWorker.cs | 12 ++- .../Services/SubscriptionsService.cs | 15 ++-- .../Services/Transport/Http/Convert.cs | 10 ++- ...e_indexed_but_more_events_and_tombstone.cs | 79 +++++++++++++++++++ ...ng_and_events_are_indexed_but_tombstone.cs | 45 ++++++++--- .../EventStore.Projections.Core.Tests.csproj | 1 + 7 files changed, 153 insertions(+), 23 deletions(-) create mode 100644 src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs diff --git a/src/EventStore/EventStore.Core/Data/ResolvedEvent.cs b/src/EventStore/EventStore.Core/Data/ResolvedEvent.cs index 6522c1efaa0..d9c79832f13 100644 --- a/src/EventStore/EventStore.Core/Data/ResolvedEvent.cs +++ b/src/EventStore/EventStore.Core/Data/ResolvedEvent.cs @@ -39,21 +39,29 @@ public struct ResolvedEvent /// Position of the OriginalEvent (unresolved link or event) if available /// public readonly TFPos? OriginalPosition; + + public readonly ReadEventResult ResolveResult; + public string OriginalStreamId { get { return OriginalEvent.EventStreamId; } } public int OriginalEventNumber { get { return OriginalEvent.EventNumber; } } - public ResolvedEvent(EventRecord @event, EventRecord link) + public ResolvedEvent( + EventRecord @event, EventRecord link, ReadEventResult resolveResult = default(ReadEventResult)) { Event = @event; Link = link; OriginalPosition = null; + ResolveResult = resolveResult; } - public ResolvedEvent(EventRecord @event, EventRecord link, long commitPosition) + public ResolvedEvent( + EventRecord @event, EventRecord link, long commitPosition, + ReadEventResult resolveResult = default(ReadEventResult)) { Event = @event; Link = link; OriginalPosition = new TFPos(commitPosition, (link ?? @event).LogPosition); + ResolveResult = resolveResult; } public ResolvedEvent(EventRecord @event) @@ -61,6 +69,7 @@ public ResolvedEvent(EventRecord @event) Event = @event; Link = null; OriginalPosition = null; + ResolveResult = default(ReadEventResult); } public ResolvedEvent(EventRecord @event, long commitPosition) @@ -68,6 +77,7 @@ public ResolvedEvent(EventRecord @event, long commitPosition) Event = @event; Link = null; OriginalPosition = new TFPos(commitPosition, @event.LogPosition); + ResolveResult = default(ReadEventResult); } } } \ No newline at end of file diff --git a/src/EventStore/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore/EventStore.Core/Services/Storage/StorageReaderWorker.cs index 9fe15543233..436e6440ca7 100644 --- a/src/EventStore/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -437,14 +437,15 @@ private ResolvedEvent[] ResolveLinkToEvents(EventRecord[] records, bool resolveL var res = _readIndex.ReadEvent(streamId, eventNumber); if (res.Result == ReadEventResult.Success) - return new ResolvedEvent(res.Record, eventRecord); + return new ResolvedEvent(res.Record, eventRecord, ReadEventResult.Success); + return new ResolvedEvent(null, eventRecord, res.Result); } catch (Exception exc) { Log.ErrorException(exc, "Error while resolving link for event record: {0}", eventRecord.ToString()); } // return unresolved link - return new ResolvedEvent(null, eventRecord); + return new ResolvedEvent(null, eventRecord, ReadEventResult.Error); } return new ResolvedEvent(eventRecord); } @@ -460,14 +461,17 @@ private ResolvedEvent[] ResolveReadAllResult(IList records, b var resolvedPair = ResolveLinkToEvent(record.Event, user); if (resolvedPair == null) return null; - result[i] = new ResolvedEvent(resolvedPair.Value.Event, resolvedPair.Value.Link, record.CommitPosition); + result[i] = new ResolvedEvent( + resolvedPair.Value.Event, resolvedPair.Value.Link, record.CommitPosition, + resolvedPair.Value.ResolveResult); } } else { for (int i = 0; i < result.Length; ++i) { - result[i] = new ResolvedEvent(records[i].Event, null, records[i].CommitPosition); + result[i] = new ResolvedEvent( + records[i].Event, null, records[i].CommitPosition, default(ReadEventResult)); } } return result; diff --git a/src/EventStore/EventStore.Core/Services/SubscriptionsService.cs b/src/EventStore/EventStore.Core/Services/SubscriptionsService.cs index b45fbd172e6..eccd36809d6 100644 --- a/src/EventStore/EventStore.Core/Services/SubscriptionsService.cs +++ b/src/EventStore/EventStore.Core/Services/SubscriptionsService.cs @@ -291,10 +291,11 @@ public void Handle(StorageMessage.EventCommited message) ReissueReadsFor(message.Event.EventStreamId, message.CommitPosition, message.Event.EventNumber); } - private ResolvedEvent? ProcessEventCommited(string eventStreamId, long commitPosition, EventRecord evnt, ResolvedEvent? resolvedEvent) + private ResolvedEvent? ProcessEventCommited( + string eventStreamId, long commitPosition, EventRecord evnt, ResolvedEvent? resolvedEvent) { List subscriptions; - if (!_subscriptionTopics.TryGetValue(eventStreamId, out subscriptions)) + if (!_subscriptionTopics.TryGetValue(eventStreamId, out subscriptions)) return resolvedEvent; for (int i = 0, n = subscriptions.Count; i < n; i++) { @@ -302,8 +303,9 @@ public void Handle(StorageMessage.EventCommited message) if (commitPosition <= subscr.LastCommitPosition || evnt.EventNumber <= subscr.LastEventNumber) continue; - var pair = new ResolvedEvent(evnt, null, commitPosition); + var pair = new ResolvedEvent(evnt, null, commitPosition, default(ReadEventResult)); if (subscr.ResolveLinkTos) + // resolve event if has not been previously resolved resolvedEvent = pair = resolvedEvent ?? ResolveLinkToEvent(evnt, commitPosition); subscr.Envelope.ReplyWith(new ClientMessage.StreamEventAppeared(subscr.CorrelationId, pair)); @@ -323,16 +325,17 @@ private ResolvedEvent ResolveLinkToEvent(EventRecord eventRecord, long commitPos var res = _readIndex.ReadEvent(streamId, eventNumber); if (res.Result == ReadEventResult.Success) - return new ResolvedEvent(res.Record, eventRecord, commitPosition); + return new ResolvedEvent(res.Record, eventRecord, commitPosition, ReadEventResult.Success); + return new ResolvedEvent(null, eventRecord, res.Result); } catch (Exception exc) { Log.ErrorException(exc, "Error while resolving link for event record: {0}", eventRecord.ToString()); } // return unresolved link - return new ResolvedEvent(null, eventRecord, commitPosition); + return new ResolvedEvent(null, eventRecord, commitPosition, ReadEventResult.Error); } - return new ResolvedEvent(eventRecord, null, commitPosition); + return new ResolvedEvent(eventRecord, null, commitPosition, default(ReadEventResult)); } private void ReissueReadsFor(string streamId, long commitPosition, int eventNumber) diff --git a/src/EventStore/EventStore.Core/Services/Transport/Http/Convert.cs b/src/EventStore/EventStore.Core/Services/Transport/Http/Convert.cs index ce88074a620..c04ce3e25b2 100644 --- a/src/EventStore/EventStore.Core/Services/Transport/Http/Convert.cs +++ b/src/EventStore/EventStore.Core/Services/Transport/Http/Convert.cs @@ -131,7 +131,10 @@ public static FeedElement ToAllEventsForwardFeed(ClientMessage.ReadAllEventsForw feed.AddLink("metadata", HostName.Combine(requestedUrl, "/streams/{0}/metadata", AllEscaped)); for (int i = msg.Events.Length - 1; i >= 0; --i) { - feed.AddEntry(ToEntry(new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link), requestedUrl, embedContent)); + feed.AddEntry( + ToEntry( + new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link, msg.Events[i].ResolveResult), + requestedUrl, embedContent)); } return feed; } @@ -156,7 +159,10 @@ public static FeedElement ToAllEventsBackwardFeed(ClientMessage.ReadAllEventsBac feed.AddLink("metadata", HostName.Combine(requestedUrl, "/streams/{0}/metadata", AllEscaped)); for (int i = 0; i < msg.Events.Length; ++i) { - feed.AddEntry(ToEntry(new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link), requestedUrl, embedContent)); + feed.AddEntry( + ToEntry( + new ResolvedEvent(msg.Events[i].Event, msg.Events[i].Link, msg.Events[i].ResolveResult), + requestedUrl, embedContent)); } return feed; } diff --git a/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs b/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs new file mode 100644 index 00000000000..a4511426e7e --- /dev/null +++ b/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone.cs @@ -0,0 +1,79 @@ +// Copyright (c) 2012, Event Store LLP +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// Neither the name of the Event Store LLP nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + +using NUnit.Framework; + +namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete +{ + [TestFixture] + public class with_from_all_foreach_projection_running_and_events_are_indexed_but_more_events_and_tombstone : + specification_with_standard_projections_runnning + { + protected override bool GivenStandardProjectionsRunning() + { + return false; + } + + protected override void Given() + { + base.Given(); + PostEvent("stream1", "type1", "{}"); + PostEvent("stream1", "type2", "{}"); + PostEvent("stream2", "type1", "{}"); + PostEvent("stream2", "type2", "{}"); + WaitIdle(); + EnableStandardProjections(); + WaitIdle(); + DisableStandardProjections(); + WaitIdle(); + HardDeleteStream("stream1"); + WaitIdle(); + } + + protected override void When() + { + base.When(); + PostProjection(@" +fromAll().foreachStream().when({ + $init: function(){return {}}, + type1: function(s,e){s.a=1}, + type2: function(s,e){s.a=1}, + $deleted: function(s,e){s.deleted=1}, +}).outputState(); +"); + WaitIdle(); + } + + [Test, Category("Network")] + public void receives_deleted_notification() + { + AssertStreamTail( + "$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}"); + } + } +} diff --git a/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs b/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs index c15f32784c5..9d3c056a34e 100644 --- a/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs +++ b/src/EventStore/EventStore.Projections.Core.Tests/ClientAPI/when_handling_delete/with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone.cs @@ -1,10 +1,38 @@ -using System.Threading; +// Copyright (c) 2012, Event Store LLP +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// Neither the name of the Event Store LLP nor the names of its +// contributors may be used to endorse or promote products derived from +// this software without specific prior written permission +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +// HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +// + using NUnit.Framework; namespace EventStore.Projections.Core.Tests.ClientAPI.when_handling_delete { [TestFixture] - public class with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone : specification_with_standard_projections_runnning + public class with_from_all_foreach_projection_running_and_events_are_indexed_but_tombstone : + specification_with_standard_projections_runnning { protected override bool GivenStandardProjectionsRunning() { @@ -15,7 +43,6 @@ protected override void Given() { base.Given(); PostEvent("stream1", "type1", "{}"); - PostEvent("stream1", "type2", "{}"); PostEvent("stream2", "type1", "{}"); PostEvent("stream2", "type2", "{}"); WaitIdle(); @@ -23,9 +50,9 @@ protected override void Given() WaitIdle(); DisableStandardProjections(); WaitIdle(); + PostEvent("stream1", "type2", "{}"); HardDeleteStream("stream1"); WaitIdle(); - } protected override void When() @@ -33,9 +60,9 @@ protected override void When() base.When(); PostProjection(@" fromAll().foreachStream().when({ - $init: function(){return {}}, - type1: function(s,e){s.a=1}, - type2: function(s,e){s.a=1}, + $init: function(){return {a:0}}, + type1: function(s,e){s.a++}, + type2: function(s,e){}, $deleted: function(s,e){s.deleted=1}, }).outputState(); "); @@ -46,7 +73,7 @@ protected override void When() public void receives_deleted_notification() { AssertStreamTail( - "$projections-test-projection-stream1-result", "Result:{\"a\":1}", "Result:{\"a\":1,\"deleted\":1}"); + "$projections-test-projection-stream1-result", "Result:{\"a\":0,\"deleted\":1}"); } } -} \ No newline at end of file +} diff --git a/src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj b/src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj index 07230d4c1a4..05aa8a4562a 100644 --- a/src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj +++ b/src/EventStore/EventStore.Projections.Core.Tests/EventStore.Projections.Core.Tests.csproj @@ -110,6 +110,7 @@ +