Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle truncated streams when reading events for projections #2956

Closed
wants to merge 17 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/EventStore.Core.Tests/Http/Streams/feed.cs
Expand Up @@ -415,7 +415,7 @@ public class
private List<JToken> _entries;

protected override async Task When() {
_feed = await GetJson<JObject>("/streams/" + LinkedStreamName + "/0/forward/10?embed=rich",
_feed = await GetJson<JObject>("/streams/" + LinkedStreamName + "/0/forward/10",extra: "embed=rich",
accept: ContentType.Json);
_entries = _feed != null ? _feed["entries"].ToList() : new List<JToken>();
}
Expand Down
Expand Up @@ -309,6 +309,7 @@ public class StorageReaderWorker<TStreamId> : StorageReaderWorker, IHandle<Clien

var result =
_readIndex.ReadStreamEventsForward(streamName, streamId, msg.FromEventNumber, msg.MaxCount);

CheckEventsOrder(msg, result);
var resolvedPairs = ResolveLinkToEvents(result.Records, msg.ResolveLinkTos, msg.User);
if (resolvedPairs == null)
Expand Down
Expand Up @@ -251,11 +251,11 @@ public IndexBased(HashSet<string> eventTypes, EventByTypeIndexEventReader reader
break;
case ReadStreamResult.Success:
_reader.UpdateNextStreamPosition(message.EventStreamId, message.NextEventNumber);
var isEof = message.Events.Length == 0;
_eofs[message.EventStreamId] = isEof;
_eofs[message.EventStreamId] = (message.Events.Length == 0) && message.IsEndOfStream;
EnqueueEvents(message);
ProcessBuffersAndContinue(eventStreamId: message.EventStreamId);
break;

default:
throw new NotSupportedException(
String.Format("ReadEvents result code was not recognized. Code: {0}", message.Result));
Expand Down
Expand Up @@ -22,7 +22,6 @@ public class MultiStreamEventReader : EventReader,
private CheckpointTag _fromPositions;
private readonly bool _resolveLinkTos;
private readonly ITimeProvider _timeProvider;

private readonly HashSet<string> _eventsRequested = new HashSet<string>();
private readonly Dictionary<string, long?> _preparePositions = new Dictionary<string, long?>();

Expand Down Expand Up @@ -121,14 +120,17 @@ public class MultiStreamEventReader : EventReader,
CheckEof();
break;
case ReadStreamResult.Success:
if (message.Events.Length == 0) {
if ((message.Events.Length == 0) && message.IsEndOfStream) {
// the end
_eofs[message.EventStreamId] = true;
UpdateSafePositionToJoin(message.EventStreamId, MessageToLastCommitPosition(message));
CheckIdle();
CheckEof();
} else {
_eofs[message.EventStreamId] = false;
if (message.Events.Length == 0) {
_fromPositions.Streams[message.EventStreamId] = message.NextEventNumber;
}
for (int index = 0; index < message.Events.Length; index++) {
var @event = message.Events[index].Event;
var @link = message.Events[index].Link;
Expand Down
Expand Up @@ -31,7 +31,7 @@ class PartitionCompletedWorkItem : CheckpointWorkItemBase {

protected override void WriteOutput() {
_projection.EmitEofResult(_partition, _state.Result, _checkpointTag, Guid.Empty, null);
//NOTE: write output is an ordered processing stage
// NOTE: write output is an ordered processing stage
// thus all the work items before have been already processed
// and as we are processing in the stream-by-stream mode
// it is safe to clean everything before this position up
Expand Down
Expand Up @@ -90,7 +90,7 @@ public class StreamEventReader : EventReader,
case ReadStreamResult.Success:
var oldFromSequenceNumber = StartFrom(message, _fromSequenceNumber);
_fromSequenceNumber = message.NextEventNumber;
var eof = message.Events.Length == 0;
var eof = (message.Events.Length == 0) && message.IsEndOfStream;
_eof = eof;
var willDispose = eof && _stopOnEof;

Expand Down
Expand Up @@ -63,7 +63,7 @@ public class TransactionFileEventReader : EventReader,
return;
}

var eof = message.Events.Length == 0;
var eof = (message.Events.Length == 0) && message.IsEndOfStream;
_eof = eof;
var willDispose = _stopOnEof && eof;
var oldFrom = _from;
Expand Down
Expand Up @@ -10,6 +10,8 @@
using Serilog;

namespace EventStore.Projections.Core.Services.v8 {
using System.Threading;
using Client.PersistentSubscriptions;

public class V8ProjectionStateHandler : IProjectionStateHandler {
private readonly PreludeScript _prelude;
Expand Down Expand Up @@ -123,6 +125,7 @@ public class EmittedEventJsonContract {
_query.InitializeShared();
}


public string GetStatePartition(
CheckpointTag eventPosition, string category, ResolvedEvent @event) {
CheckDisposed();
Expand Down