Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Missing RetryUntil/RetryWhen #449

Closed
JanEggers opened this issue Nov 23, 2017 · 4 comments
Closed

Missing RetryUntil/RetryWhen #449

JanEggers opened this issue Nov 23, 2017 · 4 comments

Comments

@JanEggers
Copy link

i tried to implement RetryUntil in my code with:

public static Task<T> RetryUntil<T,TOther>(this object any, IObservable<T> obs, IObservable<TOther> end)
        {
            using ( var cts = new CancellationTokenSource())
            using (end.Subscribe(v => cts.Cancel(true)))
            {
                return obs
                    .Retry()
                    .TakeUntil(end)
                    .ToTask(cts.Token);
            }
        }

but that gives me an invalidOperationException: "Sequence Contains no Elements". if end publishes a value.

Is there any way i can get a TaskCancellation Exception.

Also RetryWhen is missing from Rx.net and id like to retry after some amount of time rather than instant. How can i do that without RetryWhen?

Is there any reason the above methods are not included? / may i submit a pr?

@akarnokd
Copy link
Collaborator

akarnokd commented Nov 23, 2017

You could emulate RetryWhen via existing operators and subjects (although the most reliable way would be a custom IObservable implementation):

public static IObservable<T> RetryWhen<T, U>(this IObservable<T> source,
    Func<IObservable<Exception>, IObservable<U>> handler)
{
    return Observable.Defer(() =>
    {
        var errorSignal = new Subject<Exception>();
        var retrySignal = handler(errorSignal);
        var sources = new BehaviorSubject<IObservable<T>>(source);

        return Observable.Using(
            () => retrySignal.Select(s => source).Subscribe(sources),
            r => sources.Select(src =>
                src.Do(v => { }, e => errorSignal.OnNext(e), () => errorSignal.OnCompleted())
                .OnErrorResumeNext(Observable.Empty<T>())
            ).Concat()
        )
        ;
    });
}

Example:

int[] count = { 3 };

Observable.Defer(() =>
{
    if (count[0]-- == 0)
    {
        return Observable.Return("Success");
    }
    return Observable.Throw<String>(new Exception());
})
.RetryWhen(
    f => f.SelectMany(e =>
    {
        Console.WriteLine("Retrying...");
        return Observable.Timer(TimeSpan.FromSeconds(1));
    })
)
.Subscribe(Console.WriteLine, Console.WriteLine, () => Console.WriteLine("Done"));

Thread.Sleep(5000);

Is there any reason the above methods are not included?

Some ReactiveX libraries evolve faster than others and not all features are available everywhere. With C# and extension methods, you are in a much better position to create your own custom operators.

@JanEggers
Copy link
Author

thx for explaining

@quinmars
Copy link
Contributor

PR #486 adds a RetryWhen operator and is already merged. Can this issue be closed?

@JanEggers
Copy link
Author

yup thx

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants