Async subscription handlers #861

Closed
danielmarbach opened this issue Mar 22, 2016 · 45 comments
Labels: kind/question (Issues which are questions)

Comments

@danielmarbach
Contributor

@dnauck showed me some event store projects he'd built at a recent event. He was using asynchronous libraries inside the Action<EventStoreSubscription, ResolvedEvent> eventAppeared. Because these action delegates are synchronous, he had the following options:

async void Handle(EventStoreSubscription subscription, ResolvedEvent @event)

void Handle(EventStoreSubscription subscription, ResolvedEvent @event) {
   HandleInternal().GetAwaiter().GetResult();
}

void Handle(EventStoreSubscription subscription, ResolvedEvent @event) {
   AsyncPump.Invoke(() => HandleInternal());
}

void Handle(EventStoreSubscription subscription, ResolvedEvent @event) {
   var result = AsyncMethod().Result;
   AnotherAsyncMethod().Wait();
}

We all agree that async void is possibly the worst variant a user could choose. The other ones are cumbersome. So how about we change it to

        Func<EventStoreSubscription, ResolvedEvent, Task> eventAppeared,

and apply the same change to the other variants of the API.
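For illustration, a handler under the proposed signature could then be an ordinary async method (a hedged sketch; HandleInternal is a hypothetical method in the user's own code):

async Task Handle(EventStoreSubscription subscription, ResolvedEvent resolvedEvent)
{
    // The client would await this Task instead of invoking a void delegate.
    await HandleInternal(resolvedEvent).ConfigureAwait(false);
}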

If you agree that it would be a good change, I could send in a PR. There are multiple ways to handle this.

  1. Send in a PR which switches to Funcs and synchronizes with GetAwaiter().GetResult() inside the client lib
  2. In further PRs, change the eventing system internally to fully support an async variant of IHandle (step by step)

Caveat: this is a breaking change.

What are your thoughts?

@gregoryyoung
Contributor

Why does this need to be in the client library at all?

@gregoryyoung
Contributor

[Edited to include source links]

If I wasn't clear ...

You can't go all the way through using async. The reason can be seen in a simple example: (_, x) => Thread.Sleep(5000). If this went all the way back, it would actually pause the code that is processing the socket. This is why, as of 3.5.0, all callbacks are dispatched through the threadpool (to isolate code in the client from client handlers).

https://github.com/EventStore/EventStore/blob/release-v3.5.0/src/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs#L269

and

https://github.com/EventStore/EventStore/blob/release-v3.5.0/src/EventStore.ClientAPI/EventStoreCatchUpSubscription.cs#L272

is the furthest back you could push the async without either risking handlers breaking logic in the subscription or handlers breaking logic in the connection itself.

As you can't push the async any further back than the dispatch to the threadpool immediately before your handler anyway, is there much point in having it? As you point out, it's quite reasonable in the handler to just call AsyncPump.Invoke(() => Handler(message)). What would be the gain of moving this into the client?

@danielmarbach
Contributor Author

For me it was more a convenience thing; as more and more libs move towards async APIs, it feels clumsy to have sync handlers only. Not necessarily about perf. I see the reasoning about the socket and why callbacks are dispatched on the threadpool, and a similar model could be achieved by using Task.Run with an async callback lambda. Don't get me wrong, I'm not arguing for anything here. I wanted to point out the "drawbacks" of the current API in terms of usability when users call async APIs, and to offer a PR.


@gregoryyoung
Contributor

You could use Task.Run, but that would only bring it back to the catch-up subscription (one callback).

@pgermishuys pgermishuys added the kind/question Issues which are questions label Mar 30, 2016
@andrii-litvinov

I also find it a bit inefficient that every option for handling events involves blocking in the handler one way or another. And I think that making the EventAppeared delegate a function that returns a Task makes very good sense.

I have worked around it in a persistent subscription by setting bufferSize to 1 to preserve ordering and using an async void method with manual acks so as not to block on the task.

I have briefly examined the code and believe ThreadPool.QueueUserWorkItem can be safely replaced with Task.Run, and the rest of the event processing logic can use async/await without breaking the logic. Processing will still happen on ThreadPool threads. ProcessLiveQueue would also become async and return a Task, and all the methods involved in working with events can be async if needed.

I'm not sure yet whether EventStoreCatchUpSubscription is meant to process events in parallel, but if it is, the approach with Task.Run and async handlers will help utilize resources more efficiently without the need to block a thread.

It could also be an improvement to replace the ConcurrentQueue with a BufferBlock so the code can await new events appearing.
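A minimal sketch of that BufferBlock idea, still under the current synchronous eventAppeared signature (the wiring and HandleAsync are hypothetical and only illustrate the shape):

using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// The synchronous eventAppeared callback only enqueues; a single consumer
// awaits events from the BufferBlock, so events are handled one at a time
// (order preserved) while the handler itself is free to do async I/O.
var queue = new BufferBlock<ResolvedEvent>();

void EventAppeared(EventStoreCatchUpSubscription _, ResolvedEvent e) => queue.Post(e);

async Task ConsumeAsync(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        ResolvedEvent e = await queue.ReceiveAsync(token).ConfigureAwait(false);
        await HandleAsync(e).ConfigureAwait(false); // hypothetical async handler
    }
}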

@jpierson

jpierson commented Apr 28, 2016

I just ran into the scenario today where an event handler contains some async operations, and found this guidance in Google Groups on the subject. From the discussion here and there, it's not clear what the options are or what the benefits and trade-offs of those options are. If more coherent documentation exists somewhere on the subject, I'd be glad to do what I can to further community understanding of it.

@gregoryyoung
Contributor

When using autoack, the event will be acknowledged when the handler is finished.

If you want to use async stuff, turn off autoack and use manual acks, e.g.:

MyHandler(x) {
    SomeAsyncStuff(...).ContinueWith(q => subscription.Acknowledge(x));
}


@pgermishuys pgermishuys added this to the Unknown milestone Jun 10, 2016
@misiektg86

misiektg86 commented Dec 11, 2016

Hi, we have a few Topshelf services where we do some async processing within the eventAppeared handler, and at the end of that handler we also use the _eventStoreConnection.AppendToStreamAsync(...) method to save the subscription checkpoint in an ES stream. In our case it turned out that only the async void Handle(EventStoreSubscription subscription, ResolvedEvent event) approach didn't cause the service to hang after some period of time; otherwise it hangs exactly on AppendToStreamAsync(...). I know that I should avoid async void as much as possible, but here I gave up. Any ideas why only this works?

@gregoryyoung
Contributor

I am guessing it is a typical async problem where you are blocking things without realizing it. Perhaps you can provide an example and we can look through it.

@misiektg86

misiektg86 commented Dec 12, 2016

Hi @gregoryyoung, we noticed that this.EventAppeared((EventStoreCatchUpSubscription) this, e); in

protected override void TryProcess(ResolvedEvent e)
{
    bool flag = false;
    Position? originalPosition = e.OriginalPosition;
    Position processedPosition = this._lastProcessedPosition;
    if ((originalPosition.HasValue ? (originalPosition.GetValueOrDefault() > processedPosition ? 1 : 0) : 0) != 0)
    {
        this.EventAppeared((EventStoreCatchUpSubscription) this, e);
        this._lastProcessedPosition = e.OriginalPosition.Value;
        flag = true;
    }
    if (!this.Verbose)
        return;
    this.Log.Debug("Catch-up Subscription to {0}: {1} event ({2}, {3}, {4} @ {5}).", this.IsSubscribedToAll ? (object) "<all>" : (object) this.StreamId, flag ? (object) "processed" : (object) "skipping", (object) e.OriginalEvent.EventStreamId, (object) e.OriginalEvent.EventNumber, (object) e.OriginalEvent.EventType, (object) e.OriginalPosition);
}

is invoked synchronously. Could async void Handle(EventStoreSubscription subscription, ResolvedEvent event) break the projection logic, given that the handler is async and is not awaited? Can it somehow break the order of projected events?

@gregoryyoung
Contributor

It is handled synchronously (by design). If you want it to be asynchronous, then make it asynchronous past the handler.

@danielmarbach
Contributor Author

danielmarbach commented Dec 12, 2016 via email

@alexeyzimarev
Member

My suggestion for async Task Handle(EventStoreSubscription subscription, ResolvedEvent event) is to have

connection.SubscribeToStreamFrom(streamName, lastCheckpoint,
                new CatchUpSubscriptionSettings(1000, 100, true, true),
                async (_, @event) => await Publish(@event).ConfigureAwait(false),
                subscriptionDropped: HandleDroppedSubscription);

@misiektg86

misiektg86 commented Dec 12, 2016

Yes @alexeyzimarev, but this handler is not awaited under the hood.

@alexeyzimarev
Member

But it is awaited inline, isn't it?

@misiektg86

misiektg86 commented Dec 12, 2016

OK, but what will happen within

this.EventAppeared((EventStoreCatchUpSubscription) this, e);
this._lastProcessedPosition = e.OriginalPosition.Value;

Will it wait until your task finishes, or just invoke the handler and move on? What if there is an exception within your task?

@gregoryyoung
Contributor

gregoryyoung commented Dec 12, 2016 via email

@misiektg86

misiektg86 commented Dec 12, 2016

I'm not saying I want that :) I just experienced a service hang because I use AppendToStreamAsync() within this handler, and I'm wondering how I can overcome it :). I will try to create an example to reproduce my issue.

@andrii-litvinov

I believe processing can still happen in order and asynchronously at the same time.

@gregoryyoung
Contributor

@andrii-litvinov you can do that if you want; the point of a catch-up subscription, though, is that it provides an ordering assurance.

@andrii-litvinov

@gregoryyoung Sure, I understand the idea of having events processed in order. I also think that the code that invokes the EventAppeared delegate could await its returned task and only after that record the processed position. Of course some changes are required to the internals of a subscription, but it is possible to achieve both ordered and asynchronous processing. What I am trying to say is that asynchronous processing does not necessarily mean parallel.

@gregoryyoung
Contributor

gregoryyoung commented Dec 12, 2016 via email

@misiektg86

I have checked it, and order is preserved only in a blocking manner of course (no async/await). We apparently need to change the way we handle this eventAppeared handler.

@andrii-litvinov

What would be the point of awaiting an async handler?

The point would be to preserve the order: to record _lastProcessedPosition and invoke the handler for the next event only after the handler has finished processing the current event, while at the same time being able to perform some asynchronous I/O in the handler.
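A hedged sketch of what that could look like inside the subscription, assuming EventAppeared were changed to return a Task (illustrative only, not the library's actual code):

protected async Task TryProcessAsync(ResolvedEvent e)
{
    Position? originalPosition = e.OriginalPosition;
    if (originalPosition.HasValue && originalPosition.Value > _lastProcessedPosition)
    {
        // Await the user's async handler first...
        await EventAppeared(this, e).ConfigureAwait(false);
        // ...and only then record the position, so ordering is preserved
        // even though the handler performs asynchronous I/O.
        _lastProcessedPosition = originalPosition.Value;
    }
}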

@gregoryyoung
Contributor

gregoryyoung commented Dec 12, 2016 via email

@andrii-litvinov

andrii-litvinov commented Dec 12, 2016

That's basically how I do it: I use a BufferBlock to schedule the async operation and then process events from the BufferBlock sequentially and asynchronously. It requires a bit of additional bookkeeping but otherwise works well. It would just be nice if EventStore could provide a similar way of consuming events out of the box, since almost any handler performs some I/O, which is async. An example case could be processing a command and writing the generated events to EventStore.

@andrii-litvinov

@misiektg86, sure, the code is below. We use it with a persistent subscription, and it also allows processing messages concurrently where applicable. E.g. commands to place or modify the same order will be processed sequentially, but commands for different orders will be processed in parallel (the example is completely artificial). We use BufferBlock from TPL Dataflow. I believe you can strip out the code that is irrelevant in your case, but I hope the idea makes sense. Otherwise please ask.

public class MessageBalancer : IDisposable
{
	private readonly BufferBlock<MessageItem>[] _buckets;
	private readonly ILogger _logger;

	public MessageBalancer(ILogger logger, int? degreeOfParallelism = null)
	{
		_logger = logger;
		_buckets = Enumerable.Range(0, degreeOfParallelism ?? Environment.ProcessorCount).Select(i => new BufferBlock<MessageItem>()).ToArray();
		foreach (BufferBlock<MessageItem> block in _buckets) Consume(block);
	}

	public async Task<ExecutionResult> Dispatch<TMessage, TKey>(TMessage message, Func<TMessage, TKey> keySelector, Func<TMessage, Task> handler) where TMessage : IMessage
	{
		int index = GetIndex(keySelector(message));
		ITargetBlock<MessageItem> block = _buckets[index];
		var item = new MessageItem { Message = message, CompletionSource = new TaskCompletionSource<ExecutionResult>(), Handler = message1 => handler((TMessage)message1) };
		await block.SendAsync(item).ConfigureAwait(false);
		return await item.CompletionSource.Task.ConfigureAwait(false);
	}

	private int GetIndex<T>(T key)
	{
		int hashCode = key.GetHashCode();
		unchecked
		{
			// A hash code can be negative, and thus its remainder can be negative also.
			// Do the math in unsigned ints to be sure we stay positive.
			return (int)((uint)hashCode % (uint)_buckets.Length);
		}
	}

	private async void Consume(ISourceBlock<MessageItem> block)
	{
		while (true)
		{
			MessageItem messageItem;
			try
			{
				messageItem = await block.ReceiveAsync().ConfigureAwait(false);
			}
			catch (InvalidOperationException e)
			{
				_logger.Info(e, "Block was marked as complete.");
				break;
			}

			try
			{
				Task task = messageItem.Handler(messageItem.Message);
				var result = task as Task<ExecutionResult>;

				if (result != null)
				{
					messageItem.CompletionSource.SetResult(await result);
				}
				else
				{
					await task;
					messageItem.CompletionSource.SetResult(ExecutionResult.Acknowledge());
				}
			}
			catch (Exception e)
			{
				_logger.Error(e);
				messageItem.CompletionSource.SetResult(ExecutionResult.Retry(e.Message));
			}
		}
	}

	private class MessageItem
	{
		public IMessage Message { get; set; }
		public TaskCompletionSource<ExecutionResult> CompletionSource { get; set; }
		public Func<IMessage, Task> Handler { get; set; }
	}

	public void Dispose()
	{
		foreach (BufferBlock<MessageItem> block in _buckets) block.Complete();
	}
}
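A hypothetical usage sketch (the balancer instance, the command type with an OrderId property, and HandleOrderCommand are illustrative and not part of the code above):

// From an async context (e.g. an async void handler with manual acks, as
// described earlier in the thread), after deserializing the resolved event
// into a command: messages with the same key are processed sequentially,
// while messages with different keys may run in parallel.
ExecutionResult result = await balancer.Dispatch(
    command,                        // an IMessage deserialized from the ResolvedEvent
    msg => msg.OrderId,             // key selector: same order => same bucket
    msg => HandleOrderCommand(msg)  // async handler performing the actual work
);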

@misiektg86

Thanks @andrii-litvinov

@misiektg86

Hi @gregoryyoung, I finally found the issue with the deadlock. It turns out that you cannot access EventStore from the Connection.Connected handler on the same thread. Here https://github.com/misiektg86/SubscriptionDeadlockExample I have created an example with steps to show the issue.

@gregoryyoung
Contributor

gregoryyoung commented Dec 14, 2016 via email

@hayley-jean
Member

@misiektg86 Currently, the events raised by the connection are raised on the same thread that the connection is running on.

So when the connection raises the Connected event, it waits for your handler to complete before continuing processing. The handler tries to call a read on the connection, but it needs to wait for the connection to finish calling these handlers before it can process the read, which results in the deadlock you are seeing.

This is something we are aware of, and I have had a look at it, but I want to finish the work I am currently busy with before paying more attention to it.

As a workaround, you can use ContinueWith() instead of waiting on the task:

clientConnectionEventArgs.Connection.ReadStreamEventsBackwardAsync("$ce-abc", -1, 1, true,
    new UserCredentials("admin", "changeit")).ContinueWith(t =>
    {
        Console.WriteLine("After reading stream in connected handler");
    });

@Salgat
Contributor

Salgat commented Feb 11, 2017

@andrii-litvinov This is what we do also, but it becomes an issue when your stream has hundreds of thousands of events and you need to throttle how many are coming in (which is why async support would be so useful).

@andrii-litvinov

@Salgat, PersistentSubscription does that throttling automatically, and it can be controlled by the buffer size parameter. With CatchUpSubscription I ended up closing the subscription when a certain threshold is reached and opening it again once the events get processed.

And I very much agree that it would be nice to have async support out of the box here.

@Salgat
Contributor

Salgat commented Feb 11, 2017

@andrii-litvinov Are you sure? I was under the impression that, while internally it buffers to be able to provide events in a performant manner, it will still try to provide events as fast as your handler can accept them. If that's not the case, what setting in http://docs.geteventstore.com/dotnet-api/3.2.0/competing-consumers/ are you referring to? Also, your idea of closing catchup subscriptions as a throttling mechanism is very interesting, I may look into using that.

@andrii-litvinov

@Salgat, there is an int bufferSize parameter in the ConnectToPersistentSubscription method. I think the way you understand it is correct. With PersistentSubscription I also use manual acks to ensure that the subscription sends the handler no more events than specified in bufferSize.
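For reference, a rough sketch of that setup (the stream and group names and HandleAsync are placeholders, and the exact signature may differ between client versions):

var sub = connection.ConnectToPersistentSubscription(
    "my-stream", "my-group",
    (subscription, resolvedEvent) =>
    {
        // Kick off the async work and acknowledge only when it completes;
        // with autoAck off and bufferSize 1, the subscription will not push
        // more unacknowledged events than the buffer allows.
        HandleAsync(resolvedEvent)
            .ContinueWith(_ => subscription.Acknowledge(resolvedEvent));
    },
    bufferSize: 1,
    autoAck: false);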

@gregoryyoung
Contributor

gregoryyoung commented Feb 11, 2017 via email

@pgermishuys pgermishuys removed this from the Unknown milestone Apr 4, 2017
@rodolfograve
Contributor

I'm adding my vote and more arguments in favor of adding support for async handlers. As @andrii-litvinov and @Salgat have said before, it's not about parallelism but async I/O.

With the current API, we are forced to do:

OnEventAppeared(...) => EventStoreConnection.AppendToStreamAsync(...).Wait()

The code above clearly demonstrates the irony of GetEventStore having an async API and being unable to use it.

With all the new async APIs available, it can be much worse, like:

OnEventAppeared(...)
{
fileStream.ReadAsync(...).Wait();
sqlDataReader.ReadAsync(...).Wait();
eventStoreConnection.AppendToStreamAsync(...).Wait();
smtpClient.SendAsync(...).Wait();
}

The code above will waste a Thread Pool thread, blocked doing nothing while network and disk operations are performed.
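For contrast, with a Task-returning delegate the same handler could be written as below (a hedged sketch that simply mirrors the placeholders above); the thread is released back to the pool at each await instead of sitting blocked:

async Task OnEventAppeared(...)
{
    await fileStream.ReadAsync(...);
    await sqlDataReader.ReadAsync(...);
    await eventStoreConnection.AppendToStreamAsync(...);
    await smtpClient.SendMailAsync(...);
}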

I agree that when the only component using threads from the Thread Pool is the EventStoreConnection all this is very much irrelevant, since only one of the threads from the Thread Pool will be blocked and it will have no impact on the application.

However, there are plenty of scenarios where the EventStoreConnection will be sharing the Thread Pool with other components. For instance, when running a service that connects to multiple GetEventStore servers and hosts OWIN with WebApi and SignalR middlewares. As you can imagine, that's our case.

To make it worse, we also try to run our services on very small VMs which have 2 or 4 virtual cores (which means the optimal number of threads in the Thread Pool is around 2 or 4), and we are currently suffering the consequences of having to .Wait inside OnEventAppeared: WebApi and SignalR calls slow down, EventStore misses heartbeats and disconnects, etc.

Adding an Actor that enqueues the messages and processes them on a separate thread is definitely possible, but adds a lot of complexity on our side, having to replicate a lot of functionality that GetEventStore already provides, like throttling.

Both WebApi and SignalR already implement a pattern whereby if a handler returns a Task, it is awaited by the "engine"; if it doesn't, it's run synchronously.

I hope the GetEventStore team listens to this feedback. Adding this shouldn't be very hard and shouldn't add too much complexity, and in my opinion it will bring GetEventStore fully into the new async world.

One thing is for sure: my team and I would benefit a lot from having this, and we are even willing to provide a PR if necessary.

@gregoryyoung
Contributor

gregoryyoung commented May 12, 2017 via email

@Salgat
Contributor

Salgat commented May 13, 2017 via email

@gregoryyoung
Contributor

gregoryyoung commented May 13, 2017 via email

@rodolfograve
Contributor

rodolfograve commented May 13, 2017

@gregoryyoung I think I completely understand your position. Supporting async operations is more complex than not, and as the owners of the library it's also your decision. However, please consider:

Why can't you schedule this yourself?

Sure we can, we now need to, and we will have to deal with the extra complexities.

If we set these as the goals of the perfect solution:

  1. Release the thread that is running OnEventAppeared back to the thread pool as long as an async operation is executed (examples in my previous post)
  2. Prevent parallel processing of events, i.e. only one event can be processed at a time, thus guaranteeing the order. This is not about parallelism but asynchronicity.
  3. A big extra would be to keep the solution compatible with Persistent Subscriptions, i.e. an event should not be considered consumed by the client until it has been really processed, not just read off into a queue from where it will be processed later

The first and most simplistic approach that comes to mind has already been described and consists of having an Actor processing the messages. That frees up the thread running OnEventAppeared immediately, but it has a few important issues:

  1. All messages in the stream will be read as fast as GetEventStore can into the client's memory. Other than disconnecting/reconnecting periodically (as @andrii-litvinov mentioned) I can't see any other way of implementing throttling without blocking the thread. Apart from being inefficient, I hope we can all agree that this is not easy to implement.
  2. Because of 1, this solution doesn't satisfy goal 3 (although perhaps it can be made to by using manual acks as @gregoryyoung mentioned above?).
  3. It forces the OnEventAppeared handler to assume all operations are async when, in fact, some may be and some may not.

Given all the above, I would argue that the point of libraries is to prevent all that complexity from being dumped on the clients.

Perhaps a compromise would be to add an extra class to the API that would handle all of this? I understand this is how catch up subscriptions came to exist, to handle the complexity of transitioning from catching up with existing events to live events, instead of letting clients deal with it.

The GetEventStore team obviously know their threading model the best and could implement this in the best possible way. This approach would give them control if/when they decide to change the whole thing, as long as the async contract is kept?

I know I'm putting a lot on the same post but I'm trying to be complete in my analysis in the hope that we can get either a change in the library or a recommended approach to achieve all goals 1, 2, and 3.

Cheers

@rodolfograve
Contributor

rodolfograve commented May 13, 2017

@Salgat

The thread blocking problem exists in one form or another

I think it's OK to block a dedicated thread. If you have good reasons to need a dedicated background thread in the first place, that is. What I don't think is OK is to block a thread pool thread.

It also doesn't have to be like that. All I/O operations can be done asynchronously. It might make other things more complex, or break your current model, but it certainly could be done in a way that all network operations inside the connection could be handled asynchronously, that is, not parallel but not blocking.

@gregoryyoung

I agree that if your interface wasn't async, expectations wouldn't be so high :-) It would probably also make GetEventStore less scalable and ultimately less attractive.

I should also add that GetEventStore is giving us so much already, and we really like it. We use it as a message bus through which applications communicate, as well as a more typical event store, and so far it has been able to handle it all impressively. The ops story is also great.

This is me asking even more from it!

@rodolfograve
Contributor

rodolfograve commented May 16, 2017

Issues #1051, #1144 and #1179 are all asking for support for async handlers and better usage of thread pool threads.

@rodolfograve
Contributor

Hi.

I have just created PR #1310 to implement this: Func<TConnection, ResolvedEvent, Task>
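Roughly, that allows subscriptions to be set up like this (a hedged illustration of the intended usage, not the exact merged API; ProcessAsync and SaveCheckpointAsync are hypothetical):

connection.SubscribeToStreamFrom(streamName, lastCheckpoint,
    CatchUpSubscriptionSettings.Default,
    eventAppeared: async (subscription, resolvedEvent) =>
    {
        await ProcessAsync(resolvedEvent);
        await SaveCheckpointAsync(resolvedEvent); // e.g. AppendToStreamAsync to a checkpoint stream
    },
    subscriptionDropped: HandleDroppedSubscription);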

@jageall
Contributor

jageall commented Sep 27, 2019

Looks like this was fixed with #1310

@jageall jageall closed this as completed Sep 27, 2019