[feature request] Implement ToObservable for CancellationToken #1773

Open
noseratio opened this issue May 30, 2022 · 6 comments
Comments

@noseratio

noseratio commented May 30, 2022

Feature request

A function to convert a CancellationToken into an observable that emits OnError with an OperationCanceledException when the token is signaled. It could be implemented like this:

public static class RxExt
{
    public static IObservable<T> ToObservable<T>(this CancellationToken @this)
    {
        if (@this.IsCancellationRequested)
        {
            return Observable.Throw<T>(new OperationCanceledException(@this));
        }
        if (!@this.CanBeCanceled)
        {
            return Observable.Never<T>();
        }
        return Observable.Create<T>(
            observer => @this.Register(
                (_, token) => observer.OnError(new OperationCanceledException(token)), 
                null));
    }
}

Which next library version (i.e., patch, minor or major)?

Patch or Minor

What are the platform(s), environment(s) and related component version(s)?

All supported platforms

How commonly is this feature needed (one project, several projects, company-wide, global)?

Company-wide

Please describe the feature.

Here's one possible use case, coupled with TakeUntil (I'm sure there are many others). Here, the Finally action is called when the first item is emitted or when the token is signaled, whichever happens first.

The code here is a bit contrived (I might as well use IObservable<T>.ToTask or IObservable<T>.RunAsync), but in our real-life Rx pipelines this use case does make sense:

IDisposable RunUntilCancelled(IObservable<long> sequence, Action action, CancellationToken token)
{
    return sequence
        .FirstAsync()
        .TakeUntil(token.ToObservable<long>())
        .Finally(action)
        .Publish()
        .Connect();
}

async Task test_CancellationToken_ToObservable()
{
    var sequence = Observable
        .Interval(TimeSpan.FromMilliseconds(1000))
        .Do(n => Console.WriteLine(n))
        .Skip(3);

    var sw = Stopwatch.StartNew();
    var tcs = new TaskCompletionSource();
    
    using var cts = new CancellationTokenSource(2500);
    using var subscription = RunUntilCancelled(sequence, tcs.SetResult, cts.Token);

    await tcs.Task;
    var lapse = sw.ElapsedMilliseconds;
    Trace.Assert(lapse >= 2500 && lapse < 3000);
}

Update: the initial version was rather naive, and a race condition has emerged since I started using it. Here is an updated version that aims to solve it and also offers a choice between OnError and OnNext:

public static class RxExt
{
    public static IObservable<CancellationToken> ToObservable(
        this CancellationToken @this, 
        bool useOnError = true)
    {
        if (@this.IsCancellationRequested)
        {
            return useOnError ?
                Observable.Throw<CancellationToken>(new OperationCanceledException(@this)) :
                Observable.Return(@this);
        }

        if (!@this.CanBeCanceled)
        {
            return Observable.Never<CancellationToken>();
        }

        return Observable.Create<CancellationToken>(
            observer =>
            {
                int disposed = 0;
                bool IsListening() => Interlocked.CompareExchange(ref disposed, 0, 0) == 0;

                var rego = @this.Register((_, token) =>
                {
                    if (useOnError)
                    {
                        if (IsListening())
                        {
                            observer.OnError(new OperationCanceledException(token));
                        }
                    }
                    else
                    {
                        if (IsListening())
                        {
                            observer.OnNext(token);
                        }
                        if (IsListening())
                        {
                            observer.OnCompleted();
                        }
                    }
                }, state: null);

                return Disposable.Create(() =>
                {
                    if (Interlocked.CompareExchange(ref disposed, 1, 0) == 0)
                    {
                        rego.Unregister();
                    }
                });
            });
    }
}
@noseratio
Author

Also, it's good to see I'm not the only one who finds this feature useful: https://stackoverflow.com/a/65202543/1768303

@quinmars
Contributor

quinmars commented Jun 2, 2022

I do see how this can be useful. I don't have a clear opinion yet (nor am I the one who can make the decision); nonetheless, I want to share my thoughts.

The semantics of cancellation in TPL and Rx are quite different. In Rx, you cancel a subscription, that is, you request to stop the consumption of new notifications. In TPL, on the other hand, a cancellation is a request to stop the production. Due to its imperative nature, this works with exceptions. The consumer has to actively consider the canceled outcome. With this in mind, I'm not sure what an IObservable representation of a TPL cancellation token should look like, or whether it is reasonable to have one. Instead of a ToObservable operator for the cancellation token, a Throw operator that takes a cancellation token might be more reasonable.

Maybe it is more fruitful to look at the actual use cases. I think in your case the TakeUntil operator from the cited SO question would work very well. It does not blur the semantics of cancellation: it stops the consumption of the observable. It does so with a completion notification, but I would not expect anything different from a TakeUntil operator. There is no alien OperationCanceledException, which you usually don't want to deal with in the Rx world.

@noseratio
Author

noseratio commented Jun 3, 2022

Hi @quinmars, thanks for replying!

> It does so with a completion notification, but I would not expect anything different from a TakeUntil operator. There is no alien OperationCanceledException, which you usually don't want to deal with in the Rx world.

FWIW, if I complete the TakeUntil observable via OnCompleted rather than OnError as I do above, that stream never ends. Here's a fiddle. That said, I'm now using a custom implementation of WithCancellation (in that fiddle too).

@cabauman
Contributor

cabauman commented Jun 3, 2022

> FWIW, if I complete the TakeUntil observable via OnCompleted rather than OnError as I do above, that stream never ends.

Just need to include an OnNext right before the OnCompleted (TakeUntil looks for an emission rather than completion).
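
To illustrate the point above, here is a minimal sketch using two Subject<int> instances so that everything runs synchronously. The class and method names (TakeUntilDemo, CompletesAfter) are made up for this example and are not part of Rx.NET.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical helper, for illustration only.
public static class TakeUntilDemo
{
    // Subscribes to source.TakeUntil(notifier), lets the caller drive the
    // notifier, and reports whether the resulting sequence has completed.
    public static bool CompletesAfter(Action<Subject<int>> signal)
    {
        var source = new Subject<int>();
        var notifier = new Subject<int>();
        var completed = false;

        using var subscription = source
            .TakeUntil(notifier)
            .Subscribe(_ => { }, () => completed = true);

        // Subjects deliver notifications synchronously, so the flag is
        // already up to date when this call returns.
        signal(notifier);
        return completed;
    }
}
```

With this helper, CompletesAfter(n => n.OnCompleted()) returns false because completion of the notifier alone does not end the sequence, while CompletesAfter(n => { n.OnNext(0); n.OnCompleted(); }) returns true.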

@noseratio
Author

I've updated the code to offer a choice between OnError and OnNext, as well as to deal with a not-so-obvious race condition.

@A9G-Data-Droid

My $0.02: I am using the TakeUntil<TSource>(this IObservable<TSource> source, CancellationToken cancellationToken) extension from that SO answer and it works great.
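
For readers who have not seen that answer, an extension of this shape can be sketched roughly as follows. This is not the code from the SO answer: it is one possible way to build it on top of the existing TakeUntil operator, and the names CancellationSketch and TakeUntilCanceled are made up here.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Threading;

// Hypothetical sketch, not the SO answer's implementation.
public static class CancellationSketch
{
    public static IObservable<T> TakeUntilCanceled<T>(
        this IObservable<T> source, CancellationToken token)
    {
        // A notifier that emits a single value when the token is signaled.
        // TakeUntil reacts to the OnNext, so the result ends with OnCompleted
        // and no OperationCanceledException is surfaced.
        var canceled = Observable.Create<Unit>(observer =>
        {
            // Register invokes the callback immediately if the token
            // is already canceled.
            IDisposable registration = token.Register(() =>
            {
                observer.OnNext(Unit.Default);
                observer.OnCompleted();
            });
            return registration;
        });

        return source.TakeUntil(canceled);
    }
}
```

Usage would look like sequence.TakeUntilCanceled(cts.Token).Subscribe(...). Unsubscribing disposes the token registration, so no callback is left behind on a long-lived token.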

I am surprised that this isn't already a feature. It works great with awaiting an observable. If the common TPL pattern is that every awaitable has an optional cancel token, it seems like an oversight that you can await an observable but not cancel it.
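
For the awaiting case specifically, a sketch like the following may already cover it: the ToTask extension in System.Reactive.Threading.Tasks has an overload that accepts a CancellationToken. The class and method names (AwaitWithTokenSketch, WasCanceledAsync) and the 100 ms timeout are made up for this example.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
public static class AwaitWithTokenSketch
{
    public static async Task<bool> WasCanceledAsync()
    {
        // The token is signaled after roughly 100 ms.
        using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(100));
        try
        {
            // Observable.Never never produces a value, so only
            // cancellation can end the await.
            await Observable.Never<int>().ToTask(cts.Token);
            return false;
        }
        catch (OperationCanceledException)
        {
            // The task is canceled, so the await throws here.
            return true;
        }
    }
}
```

Note that this surfaces cancellation as an exception in the TPL style, unlike a TakeUntil-based approach, which ends the sequence with a completion notification.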

4 participants