Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make EventStoreCatchUpSubscription Async #837

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
270 changes: 182 additions & 88 deletions src/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.SystemData;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace EventStore.ClientAPI
{
Expand Down Expand Up @@ -70,18 +71,20 @@ public abstract class EventStoreCatchUpSubscription
private readonly ManualResetEventSlim _stopped = new ManualResetEventSlim(true);

/// <summary>
/// Read events until the given position or event number.
/// Read events until the given position or event number async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected abstract void ReadEventsTill(IEventStoreConnection connection,
/// <returns></returns>
protected abstract Task ReadEventsTillAsync(IEventStoreConnection connection,
bool resolveLinkTos,
UserCredentials userCredentials,
long? lastCommitPosition,
int? lastEventNumber);

/// <summary>
/// Try to process a single <see cref="ResolvedEvent"/>.
/// </summary>
Expand Down Expand Up @@ -134,10 +137,10 @@ public abstract class EventStoreCatchUpSubscription
Verbose = verboseLogging;
}

internal void Start()
internal Task Start()
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: starting...", IsSubscribedToAll ? "<all>" : StreamId);
RunSubscription();
return RunSubscription();
}

/// <summary>
Expand Down Expand Up @@ -175,57 +178,109 @@ private void OnReconnect(object sender, ClientConnectionEventArgs clientConnecti
RunSubscription();
}

private void RunSubscription()
private Task RunSubscription()
{
return Task.Factory.StartNew(LoadHistoricalEvents, TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => HandleErrorOrContinue(_));
}

private void LoadHistoricalEvents()
{
ThreadPool.QueueUserWorkItem(_ =>
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);

_stopped.Reset();
_allowProcessing = false;

if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);
if (Verbose)
Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);

_stopped.Reset();
try
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, null, null);
}
ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, null, null)
.ContinueWith(_ => HandleErrorOrContinue(_, SubscribeToStream), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);
_subscription = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result;
private void SubscribeToStream()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);

if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber);
}
}
catch (Exception exc)
{
DropSubscription(SubscriptionDropReason.CatchUpError, exc);
return;
}
var subscribeTask = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials)
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials);

if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}

if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);
subscribeTask.ContinueWith(_ => HandleErrorOrContinue(_, () =>
{
_subscription = _.Result;
ReadMissedHistoricEvents();
}), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

private void ReadMissedHistoricEvents()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);

ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber)
.ContinueWith(_ => HandleErrorOrContinue(_, StartLiveProcessing), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

private void StartLiveProcessing()
{
if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}

if (_liveProcessingStarted != null)
_liveProcessingStarted(this);
if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);

if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;
if (_liveProcessingStarted != null)
_liveProcessingStarted(this);

_allowProcessing = true;
EnsureProcessingPushQueue();
});
if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;

_allowProcessing = true;
EnsureProcessingPushQueue();
}


private void HandleErrorOrContinue(Task task, Action continuation = null)
{
if (task.IsFaulted)
{
DropSubscription(SubscriptionDropReason.CatchUpError, task.Exception.GetBaseException());
task.Wait();
}
else if (task.IsCanceled)
{
DropSubscription(SubscriptionDropReason.CatchUpError, new TaskCanceledException(task));
task.Wait();
}
else if (continuation != null)
{
continuation();
}
}

private void EnqueuePushedEvent(EventStoreSubscription subscription, ResolvedEvent e)
{
if (Verbose)
Expand Down Expand Up @@ -369,39 +424,57 @@ public Position LastProcessedPosition
_nextReadPosition = fromPositionExclusive ?? Position.Start;
}


/// <summary>
/// Read events until the given position.
/// Read events until the given position async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
{
bool done;
do
{
AllEventsSlice slice = connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials).Result;
foreach (var e in slice.Events)
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;
return connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}

done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);
if (!ProcessEvents(lastCommitPosition, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug(
"Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
}
}, TaskContinuationOptions.AttachedToParent);
}

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);
private bool ProcessEvents(long? lastCommitPosition, AllEventsSlice slice)
{
foreach (var e in slice.Events)
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
var done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
}

/// <summary>
Expand Down Expand Up @@ -458,23 +531,45 @@ public class EventStoreStreamCatchUpSubscription : EventStoreCatchUpSubscription
}

/// <summary>
/// Read events until the given event number.
/// Read events until the given event number async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos, UserCredentials userCredentials,
long? lastCommitPosition, int? lastEventNumber)
{
return connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos,userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}

if (!ProcessEvents(lastEventNumber, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
}

}, TaskContinuationOptions.AttachedToParent);
}

private bool ProcessEvents(int? lastEventNumber, StreamEventsSlice slice)
{
bool done;
do
switch (slice.Status)
{
var slice = connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos, userCredentials).Result;
switch (slice.Status)
{
case SliceReadStatus.Success:
case SliceReadStatus.Success:
{
foreach (var e in slice.Events)
{
Expand All @@ -484,26 +579,25 @@ public class EventStoreStreamCatchUpSubscription : EventStoreCatchUpSubscription
done = lastEventNumber == null ? slice.IsEndOfStream : slice.NextEventNumber > lastEventNumber;
break;
}
case SliceReadStatus.StreamNotFound:
case SliceReadStatus.StreamNotFound:
{
if (lastEventNumber.HasValue && lastEventNumber != -1)
throw new Exception(string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.", StreamId));
throw new Exception(
string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.",
StreamId));
done = true;
break;
}
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.", slice.Status));
}

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.",
slice.Status));
}

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
}

/// <summary>
Expand Down