From 6246f21071fc0d190139aac472eca65a04971523 Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Sun, 9 May 2021 16:43:10 +0100 Subject: [PATCH 01/14] Prevent scavenged events from being passed to ExecuteHandler --- .../Services/v8/V8ProjectionStateHandler.cs | 30 ++++++++++++------- 1 file changed, 20 insertions(+), 10 deletions(-) diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 7083785966a..57788363828 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -10,6 +10,9 @@ using Serilog; namespace EventStore.Projections.Core.Services.v8 { + using System.Threading; + using Client.PersistentSubscriptions; + public class V8ProjectionStateHandler : IProjectionStateHandler { private readonly PreludeScript _prelude; private readonly QueryScript _query; @@ -61,7 +64,7 @@ public class EmittedEventJsonContract { private string GetEventData(ResolvedEvent evnt) { if (_enableContentTypeValidation) { - return (evnt.IsJson ? evnt.Data : evnt.Data ?? string.Empty)?.Trim(); + return (evnt.IsJson ? evnt.Data : evnt.Data ?? null)?.Trim(); //Callers all expect null for no data. } return evnt.Data?.Trim(); } @@ -126,17 +129,24 @@ public class EmittedEventJsonContract { CheckpointTag eventPosition, string category, ResolvedEvent @event) { CheckDisposed(); if (@event == null) throw new ArgumentNullException("event"); - var partition = _query.GetPartition( - GetEventData(@event), - new string[] { - @event.EventStreamId, @event.IsJson ? "1" : "", @event.EventType, category ?? "", - @event.EventSequenceNumber.ToString(CultureInfo.InvariantCulture), @event.Metadata ?? "", - @event.PositionMetadata ?? "" - }); + + var eventData = GetEventData(@event); + + if (string.IsNullOrEmpty(eventData)) { + //Nothing to actually process + return null; + } + + var partition = _query.GetPartition(eventData, + new string[] { + @event.EventStreamId, @event.IsJson ? "1" : "", @event.EventType, category ?? "", + @event.EventSequenceNumber.ToString(CultureInfo.InvariantCulture), @event.Metadata ?? "", + @event.PositionMetadata ?? "" + }); if (partition == "") return null; - else - return partition; + + return partition; } public bool ProcessEvent( From aca8cbe523f243cbe399837b51570b74c270acef Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Sun, 9 May 2021 17:25:36 +0100 Subject: [PATCH 02/14] Changed GetEventData to return empty string as it broke a unit test. --- .../Services/v8/V8ProjectionStateHandler.cs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 57788363828..2bab7bef86b 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -64,7 +64,7 @@ public class EmittedEventJsonContract { private string GetEventData(ResolvedEvent evnt) { if (_enableContentTypeValidation) { - return (evnt.IsJson ? evnt.Data : evnt.Data ?? null)?.Trim(); //Callers all expect null for no data. + return (evnt.IsJson ? evnt.Data : evnt.Data ?? string.Empty)?.Trim(); } return evnt.Data?.Trim(); } @@ -158,7 +158,7 @@ public class EmittedEventJsonContract { Tuple newStates = null; var data = GetEventData(@event); - if (@event == null || data == null) { + if (@event == null || data == null) { //TODO: change to String.IsNullOrEmpty on data? newStates = _query.Push( "", new string[] { }); @@ -201,7 +201,7 @@ public class EmittedEventJsonContract { _emittedEvents = null; var data = GetEventData(@event); - if (@event == null || data == null) { + if (@event == null || data == null) { //TODO: change to String.IsNullOrEmpty on data? emittedEvents = null; return true; } From 6e8651bbdedbda6f464646b8b8d747ca35d4c8cb Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Sun, 9 May 2021 18:05:31 +0100 Subject: [PATCH 03/14] Tweak to code to force CI --- .../Services/v8/V8ProjectionStateHandler.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 2bab7bef86b..5e1c065678a 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -130,7 +130,7 @@ public class EmittedEventJsonContract { CheckDisposed(); if (@event == null) throw new ArgumentNullException("event"); - var eventData = GetEventData(@event); + string eventData = GetEventData(@event); if (string.IsNullOrEmpty(eventData)) { //Nothing to actually process From 7ee1ea40a16195997f23bc7d8c624487b7627a89 Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Wed, 12 May 2021 10:54:15 +0100 Subject: [PATCH 04/14] Now checking the eventType before passing to GetPartition. --- .../Services/v8/V8ProjectionStateHandler.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 5e1c065678a..649b0c1357b 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -130,13 +130,14 @@ public class EmittedEventJsonContract { CheckDisposed(); if (@event == null) throw new ArgumentNullException("event"); - string eventData = GetEventData(@event); - - if (string.IsNullOrEmpty(eventData)) { + if (string.IsNullOrEmpty(@event.EventType)) { //Nothing to actually process return null; } + //Only get the event data if our previous checks passed. + string eventData = GetEventData(@event); + var partition = _query.GetPartition(eventData, new string[] { @event.EventStreamId, @event.IsJson ? "1" : "", @event.EventType, category ?? "", From d230881065e351a16e09d02c94542a9ebae84cad Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Thu, 13 May 2021 07:19:45 +0100 Subject: [PATCH 05/14] Handle trucated streams when reading events for projections --- .../Services/Storage/StorageReaderWorker.cs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs index a2a470468b1..8ca91c160b9 100644 --- a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -308,6 +308,17 @@ public class StorageReaderWorker : StorageReaderWorker, IHandle -1) { + result = _readIndex.ReadStreamEventsForward(streamName, streamId, result.NextEventNumber, msg.MaxCount); + } + } + } + CheckEventsOrder(msg, result); var resolvedPairs = ResolveLinkToEvents(result.Records, msg.ResolveLinkTos, msg.User); if (resolvedPairs == null) From 37f709f00db7e98d67e88c6d201a573d3e1f34e6 Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Fri, 14 May 2021 09:18:29 +0100 Subject: [PATCH 06/14] re-trigger CI --- src/EventStore.Core/Services/Storage/StorageReaderWorker.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs index 8ca91c160b9..9eff14ae50e 100644 --- a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -309,7 +309,7 @@ public class StorageReaderWorker : StorageReaderWorker, IHandle Date: Mon, 17 May 2021 12:15:02 +0100 Subject: [PATCH 07/14] WIP --- .../Services/Storage/StorageReaderWorker.cs | 12 +----------- .../Processing/EventByTypeIndexEventReader.cs | 4 ++-- .../Services/Processing/MultiStreamEventReader.cs | 2 +- .../Services/Processing/StreamEventReader.cs | 2 +- .../Processing/TransactionFileEventReader.cs | 2 +- 5 files changed, 6 insertions(+), 16 deletions(-) diff --git a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs index 9eff14ae50e..6b41e390b08 100644 --- a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -308,17 +308,7 @@ public class StorageReaderWorker : StorageReaderWorker, IHandle -1) { - result = _readIndex.ReadStreamEventsForward(streamName, streamId, result.NextEventNumber, msg.MaxCount); - } - } - } - + CheckEventsOrder(msg, result); var resolvedPairs = ResolveLinkToEvents(result.Records, msg.ResolveLinkTos, msg.User); if (resolvedPairs == null) diff --git a/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs index a1fc89be274..cdaaea5a75d 100644 --- a/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs @@ -251,11 +251,11 @@ public IndexBased(HashSet eventTypes, EventByTypeIndexEventReader reader break; case ReadStreamResult.Success: _reader.UpdateNextStreamPosition(message.EventStreamId, message.NextEventNumber); - var isEof = message.Events.Length == 0; - _eofs[message.EventStreamId] = isEof; + _eofs[message.EventStreamId] = message.IsEndOfStream; EnqueueEvents(message); ProcessBuffersAndContinue(eventStreamId: message.EventStreamId); break; + default: throw new NotSupportedException( String.Format("ReadEvents result code was not recognized. Code: {0}", message.Result)); diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index d404c93fc2b..e0200b4c6a9 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -121,7 +121,7 @@ public class MultiStreamEventReader : EventReader, CheckEof(); break; case ReadStreamResult.Success: - if (message.Events.Length == 0) { + if (message.IsEndOfStream) { // the end _eofs[message.EventStreamId] = true; UpdateSafePositionToJoin(message.EventStreamId, MessageToLastCommitPosition(message)); diff --git a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs index fb6518d45a3..4dae876b581 100644 --- a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs @@ -90,7 +90,7 @@ public class StreamEventReader : EventReader, case ReadStreamResult.Success: var oldFromSequenceNumber = StartFrom(message, _fromSequenceNumber); _fromSequenceNumber = message.NextEventNumber; - var eof = message.Events.Length == 0; + var eof = message.IsEndOfStream; _eof = eof; var willDispose = eof && _stopOnEof; diff --git a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs index 4167d2ce8fc..b4caf8f13fa 100644 --- a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs @@ -63,7 +63,7 @@ public class TransactionFileEventReader : EventReader, return; } - var eof = message.Events.Length == 0; + var eof = message.IsEndOfStream; _eof = eof; var willDispose = _stopOnEof && eof; var oldFrom = _from; From cc76ccc1e296a79136d65237e2bf679bc2a4ce52 Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Mon, 17 May 2021 12:23:18 +0100 Subject: [PATCH 08/14] Updated EventByTypeIndexEventReader, MultiStreamEventReader, StreamEventReader, TransactionFileEventReader to handle EOF correctly as requested --- .../Services/v8/V8ProjectionStateHandler.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index c844ce91e46..33a342eedce 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -125,6 +125,7 @@ public class EmittedEventJsonContract { _query.InitializeShared(); } + public string GetStatePartition( CheckpointTag eventPosition, string category, ResolvedEvent @event) { CheckDisposed(); From 38ba91a8d7109d39096e7148f5301ce54277deab Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Tue, 18 May 2021 06:47:13 +0100 Subject: [PATCH 09/14] Tweak to unt test as was sending bad request --- src/EventStore.Core.Tests/Http/Streams/feed.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Core.Tests/Http/Streams/feed.cs b/src/EventStore.Core.Tests/Http/Streams/feed.cs index d431f8a07d4..8ae9501e19b 100644 --- a/src/EventStore.Core.Tests/Http/Streams/feed.cs +++ b/src/EventStore.Core.Tests/Http/Streams/feed.cs @@ -393,7 +393,7 @@ public class private List _entries; protected override async Task When() { - _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10?embed=rich", + _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10",extra: "embed=rich", accept: ContentType.Json); _entries = _feed != null ? _feed["entries"].ToList() : new List(); } From 213fcd53e7019f2ce478dc6101d3342ffeb0fcb9 Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Tue, 18 May 2021 17:01:20 +0100 Subject: [PATCH 10/14] Further changes to EOF handling --- .../Services/Processing/EventByTypeIndexEventReader.cs | 2 +- .../Services/Processing/MultiStreamEventReader.cs | 2 +- .../Services/Processing/StreamEventReader.cs | 2 +- .../Services/Processing/TransactionFileEventReader.cs | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs index cdaaea5a75d..a01ec052884 100644 --- a/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/EventByTypeIndexEventReader.cs @@ -251,7 +251,7 @@ public IndexBased(HashSet eventTypes, EventByTypeIndexEventReader reader break; case ReadStreamResult.Success: _reader.UpdateNextStreamPosition(message.EventStreamId, message.NextEventNumber); - _eofs[message.EventStreamId] = message.IsEndOfStream; + _eofs[message.EventStreamId] = (message.Events.Length == 0) && message.IsEndOfStream; EnqueueEvents(message); ProcessBuffersAndContinue(eventStreamId: message.EventStreamId); break; diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index e0200b4c6a9..c86d8653306 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -121,7 +121,7 @@ public class MultiStreamEventReader : EventReader, CheckEof(); break; case ReadStreamResult.Success: - if (message.IsEndOfStream) { + if ((message.Events.Length == 0) && message.IsEndOfStream) { // the end _eofs[message.EventStreamId] = true; UpdateSafePositionToJoin(message.EventStreamId, MessageToLastCommitPosition(message)); diff --git a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs index 4dae876b581..cb07d659e64 100644 --- a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs @@ -90,7 +90,7 @@ public class StreamEventReader : EventReader, case ReadStreamResult.Success: var oldFromSequenceNumber = StartFrom(message, _fromSequenceNumber); _fromSequenceNumber = message.NextEventNumber; - var eof = message.IsEndOfStream; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = eof && _stopOnEof; diff --git a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs index b4caf8f13fa..3b154d4ed48 100644 --- a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs @@ -63,7 +63,7 @@ public class TransactionFileEventReader : EventReader, return; } - var eof = message.IsEndOfStream; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = _stopOnEof && eof; var oldFrom = _from; From d34e765f19b02916cf6328ce3e32a3eb88428ce5 Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Fri, 21 May 2021 14:33:22 +0100 Subject: [PATCH 11/14] Update __fromPositions when no events read but not at the end of the stream --- .../Services/Processing/MultiStreamEventReader.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index c86d8653306..aaab3a935be 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -129,6 +129,9 @@ public class MultiStreamEventReader : EventReader, CheckEof(); } else { _eofs[message.EventStreamId] = false; + if (message.Events.Length == 0) { + _fromPositions.Streams[message.EventStreamId] = message.NextEventNumber; + } for (int index = 0; index < message.Events.Length; index++) { var @event = message.Events[index].Event; var @link = message.Events[index].Link; From 11c3101aead8108812e2bd915f760eeaa163b5b3 Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Fri, 21 May 2021 16:34:04 +0100 Subject: [PATCH 12/14] Format change to kick off CI --- .../Services/Processing/MultiStreamEventReader.cs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index aaab3a935be..c02f0c53c14 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -22,7 +22,6 @@ public class MultiStreamEventReader : EventReader, private CheckpointTag _fromPositions; private readonly bool _resolveLinkTos; private readonly ITimeProvider _timeProvider; - private readonly HashSet _eventsRequested = new HashSet(); private readonly Dictionary _preparePositions = new Dictionary(); From f8544a23b520f145003f280d468aee4a016c05af Mon Sep 17 00:00:00 2001 From: Steven Blair Date: Fri, 21 May 2021 16:57:43 +0100 Subject: [PATCH 13/14] Another CI attempt. --- .../Services/Processing/PartitionCompletedWorkItem.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs index c8f30d5139d..b17c253b89c 100644 --- a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs +++ b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs @@ -31,7 +31,7 @@ class PartitionCompletedWorkItem : CheckpointWorkItemBase { protected override void WriteOutput() { _projection.EmitEofResult(_partition, _state.Result, _checkpointTag, Guid.Empty, null); - //NOTE: write output is an ordered processing stage + // NOTE: write output is an ordered processing stage // thus all the work items before have been already processed // and as we are processing in the stream-by-stream mode // it is safe to clean everything before this position up From 373b4ff6d80cd66ff0b57695dd577d7ffe6c7e93 Mon Sep 17 00:00:00 2001 From: StuartFergusonVme Date: Tue, 25 May 2021 15:34:13 +0100 Subject: [PATCH 14/14] Update _fromPositions when no events read but not at the end of the stream --- src/EventStore.Core.Tests/Http/Streams/feed.cs | 2 +- src/EventStore.Core/Services/Storage/StorageReaderWorker.cs | 1 + .../Services/Processing/EventByTypeIndexEventReader.cs | 4 ++-- .../Services/Processing/MultiStreamEventReader.cs | 6 ++++-- .../Services/Processing/PartitionCompletedWorkItem.cs | 2 +- .../Services/Processing/StreamEventReader.cs | 2 +- .../Services/Processing/TransactionFileEventReader.cs | 2 +- .../Services/v8/V8ProjectionStateHandler.cs | 3 +++ 8 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/EventStore.Core.Tests/Http/Streams/feed.cs b/src/EventStore.Core.Tests/Http/Streams/feed.cs index f0e1f504544..49fb8ab5361 100644 --- a/src/EventStore.Core.Tests/Http/Streams/feed.cs +++ b/src/EventStore.Core.Tests/Http/Streams/feed.cs @@ -415,7 +415,7 @@ public class private List _entries; protected override async Task When() { - _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10?embed=rich", + _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10",extra: "embed=rich", accept: ContentType.Json); _entries = _feed != null ? _feed["entries"].ToList() : new List(); } diff --git a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs index e1e80c68c42..15363fc7b5c 100644 --- a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -309,6 +309,7 @@ public class StorageReaderWorker : StorageReaderWorker, IHandle eventTypes, EventByTypeIndexEventReader reader break; case ReadStreamResult.Success: _reader.UpdateNextStreamPosition(message.EventStreamId, message.NextEventNumber); - var isEof = message.Events.Length == 0; - _eofs[message.EventStreamId] = isEof; + _eofs[message.EventStreamId] = (message.Events.Length == 0) && message.IsEndOfStream; EnqueueEvents(message); ProcessBuffersAndContinue(eventStreamId: message.EventStreamId); break; + default: throw new NotSupportedException( String.Format("ReadEvents result code was not recognized. Code: {0}", message.Result)); diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index d404c93fc2b..c02f0c53c14 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -22,7 +22,6 @@ public class MultiStreamEventReader : EventReader, private CheckpointTag _fromPositions; private readonly bool _resolveLinkTos; private readonly ITimeProvider _timeProvider; - private readonly HashSet _eventsRequested = new HashSet(); private readonly Dictionary _preparePositions = new Dictionary(); @@ -121,7 +120,7 @@ public class MultiStreamEventReader : EventReader, CheckEof(); break; case ReadStreamResult.Success: - if (message.Events.Length == 0) { + if ((message.Events.Length == 0) && message.IsEndOfStream) { // the end _eofs[message.EventStreamId] = true; UpdateSafePositionToJoin(message.EventStreamId, MessageToLastCommitPosition(message)); @@ -129,6 +128,9 @@ public class MultiStreamEventReader : EventReader, CheckEof(); } else { _eofs[message.EventStreamId] = false; + if (message.Events.Length == 0) { + _fromPositions.Streams[message.EventStreamId] = message.NextEventNumber; + } for (int index = 0; index < message.Events.Length; index++) { var @event = message.Events[index].Event; var @link = message.Events[index].Link; diff --git a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs index c8f30d5139d..b17c253b89c 100644 --- a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs +++ b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs @@ -31,7 +31,7 @@ class PartitionCompletedWorkItem : CheckpointWorkItemBase { protected override void WriteOutput() { _projection.EmitEofResult(_partition, _state.Result, _checkpointTag, Guid.Empty, null); - //NOTE: write output is an ordered processing stage + // NOTE: write output is an ordered processing stage // thus all the work items before have been already processed // and as we are processing in the stream-by-stream mode // it is safe to clean everything before this position up diff --git a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs index fb6518d45a3..cb07d659e64 100644 --- a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs @@ -90,7 +90,7 @@ public class StreamEventReader : EventReader, case ReadStreamResult.Success: var oldFromSequenceNumber = StartFrom(message, _fromSequenceNumber); _fromSequenceNumber = message.NextEventNumber; - var eof = message.Events.Length == 0; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = eof && _stopOnEof; diff --git a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs index 4167d2ce8fc..3b154d4ed48 100644 --- a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs @@ -63,7 +63,7 @@ public class TransactionFileEventReader : EventReader, return; } - var eof = message.Events.Length == 0; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = _stopOnEof && eof; var oldFrom = _from; diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 709ac9793f0..bb1ad9cb989 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -10,6 +10,8 @@ using Serilog; namespace EventStore.Projections.Core.Services.v8 { + using System.Threading; + using Client.PersistentSubscriptions; public class V8ProjectionStateHandler : IProjectionStateHandler { private readonly PreludeScript _prelude; @@ -123,6 +125,7 @@ public class EmittedEventJsonContract { _query.InitializeShared(); } + public string GetStatePartition( CheckpointTag eventPosition, string category, ResolvedEvent @event) { CheckDisposed();