Skip to content

Commit

Permalink
Merge pull request #924 from EventStore/revert-client-async-sub
Browse files Browse the repository at this point in the history
Revert of Make EventStoreCatchUpSubscription Async #837
  • Loading branch information
hayley-jean committed May 20, 2016
2 parents ae2becd + 3fd1b95 commit eb8e546
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 871 deletions.
270 changes: 88 additions & 182 deletions src/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
using EventStore.ClientAPI.Exceptions;
using EventStore.ClientAPI.SystemData;
using System.Collections.Concurrent;
using System.Threading.Tasks;

namespace EventStore.ClientAPI
{
Expand Down Expand Up @@ -69,20 +68,18 @@ public abstract class EventStoreCatchUpSubscription
private readonly ManualResetEventSlim _stopped = new ManualResetEventSlim(true);

/// <summary>
/// Read events until the given position or event number async.
/// Read events until the given position or event number.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
/// <returns></returns>
protected abstract Task ReadEventsTillAsync(IEventStoreConnection connection,
protected abstract void ReadEventsTill(IEventStoreConnection connection,
bool resolveLinkTos,
UserCredentials userCredentials,
long? lastCommitPosition,
int? lastEventNumber);

/// <summary>
/// Try to process a single <see cref="ResolvedEvent"/>.
/// </summary>
Expand Down Expand Up @@ -126,10 +123,10 @@ public abstract class EventStoreCatchUpSubscription
Verbose = settings.VerboseLogging;
}

internal Task Start()
internal void Start()
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: starting...", IsSubscribedToAll ? "<all>" : StreamId);
return RunSubscription();
RunSubscription();
}

/// <summary>
Expand Down Expand Up @@ -167,109 +164,57 @@ private void OnReconnect(object sender, ClientConnectionEventArgs clientConnecti
RunSubscription();
}

private Task RunSubscription()
{
return Task.Factory.StartNew(LoadHistoricalEvents, TaskCreationOptions.AttachedToParent)
.ContinueWith(_ => HandleErrorOrContinue(_));
}

private void LoadHistoricalEvents()
private void RunSubscription()
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);

_stopped.Reset();
_allowProcessing = false;

if (!ShouldStop)
{
if (Verbose)
Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);

ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, null, null)
.ContinueWith(_ => HandleErrorOrContinue(_, SubscribeToStream), TaskContinuationOptions.AttachedToParent);
}
else
ThreadPool.QueueUserWorkItem(_ =>
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}

private void SubscribeToStream()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);

var subscribeTask = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials)
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials);


subscribeTask.ContinueWith(_ => HandleErrorOrContinue(_, () =>
{
_subscription = _.Result;
ReadMissedHistoricEvents();
}), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}
if (Verbose) Log.Debug("Catch-up Subscription to {0}: running...", IsSubscribedToAll ? "<all>" : StreamId);
private void ReadMissedHistoricEvents()
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);
_stopped.Reset();
try
{
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, null, null);
}
ReadEventsTillAsync(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber)
.ContinueWith(_ => HandleErrorOrContinue(_, StartLiveProcessing), TaskContinuationOptions.AttachedToParent);
}
else
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
}
}
if (!ShouldStop)
{
if (Verbose) Log.Debug("Catch-up Subscription to {0}: subscribing...", IsSubscribedToAll ? "<all>" : StreamId);
_subscription = _streamId == string.Empty
? _connection.SubscribeToAllAsync(_resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result
: _connection.SubscribeToStreamAsync(_streamId, _resolveLinkTos, EnqueuePushedEvent, ServerSubscriptionDropped, _userCredentials).Result;
private void StartLiveProcessing()
{
if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}
if (Verbose) Log.Debug("Catch-up Subscription to {0}: pulling events (if left)...", IsSubscribedToAll ? "<all>" : StreamId);
ReadEventsTill(_connection, _resolveLinkTos, _userCredentials, _subscription.LastCommitPosition, _subscription.LastEventNumber);
}
}
catch (Exception exc)
{
DropSubscription(SubscriptionDropReason.CatchUpError, exc);
return;
}
if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);
if (ShouldStop)
{
DropSubscription(SubscriptionDropReason.UserInitiated, null);
return;
}
if (_liveProcessingStarted != null)
_liveProcessingStarted(this);
if (Verbose) Log.Debug("Catch-up Subscription to {0}: processing live events...", IsSubscribedToAll ? "<all>" : StreamId);
if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;
if (_liveProcessingStarted != null)
_liveProcessingStarted(this);
_allowProcessing = true;
EnsureProcessingPushQueue();
}
if (Verbose) Log.Debug("Catch-up Subscription to {0}: hooking to connection.Connected", IsSubscribedToAll ? "<all>" : StreamId);
_connection.Connected += OnReconnect;
private void HandleErrorOrContinue(Task task, Action continuation = null)
{
if (task.IsFaulted)
{
DropSubscription(SubscriptionDropReason.CatchUpError, task.Exception.GetBaseException());
task.Wait();
}
else if (task.IsCanceled)
{
DropSubscription(SubscriptionDropReason.CatchUpError, new TaskCanceledException(task));
task.Wait();
}
else if (continuation != null)
{
continuation();
}
_allowProcessing = true;
EnsureProcessingPushQueue();
});
}

