Async / Await with IObserver<T> #459

Closed
michaelkreeger opened this issue Jan 12, 2018 · 18 comments

@michaelkreeger

michaelkreeger commented Jan 12, 2018

So, I have a very simple eventing system which, it seems, is surprisingly difficult to make work right with Rx.NET. I have producers and consumers with an at-least-once message guarantee. The consumers use both IObservable to pull messages from an event source log and IObserver to handle those messages. Handling/observing the messages requires interfacing with external systems, which are wrapped in APIs that return Tasks. Following best practices, we should async/await the tasks all the way up the stack. Here comes the issue: IObserver does not support returning a Task. This means some sort of blocking operation must be done against the returned Task in the implementation of the IObserver, which violates best practice and introduces a lot of risk of deadlocks and other threading issues, depending on the hosting platform. So, should I use Rx.NET or roll my own here? Is there a solution to this that I'm just not seeing?

@michaelkreeger changed the title from "Async / Await ... Is Rx.NET the right tool" to "Async / Await with IObserver<T>" on Jan 13, 2018
@davidnemeti

davidnemeti commented Jan 15, 2018

If I understand you correctly, you would like something like this (however, as you pointed out properly, this should not be done like this):

// !!! should NOT do this, because Subscribe cannot handle an async function properly !!!
Observable.Interval(TimeSpan.FromSeconds(1))
    .Subscribe(async number => await DoSomeWorkAsync(number));    // !!! Subscribe does not (a)wait for this async lambda

So, this should not be done, because Subscribe does not (a)wait for this async lambda (even though you await the execution of the DoSomeWorkAsync method inside the async lambda); thus you would introduce uncontrollable concurrency this way. Moreover, an async lambda which is not (a)waited is problematic anyway: the application might end before the lambda finishes properly, and the exceptions which the lambda might throw can also cause problems.
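The overlap described above does not need Rx to show up. The sketch below is plain C# without Rx.NET, and FireAndForgetDemo and MaxOverlap are hypothetical names. It delivers items through a synchronous Action<int>, the same shape as IObserver<T>.OnNext, and hands it an async lambda. Because such a lambda compiles to async void, the caller gets control back at the first await, so each new item starts its work before the previous one has finished.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Not Rx.NET: a hypothetical stand-in for a source that calls a synchronous
// OnNext (an Action<int>) in a loop, the way IObserver<T>.OnNext is called.
public static class FireAndForgetDemo
{
    // Returns the peak number of handler executions that were running at the same time.
    public static int MaxOverlap(int count)
    {
        int inFlight = 0, peak = 0;
        var sync = new object();
        var done = new CountdownEvent(count);

        // An async lambda converted to Action<int> becomes "async void":
        // the caller regains control at the first await and cannot wait for it.
        Action<int> onNext = async item =>
        {
            lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
            await Task.Delay(50);          // stands in for DoSomeWorkAsync(item)
            lock (sync) { inFlight--; }
            done.Signal();
        };

        for (int i = 0; i < count; i++)
            onNext(i);                     // returns at once; the next item is delivered immediately

        done.Wait();                       // only this demo waits; Subscribe never would
        lock (sync) return peak;
    }
}
```

With five items, all five handlers are typically in flight together, which is exactly the uncontrolled concurrency Subscribe cannot prevent.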

Of course, you should not wait for the execution of the DoSomeWorkAsync method either (thus creating a sync lambda), because this way you would block your caller (which is the observable which is notifying you right now) for the whole execution of the DoSomeWorkAsync method (which is long running).

So, in case of an async operation (which is probably long running, that is why it is async in the first place) you should not call it from inside the Subscribe method, but "earlier"; you should turn your Task into an IObservable:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(number => Observable.FromAsync(async () => await DoSomeWorkAsync(number)))
    .Concat()
    .Subscribe();

or:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(number => Observable.FromAsync(async () => await ReturnSomethingAsync(number)))
    .Concat()
    .Subscribe(number => Console.WriteLine($"number: {number}"));

In the above examples, Select returns an IObservable for each number, thus it projects from IObservable<long> to IObservable<IObservable<Unit>> and IObservable<IObservable<long>>, respectively.

Concat ensures that there will be no overlapping in the execution of the tasks, which you can verify e.g. by using the following implementations for the above async methods:

async Task DoSomeWorkAsync(long number)
{
    Console.WriteLine($"DoSomeWorkAsync BEGIN '{number}'");
    await Task.Delay(TimeSpan.FromSeconds(3));
    Console.WriteLine($"DoSomeWorkAsync END '{number}'");
}

async Task<long> ReturnSomethingAsync(long number)
{
    Console.WriteLine($"ReturnSomethingAsync BEGIN '{number}'");
    await Task.Delay(TimeSpan.FromSeconds(3));
    Console.WriteLine($"ReturnSomethingAsync END '{number}'");
    return number * 10;
}

If you use the Merge method instead of Concat, then you can introduce some concurrent computing, and by specifying a proper parameter to Merge you can even determine the maximum number of concurrent task executions.
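For readers who think in plain TPL terms, here is a rough analogue of what Concat and Merge(maxConcurrent) achieve with FromAsync inner observables. It is not how Rx implements them: ThrottleDemo and RunAsync are hypothetical names, and the SemaphoreSlim is a swapped-in technique. What it mirrors is that each operation is held as a Func<T, Task> and only invoked once a slot is free. Unlike Concat, this sketch does not promise that items are processed in their original order.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Not Rx.NET: a hypothetical plain-TPL helper. maxConcurrent = 1 limits overlap
// like Concat; larger values limit it like Merge(maxConcurrent).
public static class ThrottleDemo
{
    // Runs work(item) for every item with at most maxConcurrent executions in flight,
    // and returns the peak overlap that was actually observed.
    public static async Task<int> RunAsync<T>(IEnumerable<T> items, Func<T, Task> work, int maxConcurrent)
    {
        using var slots = new SemaphoreSlim(maxConcurrent, maxConcurrent);
        var sync = new object();
        int inFlight = 0, peak = 0;

        async Task RunOne(T item)
        {
            await slots.WaitAsync();       // wait for a free slot *before* the work starts
            try
            {
                lock (sync) { inFlight++; peak = Math.Max(peak, inFlight); }
                await work(item);          // the Task is created only now, i.e. deferred
            }
            finally
            {
                lock (sync) { inFlight--; }
                slots.Release();
            }
        }

        await Task.WhenAll(items.Select(item => RunOne(item)).ToList());
        lock (sync) return peak;
    }
}
```

If work(item) were invoked before waiting for a slot, the semaphore would only delay the awaiting, not the work itself, which is the same trap as feeding already-started Tasks into Concat or Merge.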

And if you were to execute a long running sync method, you should use Observable.Start (preferably combined with Observable.Defer):

Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(number => Observable.Defer(() => Observable.Start(() => LongRunningWork(number))))
    .Concat()
    .Subscribe();

BTW, Observable.FromAsync(...) is just a shortcut for Observable.Defer(() => Observable.StartAsync(...)), which is the async counterpart of the sync Observable.Defer(() => Observable.Start(...)) pair. Somehow, there is no Observable.FromSync(...) method in Rx.NET.

@michaelkreeger
Author

Thank you. This is helpful. My next concern, though, is more from a pattern/design perspective. It appears the only way to handle an observer that may take some time (i.e. uses Tasks) is to implement it as an observable. From a design perspective, it seems wrong to implement what is logically an observer as an observable. Where am I wrong? Have I missed something about the observer pattern, the Rx.NET design, or something else?

@davidnemeti

davidnemeti commented Jan 15, 2018

Actually, these Rx LINQ methods (which take IObservable as input parameter) observe their input IObservable.

In a way, these methods are similar to the Subscribe method (which also takes IObservable as input parameter). However, the Subscribe method immediately subscribes to its input IObservable when it is being called, whereas these LINQ methods typically yield another IObservable as output (they transform their input to another output), and they will only subscribe to their input IObservable when someone subscribes to their output IObservable (we can call it a "deferred subscription").

An observer is something that can subscribe to and react to an observable; it defines what should be done when a next item is yielded, or upon completion/error.

So, if we took an Rx LINQ method, excluded its IObservable input parameter, and excluded its subscription logic which subscribes to this IObservable input parameter, and wrapped the remaining logic together with its remaining input parameters and its output IObservable, and put them into an object, we would get an observer: it could subscribe to observables, and react to them by yielding items on its "output" IObservable.

So, this Select actually subscribes to and observes its source IObservable when someone subscribes to its output IObservable:

source
    .Select(number => Observable.FromAsync(async () => await DoSomeWorkAsync(number)))
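To make the "deferred subscription" idea concrete, here is a hand-written sketch against the BCL IObservable<T>/IObserver<T> interfaces only, with no Rx.NET. SelectObservable, CountingRange and ListObserver are hypothetical names. The operator is an observable to its consumer and, once subscribed, an observer of its source; Rx's real Select additionally handles selector exceptions, disposal and the Rx grammar, which this sketch skips.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal Select: nothing touches the source until someone subscribes.
public sealed class SelectObservable<TIn, TOut> : IObservable<TOut>
{
    private readonly IObservable<TIn> _source;
    private readonly Func<TIn, TOut> _selector;

    public SelectObservable(IObservable<TIn> source, Func<TIn, TOut> selector)
    {
        _source = source;
        _selector = selector;
    }

    // Deferred subscription: subscribing downstream is what subscribes to the source.
    public IDisposable Subscribe(IObserver<TOut> observer) =>
        _source.Subscribe(new Forwarder(observer, _selector));

    // The "observer half" of the operator: reacts to the source, yields on its output.
    private sealed class Forwarder : IObserver<TIn>
    {
        private readonly IObserver<TOut> _downstream;
        private readonly Func<TIn, TOut> _selector;

        public Forwarder(IObserver<TOut> downstream, Func<TIn, TOut> selector)
        {
            _downstream = downstream;
            _selector = selector;
        }

        public void OnNext(TIn value) => _downstream.OnNext(_selector(value));
        public void OnError(Exception error) => _downstream.OnError(error);
        public void OnCompleted() => _downstream.OnCompleted();
    }
}

// Hypothetical cold source that counts how often it has been subscribed to.
public sealed class CountingRange : IObservable<int>
{
    private readonly int _count;
    public int Subscriptions { get; private set; }

    public CountingRange(int count) => _count = count;

    public IDisposable Subscribe(IObserver<int> observer)
    {
        Subscriptions++;
        for (int i = 0; i < _count; i++) observer.OnNext(i);
        observer.OnCompleted();
        return NoopDisposable.Instance;
    }

    private sealed class NoopDisposable : IDisposable
    {
        public static readonly NoopDisposable Instance = new NoopDisposable();
        public void Dispose() { }
    }
}

// Hypothetical observer that just collects what it receives.
public sealed class ListObserver<T> : IObserver<T>
{
    public List<T> Values { get; } = new List<T>();
    public bool Completed { get; private set; }

    public void OnNext(T value) => Values.Add(value);
    public void OnError(Exception error) => throw error;
    public void OnCompleted() => Completed = true;
}
```

Creating new SelectObservable<int, int>(source, x => x * 2) leaves source.Subscriptions at 0; only subscribing a ListObserver<int> to it subscribes to the source and delivers 0, 2, 4.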

Now, to be honest, when I started learning Rx, I also found it strange that there was no such thing as SubscribeAsync, which would take an async observer, or async lambdas such as onNextAsync, as parameters. We could have something like this:

public static class MyObservableExtensions
{
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Merge(maxConcurrent)
            .Subscribe();
}

Here, SubscribeAsync - just like standard Subscribe - ensures that the onNextAsync function gets called in a serialized fashion, i.e. the several executions of the onNextAsync function do not overlap (it satisfies the Rx contract).

On the other hand, SubscribeAsyncConcurrent methods do not give this guarantee: the executions of their onNextAsync function might overlap (it does not satisfy the Rx contract, since it introduces concurrency).
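If you want the "serialized, yet never blocking the source" behavior of SubscribeAsync without building an observable chain, one alternative is an IObserver<T> that chains each handler onto the previous Task. This is a different technique from the Concat-based extension above, sketched with BCL types only; SerialAsyncObserver and its Completion property are hypothetical, and error/completion handling is deliberately minimal.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical alternative to the Concat-based SubscribeAsync (not part of Rx.NET):
// OnNext returns immediately, but each onNextAsync call starts only after the
// previous one has finished, so executions never overlap.
public sealed class SerialAsyncObserver<T> : IObserver<T>
{
    private readonly Func<T, Task> _onNextAsync;
    private readonly object _gate = new object();
    private Task _tail = Task.CompletedTask;

    public SerialAsyncObserver(Func<T, Task> onNextAsync) => _onNextAsync = onNextAsync;

    // Completes once every queued handler has run. After a handler fails, later
    // items are skipped and this task faults with that handler's exception.
    public Task Completion { get { lock (_gate) return _tail; } }

    public void OnNext(T value)
    {
        lock (_gate)
        {
            // Append this item's handler to the chain instead of running it now.
            _tail = _tail.ContinueWith(
                previous => previous.IsFaulted ? previous : _onNextAsync(value),
                TaskScheduler.Default).Unwrap();
        }
    }

    // This sketch ignores OnError/OnCompleted; real code would surface them via Completion.
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}
```

Like Concat, it buffers pending items without limit when the source outpaces the handler.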

Of course, you could use these SubscribeAsync and SubscribeAsyncConcurrent methods only for the last async task that should be applied to your observable (typically, when you have only a single task). In all other cases, you should keep projecting your other tasks into observables:

Observable.Range(0, 30)
    .Select(number => Observable.FromAsync(() => CalculateLongRunningTask1(number)))
    .Merge(5)
    .Select(number => Observable.FromAsync(() => CalculateLongRunningTask2(number)))
    .Merge(10);

or:

Observable.Range(0, 30)
    .Select(number => Observable.FromAsync(async () => await CalculateLongRunningTask1(number)))
    // awaiting the inner observable subscribes to it, so at most 10 Task1 + Task2 chains run at once
    .Select(task1 => Observable.FromAsync(async () => await CalculateLongRunningTask2(await task1)))
    .Merge(10);

BTW, if we implemented these async SubscribeAsync and SubscribeAsyncConcurrent methods, we should also implement their sync counterparts, which would take care of long running sync operations by using Observable.Defer(() => Observable.Start(...)) in their implementations.

@michaelkreeger
Author

Perfect, thanks. This answers my questions.

@danielcweber
Collaborator

Quoting @michaelkreeger: "Perfect, thanks. This answers my questions."

@ghuntley Please close.

@dominikjeske

I just want to clarify: is there no built-in way in Rx to handle an async method in Subscribe?

@asinitson

asinitson commented Jul 10, 2019

What would be really nice to have is equivalent of this (from RxJS):
await observable.take(1).toPromise();

EDIT: Never mind, there actually is an equivalent, which works just fine:
await observable.Take(1);

@gentledepp

gentledepp commented Nov 19, 2019

@davidnemeti thank you so much for these detailed clarifications.
When I talked to my team about these issues, one question came up: namely, whether it would make any difference if we used asyncOnNext().ToObservable() instead of Observable.FromAsync(() => asyncOnNext(x)).

For example, instead of

public static class MyObservableExtensions
{
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(number => Observable.FromAsync(onNextAsync))
            .Merge(maxConcurrent)
            .Subscribe();
}

could we write

public static class MyObservableExtensions
{
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => onNextAsync().ToObservable()) // note ToObservable instead of FromAsync!
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync) =>
        source
            .Select(number => onNextAsync().ToObservable()) // note ToObservable instead of FromAsync!
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(number => onNextAsync().ToObservable()) // note ToObservable instead of FromAsync!
            .Merge(maxConcurrent)
            .Subscribe();
}

And if not, what is the difference between Observable.FromAsync and .ToObservable?
This question is just too hard for me 😫
@bruzkovsky fyi

@davidnemeti

davidnemeti commented Nov 19, 2019

@gentledepp, Observable.FromAsync and .ToObservable are different.

The difference is that Observable.FromAsync is deferred, while .ToObservable is not; i.e. with the latter, the given async function is executed immediately, as soon as each new item is yielded from the observable.

This difference is due to the fact that the parameter of Observable.FromAsync(...) is a Func<Task>, while the parameter of .ToObservable is a Task; i.e. the former is a not-yet-started asynchronous operation which can be started when it is desired, while the latter is an already-started asynchronous operation.

It means that when you create an observable by using Observable.FromAsync(...) the subscriber can control the starting of the asynchronous operation, while by using .ToObservable the subscriber cannot control the starting of the asynchronous operation because that operation has already started.
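The difference can be seen without any Rx at all: calling an async method is what starts it, so holding a Task means the work is already under way, while holding a Func<Task> means nothing has run yet. DeferredVsStartedDemo and WorkAsync below are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class DeferredVsStartedDemo
{
    public static int Started;   // how many operations have actually begun

    // Hypothetical async operation: the code before the first await runs
    // synchronously, as soon as the method is called.
    public static async Task WorkAsync()
    {
        Started++;
        await Task.Delay(10);
    }

    public static (int afterFunc, int afterTask) Compare()
    {
        Started = 0;
        Func<Task> notYetStarted = WorkAsync;      // like FromAsync's argument: nothing runs yet
        int afterFunc = Started;                   // 0
        Task alreadyStarted = WorkAsync();         // like ToObservable's argument: already running
        int afterTask = Started;                   // 1
        alreadyStarted.GetAwaiter().GetResult();
        notYetStarted().GetAwaiter().GetResult();  // starts only when we decide to call it
        return (afterFunc, afterTask);
    }
}
```

Whoever holds the Func<Task> decides when the operation starts; whoever holds the Task can only wait for it.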

You can check this behavior by running the following example:

void Main()
{
    Observable.Interval(TimeSpan.FromSeconds(1))
        .SubscribeAsync(number => DoSomeWorkAsync(number));
}

async Task DoSomeWorkAsync(long number)
{
    Console.WriteLine($"DoSomeWorkAsync BEGIN '{number}'");
    await Task.Delay(TimeSpan.FromSeconds(3));
    Console.WriteLine($"DoSomeWorkAsync END '{number}'");
}

#if true
public static class MyObservableExtensions
{
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(() => onNextAsync(number)))
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(number => Observable.FromAsync(() => onNextAsync(number)))
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(number => Observable.FromAsync(() => onNextAsync(number)))
            .Merge(maxConcurrent)
            .Subscribe();
}
#else
public static class MyObservableExtensions
{
    public static IDisposable SubscribeAsync<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(number => onNextAsync(number).ToObservable()) // note ToObservable instead of FromAsync!
            .Concat()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync) =>
        source
            .Select(number => onNextAsync(number).ToObservable()) // note ToObservable instead of FromAsync!
            .Merge()
            .Subscribe();

    public static IDisposable SubscribeAsyncConcurrent<T>(this IObservable<T> source, Func<T, Task> onNextAsync, int maxConcurrent) =>
        source
            .Select(number => onNextAsync(number).ToObservable()) // note ToObservable instead of FromAsync!
            .Merge(maxConcurrent)
            .Subscribe();
}
#endif

If you use the Observable.FromAsync(...) version then the asynchronous operations do not overlap. However, if you use the .ToObservable version then the asynchronous operations do overlap.

@gentledepp

@davidnemeti thank you so much for this explanation! Suddenly everything makes sense.

So I would use ToObservable only if I knew that the Task had already completed, such as when using Task.FromResult() for the observable.

Can I buy you a beer somehow? 🍺

@davidnemeti

davidnemeti commented Nov 20, 2019

@gentledepp, you are welcome. Unfortunately, I don't like beer, so a simple "like" would be enough. :-)

Regarding your question about .ToObservable: of course, you could use it when you know that the Task has already completed, but I think that's not the point.

The point is that you should try to avoid working with pure Task and using .ToObservable in the first place; rather you should work with Func<Task> and use Observable.FromAsync.

Sure, if you only have a Task in your hand, then you have no other choice but to use .ToObservable. But in the overwhelming majority of cases you have more than this: you have Func<Task>, because you have async methods which represent asynchronous operations.

Thus, since you can work with Func<Task>, you can (should) use Observable.FromAsync.

Personally, I do not use .ToObservable, because it prevents me from controlling when the asynchronous operations start, and thus from controlling concurrency. Of course, you could add .Concat or .Merge to the observable (believing that you will control concurrency this way), but it does not really matter which of them you use, because by that time you have already lost the ability to control concurrency.

On the other hand, Observable.FromAsync allows me to control concurrency by adding .Concat and .Merge to the observable, and everything behaves exactly as it should.
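A small sketch of why adding Concat afterwards is too late: below, four operations are consumed strictly one after another (the way Concat consumes its inner observables), once as Func<Task> (the FromAsync style) and once as Tasks created up front (the ToObservable style). It uses plain TPL rather than Rx.NET, and ConcatOverStartedTasksDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Not Rx.NET: a hypothetical plain-TPL illustration.
public static class ConcatOverStartedTasksDemo
{
    private static readonly object Sync = new object();
    private static int _active, _peak;

    private static async Task WorkAsync()
    {
        lock (Sync) { _active++; _peak = Math.Max(_peak, _active); }
        await Task.Delay(20);
        lock (Sync) { _active--; }
    }

    // Awaits the operations strictly one after another (like Concat) and returns
    // the peak number of operations that were running at the same time.
    public static async Task<int> PeakWhenAwaitedInOrder(bool deferred, int count)
    {
        lock (Sync) { _active = 0; _peak = 0; }

        List<Func<Task>> ops = deferred
            // FromAsync style: keep the factory, start nothing yet.
            ? Enumerable.Range(0, count).Select(_ => (Func<Task>)WorkAsync).ToList()
            // ToObservable style: every Task is created, and therefore started, right here.
            : Enumerable.Range(0, count).Select(_ => { Task started = WorkAsync(); return (Func<Task>)(() => started); }).ToList();

        foreach (var op in ops)
            await op();

        lock (Sync) return _peak;
    }
}
```

With deferred: true the peak is 1; with deferred: false all four operations are already running before the sequential loop even begins.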

@wh1t3cAt1k

wh1t3cAt1k commented Oct 14, 2020

@davidnemeti sorry for necroposting, but I am wondering whether it is possible, in your first example, to asynchronously handle only some of the elements of the first sequence:

  1. Start the asynchronous handler as soon as the element arrives
  2. Drop arriving elements while the handler executes
  3. Resume "listening" when the handler finishes?

@davidnemeti

@wh1t3cAt1k sure you can achieve that.

You could use the .Latest() method for that; however, it is blocking:

Observable.Interval(TimeSpan.FromSeconds(1))
    .Latest() // returns a blocking IEnumerable<long>, not an IObservable<long>
    .Select(number => Observable.FromAsync(async () => await DoSomeWorkAsync(number)))
    .Concat();

For example, it blocks until the first element arrives.

A more sophisticated solution is to write our own operator, which is non-blocking and even lets you introduce concurrency if needed:

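using System;
using System.Reactive.Linq;
using System.Threading;

public static class MyObservableExtensions
{
    // Processes an item only if fewer than maximumConcurrencyCount items are in progress;
    // otherwise passes it to noProcess and drops it, without ever blocking the source.
    public static IObservable<TResult> SelectAndOmit<T, TResult>(this IObservable<T> source, Func<T, IObservable<TResult>> process, Action<T> noProcess, int maximumConcurrencyCount = 1) =>
        // Defer gives every subscription its own semaphore instead of sharing one across subscribers.
        Observable.Defer(() =>
        {
            var semaphore = new SemaphoreSlim(initialCount: maximumConcurrencyCount, maxCount: maximumConcurrencyCount);

            return source
                .SelectMany(item =>
                {
                    if (semaphore.Wait(millisecondsTimeout: 0))
                    {
                        // Release the slot when the inner work completes, fails, or is unsubscribed.
                        return Observable.Return(process(item).Finally(() => { semaphore.Release(); }));
                    }
                    else
                    {
                        noProcess(item);
                        return Observable.Empty<IObservable<TResult>>();
                    }
                })
                .Merge(maximumConcurrencyCount);
        });
}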
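The heart of SelectAndOmit is semaphore.Wait(millisecondsTimeout: 0): a non-blocking attempt to take a slot, with the item dropped instead of queued when none is free. Stripped of Rx, that gate can be sketched as a small class; DropWhileBusyGate and TryRun are hypothetical names, not part of any library.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical BCL-only gate (no Rx.NET) mirroring the idea in SelectAndOmit:
// try to take a slot without waiting; if none is free, drop the item.
public sealed class DropWhileBusyGate
{
    private readonly SemaphoreSlim _slots;

    public DropWhileBusyGate(int maximumConcurrencyCount) =>
        _slots = new SemaphoreSlim(maximumConcurrencyCount, maximumConcurrencyCount);

    // Returns false (and a completed Task) when the item was dropped;
    // otherwise returns true and the Task of the running handler.
    public bool TryRun<T>(T item, Func<T, Task> process, out Task running)
    {
        if (!_slots.Wait(millisecondsTimeout: 0))
        {
            running = Task.CompletedTask;   // busy: omit this item
            return false;
        }

        running = RunAndRelease(item, process);
        return true;
    }

    private async Task RunAndRelease<T>(T item, Func<T, Task> process)
    {
        try { await process(item); }
        finally { _slots.Release(); }        // free the slot so later items are accepted again
    }
}
```

While a handler holds the only slot, further items are rejected immediately; once it finishes, the next item is accepted again, which matches steps 1 to 3 of the question.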

You can use it like this:

Observable.Interval(TimeSpan.FromSeconds(1))
    .SelectAndOmit(
        number => Observable.FromAsync(async () => await DoSomeWorkAsync(number)),
        number => Console.WriteLine($"no work {number}")
    )
    .Subscribe(); // nothing runs until someone subscribes

With a maximum number of 2 concurrent processes:

Observable.Interval(TimeSpan.FromSeconds(1))
    .SelectAndOmit(
        number => Observable.FromAsync(async () => await DoSomeWorkAsync(number)),
        number => Console.WriteLine($"no work {number}"),
        maximumConcurrencyCount: 2
    )
    .Subscribe();

@wh1t3cAt1k

wh1t3cAt1k commented Oct 15, 2020

@davidnemeti thanks a lot!

It looks like there are multiple ways to achieve this, as usual with Rx.

Someone helped me out on StackOverflow, too: see "ExhaustMap" implementation: https://stackoverflow.com/questions/64353907/how-can-i-implement-an-exhaustmap-handler-in-rx-net

However, it does not have concurrency control as your solution does.

jfoshee added a commit to IntBasis/IntBasis.DocumentOriented that referenced this issue Sep 4, 2022
@Metadorius

Metadorius commented Jun 12, 2023

I am wondering whether the approach suggested by @davidnemeti in this thread is still the best one when you need to run an async method in a sync context, now that the AsyncRX.NET NuGet package has been released, even for .NET Standard 2.0 (cc @idg10, I am the dude from the Reddit comments who asked about .NET FX support). I was refactoring my code (currently using ReactiveUI with WinForms), and since ReactiveUI and ReactiveMarbles.ObservableEvents have yet to catch up with AsyncRX.NET, it seems that I either

  • need to use the approach outlined here so that I won't need to deal with IAsyncDisposable and other stuff, or
  • convert the source-generated events ToAsyncObservable, subscribe asynchronously, and to dispose make a blocking wait for the disposable (I guess I need some IAsyncDisposable -> IDisposable wrapper too).
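For the second option, the IAsyncDisposable to IDisposable wrapper could look roughly like this. BlockingDisposable is a hypothetical name, not an AsyncRX.NET type. It is exactly the kind of sync-over-async wait this thread warns about: if DisposeAsync needs to resume on a context that is blocked here (for example, the WinForms UI thread), it can deadlock. On .NET Framework and .NET Standard 2.0, IAsyncDisposable comes from the Microsoft.Bcl.AsyncInterfaces package.

```csharp
using System;
using System.Threading;

// Hypothetical adapter: exposes an IAsyncDisposable through the synchronous
// IDisposable interface by blocking until DisposeAsync has finished.
public sealed class BlockingDisposable : IDisposable
{
    private IAsyncDisposable _inner;

    public BlockingDisposable(IAsyncDisposable inner) =>
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));

    public void Dispose()
    {
        // Take ownership exactly once, so repeated Dispose calls are no-ops.
        var inner = Interlocked.Exchange(ref _inner, null);

        // Sync-over-async: blocks the calling thread until the async disposal completes.
        inner?.DisposeAsync().AsTask().GetAwaiter().GetResult();
    }
}
```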

The second option sounds a bit icky. Maybe there are other approaches to solving the problem that I am not seeing after a look at the source code?

@idg10
Collaborator

idg10 commented Jun 14, 2023

AsyncRX.NET is currently still experimental, and that's partly because it's not yet clear what the full implications of using it will be. For example, in the rxnet Slack channel, @anaisbetts made this point:

"I don't particularly like async Observables because they seem to break IScheduler"

which gets to the heart of something that will need to be resolved if AsyncRX.NET is to be used successfully in scenarios such as this.

I haven't fully understood exactly what it is you're doing that has led you to:

"need to run an async method in a sync context"

Did Ani already unblock you on the rxnet channel? If not, could you explain what you're doing that needs this?

@szmcdull

szmcdull commented Oct 25, 2023

How can I turn Subscribe(onNext, onError, onComplete) into an async/await version with three different callbacks?

@mihaly-bence16

mihaly-bence16 commented Dec 8, 2023

How to turn Subscribe(onNext, onError, onComplete) into async/await version, with 3 different callback?

I would say: create extension methods.

Something like:

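using System;
using System.Reactive.Linq;
using System.Runtime.ExceptionServices;
using System.Threading.Tasks;

// Extension methods must live in a static class.
public static class AsyncSubscribeExtensions
{
    public static IDisposable SubscribeAsync<T> (this IObservable<T> source, Func<T, Task> onNextAsync)
    {
        // Rethrow via ExceptionDispatchInfo so the original stack trace is preserved.
        return source.SubscribeAsync(onNextAsync, ex => ExceptionDispatchInfo.Capture(ex).Throw(), () => { });
    }

    public static IDisposable SubscribeAsync<T> (this IObservable<T> source, Func<T, Task> onNextAsync, Action onCompleted)
    {
        return source.SubscribeAsync(onNextAsync, ex => ExceptionDispatchInfo.Capture(ex).Throw(), onCompleted);
    }

    public static IDisposable SubscribeAsync<T> (this IObservable<T> source, Func<T, Task> onNextAsync, Action<Exception> onError)
    {
        return source.SubscribeAsync(onNextAsync, onError, () => { });
    }

    public static IDisposable SubscribeAsync<T> (this IObservable<T> source,
        Func<T, Task> onNextAsync, Action<Exception> onError, Action onCompleted)
    {
        return source
            .Select(param => Observable.FromAsync(() => onNextAsync(param)))
            // Merge lets onNextAsync calls overlap; use Concat() to run them one at a time.
            .Merge()
            .Subscribe(
                unit => { },
                ex => onError(ex),
                onCompleted);
    }
}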

Correct me if I am wrong.

Edit: notice I am not re-throwing the exceptions and I am not sure about the consequences.
