Skip to content

Commit

Permalink
Merge pull request #837 from samhjohnson/async-catchupsubs
Browse files Browse the repository at this point in the history
Make EventStoreCatchUpSubscription Async
  • Loading branch information
gregoryyoung committed Mar 17, 2016
2 parents 5020942 + 60455a0 commit 39d7d26
Show file tree
Hide file tree
Showing 3 changed files with 845 additions and 89 deletions.
270 changes: 182 additions & 88 deletions src/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.SystemData;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace EventStore.ClientAPI
{
Expand Down Expand Up @@ -70,18 +71,20 @@ public abstract class EventStoreCatchUpSubscription
private readonly ManualResetEventSlim _stopped = new ManualResetEventSlim(true);

/// <summary>
/// Read events until the given position or event number.
/// Read events until the given position or event number async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected abstract void ReadEventsTill(IEventStoreConnection connection,
/// <returns></returns>
protected abstract Task ReadEventsTillAsync(IEventStoreConnection connection,
bool resolveLinkTos,
UserCredentials userCredentials,
long? lastCommitPosition,
int? lastEventNumber);

/// <summary>
/// Try to process a single <see cref="ResolvedEvent"/>.
/// </summary>
Expand Down Expand Up @@ -134,10 +137,10 @@ public abstract class EventStoreCatchUpSubscription
Verbose = verboseLogging;
}

internal void Start()
internal Task Start()
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: starting...", IsSubscribedToAll ? "<all>" : StreamId);
RunSubscription();
return RunSubscription();
}

/// <summary>
Expand Down Expand Up @@ -175,57 +178,109 @@ private void OnReconnect(object sender, ClientConnectionEventArgs clientConnecti
RunSubscription();
}

private void RunSubscription()
private Task RunSubscription()
{
return Task.Factory.StartNew(LoadHistoricalEvents, TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => HandleErrorOrContinue(_));
}

private void LoadHistoricalEvents()
{
ThreadPool.QueueUserWorkItem(_ =>
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);

_stopped.Reset();
_allowProcessing = false;

if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);
if (Verbose)
Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);

_stopped.Reset();
try
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, null, null);
}
ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, null, null)
.ContinueWith(_ => HandleErrorOrContinue(_, SubscribeToStream), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);
_subscription = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result;
private void SubscribeToStream()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);

if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber);
}
}
catch (Exception exc)
{
DropSubscription(SubscriptionDropReason.CatchUpError, exc);
return;
}
var subscribeTask = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials)
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials);

if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}

if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);
subscribeTask.ContinueWith(_ => HandleErrorOrContinue(_, () =>
{
_subscription = _.Result;
ReadMissedHistoricEvents();
}), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

private void ReadMissedHistoricEvents()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);

ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber)
.ContinueWith(_ => HandleErrorOrContinue(_, StartLiveProcessing), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

private void StartLiveProcessing()
{
if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}

if (_liveProcessingStarted != null)
_liveProcessingStarted(this);
if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);

if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;
if (_liveProcessingStarted != null)
_liveProcessingStarted(this);

_allowProcessing = true;
EnsureProcessingPushQueue();
});
if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;

_allowProcessing = true;
EnsureProcessingPushQueue();
}


private void HandleErrorOrContinue(Task task, Action continuation = null)
{
if (task.IsFaulted)
{
DropSubscription(SubscriptionDropReason.CatchUpError, task.Exception.GetBaseException());
task.Wait();
}
else if (task.IsCanceled)
{
DropSubscription(SubscriptionDropReason.CatchUpError, new TaskCanceledException(task));
task.Wait();
}
else if (continuation != null)
{
continuation();
}
}

private void EnqueuePushedEvent(EventStoreSubscription subscription, ResolvedEvent e)
{
if (Verbose)
Expand Down Expand Up @@ -369,39 +424,57 @@ public Position LastProcessedPosition
_nextReadPosition = fromPositionExclusive ?? Position.Start;
}


/// <summary>
/// Read events until the given position.
/// Read events until the given position async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
{
bool done;
do
{
AllEventsSlice slice = connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials).Result;
foreach (var e in slice.Events)
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;
return connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}
done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);
if (!ProcessEvents(lastCommitPosition, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug(
"Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
}
}, TaskContinuationOptions.AttachedToParent);
}

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);
private bool ProcessEvents(long? lastCommitPosition, AllEventsSlice slice)
{
foreach (var e in slice.Events)
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
var done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
}

/// <summary>
Expand Down Expand Up @@ -458,23 +531,45 @@ public class EventStoreStreamCatchUpSubscription : EventStoreCatchUpSubscription
}

/// <summary>
/// Read events until the given event number.
/// Read events until the given event number async.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos, UserCredentials userCredentials,
long? lastCommitPosition, int? lastEventNumber)
{
return connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos,userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}
if (!ProcessEvents(lastEventNumber, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
}
}, TaskContinuationOptions.AttachedToParent);
}

private bool ProcessEvents(int? lastEventNumber, StreamEventsSlice slice)
{
bool done;
do
switch (slice.Status)
{
var slice = connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos, userCredentials).Result;
switch (slice.Status)
{
case SliceReadStatus.Success:
case SliceReadStatus.Success:
{
foreach (var e in slice.Events)
{
Expand All @@ -484,26 +579,25 @@ public class EventStoreStreamCatchUpSubscription : EventStoreCatchUpSubscription
done = lastEventNumber == null ? slice.IsEndOfStream : slice.NextEventNumber > lastEventNumber;
break;
}
case SliceReadStatus.StreamNotFound:
case SliceReadStatus.StreamNotFound:
{
if (lastEventNumber.HasValue && lastEventNumber != -1)
throw new Exception(string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.", StreamId));
throw new Exception(
string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.",
StreamId));
done = true;
break;
}
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.", slice.Status));
}

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.",
slice.Status));
}

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
}

/// <summary>
Expand Down

0 comments on commit 39d7d26

Please sign in to comment.