diff --git a/src/EventStore.Core.Tests/Http/Streams/feed.cs b/src/EventStore.Core.Tests/Http/Streams/feed.cs index f0e1f504544..49fb8ab5361 100644 --- a/src/EventStore.Core.Tests/Http/Streams/feed.cs +++ b/src/EventStore.Core.Tests/Http/Streams/feed.cs @@ -415,7 +415,7 @@ public class private List _entries; protected override async Task When() { - _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10?embed=rich", + _feed = await GetJson("/streams/" + LinkedStreamName + "/0/forward/10",extra: "embed=rich", accept: ContentType.Json); _entries = _feed != null ? _feed["entries"].ToList() : new List(); } diff --git a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs index e1e80c68c42..15363fc7b5c 100644 --- a/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs +++ b/src/EventStore.Core/Services/Storage/StorageReaderWorker.cs @@ -309,6 +309,7 @@ public class StorageReaderWorker : StorageReaderWorker, IHandle eventTypes, EventByTypeIndexEventReader reader break; case ReadStreamResult.Success: _reader.UpdateNextStreamPosition(message.EventStreamId, message.NextEventNumber); - var isEof = message.Events.Length == 0; - _eofs[message.EventStreamId] = isEof; + _eofs[message.EventStreamId] = (message.Events.Length == 0) && message.IsEndOfStream; EnqueueEvents(message); ProcessBuffersAndContinue(eventStreamId: message.EventStreamId); break; + default: throw new NotSupportedException( String.Format("ReadEvents result code was not recognized. Code: {0}", message.Result)); diff --git a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs index d404c93fc2b..c02f0c53c14 100644 --- a/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/MultiStreamEventReader.cs @@ -22,7 +22,6 @@ public class MultiStreamEventReader : EventReader, private CheckpointTag _fromPositions; private readonly bool _resolveLinkTos; private readonly ITimeProvider _timeProvider; - private readonly HashSet _eventsRequested = new HashSet(); private readonly Dictionary _preparePositions = new Dictionary(); @@ -121,7 +120,7 @@ public class MultiStreamEventReader : EventReader, CheckEof(); break; case ReadStreamResult.Success: - if (message.Events.Length == 0) { + if ((message.Events.Length == 0) && message.IsEndOfStream) { // the end _eofs[message.EventStreamId] = true; UpdateSafePositionToJoin(message.EventStreamId, MessageToLastCommitPosition(message)); @@ -129,6 +128,9 @@ public class MultiStreamEventReader : EventReader, CheckEof(); } else { _eofs[message.EventStreamId] = false; + if (message.Events.Length == 0) { + _fromPositions.Streams[message.EventStreamId] = message.NextEventNumber; + } for (int index = 0; index < message.Events.Length; index++) { var @event = message.Events[index].Event; var @link = message.Events[index].Link; diff --git a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs index c8f30d5139d..b17c253b89c 100644 --- a/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs +++ b/src/EventStore.Projections.Core/Services/Processing/PartitionCompletedWorkItem.cs @@ -31,7 +31,7 @@ class PartitionCompletedWorkItem : CheckpointWorkItemBase { protected override void WriteOutput() { _projection.EmitEofResult(_partition, _state.Result, _checkpointTag, Guid.Empty, null); - //NOTE: write output is an ordered processing stage + // NOTE: write output is an ordered processing stage // thus all the work items before have been already processed // and as we are processing in the stream-by-stream mode // it is safe to clean everything before this position up diff --git a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs index fb6518d45a3..cb07d659e64 100644 --- a/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/StreamEventReader.cs @@ -90,7 +90,7 @@ public class StreamEventReader : EventReader, case ReadStreamResult.Success: var oldFromSequenceNumber = StartFrom(message, _fromSequenceNumber); _fromSequenceNumber = message.NextEventNumber; - var eof = message.Events.Length == 0; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = eof && _stopOnEof; diff --git a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs index 4167d2ce8fc..3b154d4ed48 100644 --- a/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs +++ b/src/EventStore.Projections.Core/Services/Processing/TransactionFileEventReader.cs @@ -63,7 +63,7 @@ public class TransactionFileEventReader : EventReader, return; } - var eof = message.Events.Length == 0; + var eof = (message.Events.Length == 0) && message.IsEndOfStream; _eof = eof; var willDispose = _stopOnEof && eof; var oldFrom = _from; diff --git a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs index 709ac9793f0..bb1ad9cb989 100644 --- a/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs +++ b/src/EventStore.Projections.Core/Services/v8/V8ProjectionStateHandler.cs @@ -10,6 +10,8 @@ using Serilog; namespace EventStore.Projections.Core.Services.v8 { + using System.Threading; + using Client.PersistentSubscriptions; public class V8ProjectionStateHandler : IProjectionStateHandler { private readonly PreludeScript _prelude; @@ -123,6 +125,7 @@ public class EmittedEventJsonContract { _query.InitializeShared(); } + public string GetStatePartition( CheckpointTag eventPosition, string category, ResolvedEvent @event) { CheckDisposed();