private void EnqueuePushedEvent(EventStoreSubscription subscription, ResolvedEvent e)
{
if (Verbose)
Expand Down Expand Up @@ -411,57 +356,39 @@ public Position LastProcessedPosition
_nextReadPosition = fromPositionExclusive ?? Position.Start;
}


/// <summary>
/// Read events until the given position async.
/// Read events until the given position.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
{
return connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}
if (!ProcessEvents(lastCommitPosition, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug(
"Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
}
}, TaskContinuationOptions.AttachedToParent);
}

private bool ProcessEvents(long? lastCommitPosition, AllEventsSlice slice)
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
{
foreach (var e in slice.Events)
bool done;
do
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;
AllEventsSlice slice = connection.ReadAllEventsForwardAsync(_nextReadPosition, ReadBatchSize, resolveLinkTos, userCredentials).Result;
foreach (var e in slice.Events)
{
if (e.OriginalPosition == null) throw new Exception("Subscription event came up with no OriginalPosition.");
TryProcess(e);
}
_nextReadPosition = slice.NextPosition;

var done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);
done = lastCommitPosition == null
? slice.IsEndOfStream
: slice.NextPosition >= new Position(lastCommitPosition.Value, lastCommitPosition.Value);

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadPosition = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadPosition);
}

/// <summary>
Expand Down Expand Up @@ -516,45 +443,23 @@ public class EventStoreStreamCatchUpSubscription : EventStoreCatchUpSubscription
}

/// <summary>
/// Read events until the given event number async.
/// Read events until the given event number.
/// </summary>
/// <param name="connection">The connection.</param>
/// <param name="resolveLinkTos">Whether to resolve Link events.</param>
/// <param name="userCredentials">User credentials for the operation.</param>
/// <param name="lastCommitPosition">The commit position to read until.</param>
/// <param name="lastEventNumber">The event number to read until.</param>
/// <returns></returns>
protected override Task ReadEventsTillAsync(IEventStoreConnection connection, bool resolveLinkTos, UserCredentials userCredentials,
long? lastCommitPosition, int? lastEventNumber)
{
return connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos,userCredentials)
.ContinueWith(_ =>
{
if (_.IsFaulted || _.IsCanceled)
{
_.Wait(); //force exception to be thrown
}
if (!ProcessEvents(lastEventNumber, _.Result) && !ShouldStop)
{
ReadEventsTillAsync(connection, resolveLinkTos, userCredentials,
lastCommitPosition, lastEventNumber);
}
else if (Verbose)
{
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
}
}, TaskContinuationOptions.AttachedToParent);
}

private bool ProcessEvents(int? lastEventNumber, StreamEventsSlice slice)
protected override void ReadEventsTill(IEventStoreConnection connection, bool resolveLinkTos,
UserCredentials userCredentials, long? lastCommitPosition, int? lastEventNumber)
{
bool done;
switch (slice.Status)
do
{
case SliceReadStatus.Success:
var slice = connection.ReadStreamEventsForwardAsync(StreamId, _nextReadEventNumber, ReadBatchSize, resolveLinkTos, userCredentials).Result;
switch (slice.Status)
{
case SliceReadStatus.Success:
{
foreach (var e in slice.Events)
{
Expand All @@ -564,25 +469,26 @@ private bool ProcessEvents(int? lastEventNumber, StreamEventsSlice slice)
done = lastEventNumber == null ? slice.IsEndOfStream : slice.NextEventNumber > lastEventNumber;
break;
}
case SliceReadStatus.StreamNotFound:
case SliceReadStatus.StreamNotFound:
{
if (lastEventNumber.HasValue && lastEventNumber != -1)
throw new Exception(
string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.",
StreamId));
throw new Exception(string.Format("Impossible: stream {0} disappeared in the middle of catching up subscription.", StreamId));
done = true;
break;
}
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.",
slice.Status));
}
case SliceReadStatus.StreamDeleted:
throw new StreamDeletedException(StreamId);
default:
throw new ArgumentOutOfRangeException(string.Format("Unexpected StreamEventsSlice.Status: {0}.", slice.Status));
}

if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
return done;
if (!done && slice.IsEndOfStream)
Thread.Sleep(1); // we are waiting for server to flush its data
} while (!done && !ShouldStop);

if (Verbose)
Log.Debug("Catch-up Subscription to {0}: finished reading events, nextReadEventNumber = {1}.",
IsSubscribedToAll ? "<all>" : StreamId, _nextReadEventNumber);
}

/// <summary>
Expand Down

0 comments on commit eb8e546

Please sign in to comment.