Reactive Overloads for Static Parameters #18

Open
mattpodwysocki opened this Issue Oct 13, 2014 · 11 comments

@mattpodwysocki
Contributor

mattpodwysocki commented Oct 13, 2014

Copied from https://rx.codeplex.com/workitem/44

Several operators in Rx accept static parameters such as Int32 and TimeSpan. Given that Rx is a reactive framework, wouldn't it make sense to provide observable variants as well?

In many cases, reactive overloads that avoid re-subscription altogether would spare users the complex re-subscribing, publishing, scheduling and/or buffering that writing dynamic queries currently requires.

In at least one known case it seems that it's absolutely required to prevent data loss: http://rx.codeplex.com/workitem/43

The new signatures in general would simply replace the static parameters of their corresponding overloads with IObservable<T>, though perhaps in some cases due to overload conflicts a new naming convention for operators may be called for.

Note that this was already done for some overloads of Window, Buffer, Throttle, Sample, Delay and Timeout in Rx 2.0. However, rather than replacing TimeSpan parameters with IObservable<TimeSpan>, those overloads take an IObservable<TOther> parameter, because observable notifications themselves carry time.
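As a rough illustration of the Rx 2.0 style mentioned above, here is a minimal sketch that uses the existing Buffer overload taking a boundary observable. It assumes the System.Reactive package is referenced. The class name BoundaryBufferSketch and the use of subjects to drive the sequence by hand are choices made for this example only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BoundaryBufferSketch
{
    // Collects the buffers produced when a second observable decides
    // where each buffer ends, instead of a fixed TimeSpan or count.
    public static List<string> Run()
    {
        var source = new Subject<int>();
        var boundaries = new Subject<Unit>(); // values are ignored; only their timing matters
        var buffers = new List<string>();

        using (source.Buffer(boundaries)
                     .Subscribe(b => buffers.Add(string.Join(",", b))))
        {
            source.OnNext(1);
            source.OnNext(2);
            boundaries.OnNext(Unit.Default); // closes the buffer holding 1 and 2
            source.OnNext(3);
            source.OnCompleted();            // flushes the buffer holding 3
        }

        return buffers;
    }

    public static void Main()
    {
        foreach (var b in Run())
        {
            Console.WriteLine("[" + b + "]");
        }
    }
}
```

The boundary sequence never carries a TimeSpan; the moment each notification arrives is the parameter.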

Furthermore, instead of overloading Skip and Take in Rx 1.0, different operators were introduced: SkipUntil and TakeUntil. It seems that the same basic principles apply.
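To make the Skip/Take analogy concrete, here is a small sketch of SkipUntil and TakeUntil driven by the same signal. It assumes the System.Reactive package; the class name UntilSketch and the hand-driven subjects are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class UntilSketch
{
    // Splits one hot source around a signal: TakeUntil keeps what arrives
    // before the signal, SkipUntil keeps what arrives after it.
    public static (List<int> Taken, List<int> AfterSkip) Run()
    {
        var source = new Subject<int>();
        var signal = new Subject<Unit>();
        var taken = new List<int>();
        var afterSkip = new List<int>();

        using (source.TakeUntil(signal).Subscribe(x => taken.Add(x)))
        using (source.SkipUntil(signal).Subscribe(x => afterSkip.Add(x)))
        {
            source.OnNext(1);
            source.OnNext(2);
            signal.OnNext(Unit.Default); // the "parameter" arrives as a notification
            source.OnNext(3);
            source.OnNext(4);
        }

        return (taken, afterSkip);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("taken: " + string.Join(",", result.Taken));
        Console.WriteLine("after skip: " + string.Join(",", result.AfterSkip));
    }
}
```

Neither operator is given a count; the point at which to stop or start is decided while the query is running.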

These operators are potential candidates for overloading as of Rx 2.0:

Buffer<TSource>(this IObservable<TSource>, int);
Buffer<TSource>(this IObservable<TSource>, int, int);
Buffer<TSource>(this IObservable<TSource>, TimeSpan, int);
Buffer<TSource>(this IObservable<TSource>, TimeSpan, int, IScheduler);
Buffer<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan);
Buffer<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan, IScheduler)

DistinctUntilChanged<TSource>(this IObservable<TSource>, IEqualityComparer<TSource>)

Merge<TSource>(this IEnumerable<IObservable<TSource>>, int)
Merge<TSource>(this IEnumerable<IObservable<TSource>>, int, IScheduler)
Merge<TSource>(this IObservable<IObservable<TSource>>, int)  (See the related work item above.)

ObserveOn<TSource>(this IObservable<TSource>, IScheduler)

Repeat<TSource>(this IObservable<TSource>, int)

Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, IScheduler)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, TimeSpan)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, int, TimeSpan, IScheduler)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, TimeSpan)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, TimeSpan, IScheduler)
Replay<TSource>(this IObservable<TSource>, int)
Replay<TSource>(this IObservable<TSource>, int, IScheduler)
Replay<TSource>(this IObservable<TSource>, int, TimeSpan)
Replay<TSource>(this IObservable<TSource>, int, TimeSpan, IScheduler)
Replay<TSource>(this IObservable<TSource>, TimeSpan)
Replay<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Retry<TSource>(this IObservable<TSource>, int)

Skip<TSource>(this IObservable<TSource>, int)

SkipLast<TSource>(this IObservable<TSource>, int)
SkipLast<TSource>(this IObservable<TSource>, TimeSpan)
SkipLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Take<TSource>(this IObservable<TSource>, int)
Take<TSource>(this IObservable<TSource>, int, IScheduler)

TakeLast<TSource>(this IObservable<TSource>, int)
TakeLast<TSource>(this IObservable<TSource>, int, IScheduler)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)
TakeLast<TSource>(this IObservable<TSource>, TimeSpan, IScheduler, IScheduler)

TakeLastBuffer<TSource>(this IObservable<TSource>, int)
TakeLastBuffer<TSource>(this IObservable<TSource>, TimeSpan)
TakeLastBuffer<TSource>(this IObservable<TSource>, TimeSpan, IScheduler)

Window<TSource>(this IObservable<TSource>, int)
Window<TSource>(this IObservable<TSource>, int, int)
Window<TSource>(this IObservable<TSource>, TimeSpan, int)
Window<TSource>(this IObservable<TSource>, TimeSpan, int, IScheduler)
Window<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan)
Window<TSource>(this IObservable<TSource>, TimeSpan, TimeSpan, IScheduler)

Without giving much thought to semantics or to potential overload collisions, here are their corresponding reactive overloads:

Note: ObserveOn is the only operator that should have an observable IScheduler parameter?

Buffer<TSource>(this IObservable<TSource>, IObservable<int>);
Buffer<TSource>(this IObservable<TSource>, IObservable<int>, IObservable<int>);
Buffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>);
Buffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>, IScheduler);
Buffer<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>);
Buffer<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>, IScheduler)

DistinctUntilChanged<TSource>(this IObservable<TSource>, IObservable<IEqualityComparer<TSource>>)

Merge<TSource>(this IEnumerable<IObservable<TSource>>, IObservable<int>)
Merge<TSource>(this IEnumerable<IObservable<TSource>>, IObservable<int>, IScheduler)
Merge<TSource>(this IObservable<IObservable<TSource>>, IObservable<int>)  (See the related work item above.)

ObserveOn<TSource>(this IObservable<TSource>, IObservable<IScheduler>)

Repeat<TSource>(this IObservable<TSource>, IObservable<int>)

Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>)
Replay<TSource, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IScheduler)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IObservable<TDuration>)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<int>, IObservable<TDuration>, IScheduler)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<TDuration>)
Replay<TSource, TDuration, TResult>(this IObservable<TSource>, Func<IObservable<TSource>,IObservable<TResult>>, IObservable<TDuration>, IScheduler)
Replay<TSource>(this IObservable<TSource>, IObservable<int>)
Replay<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<int>, IObservable<TDuration>)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<int>, IObservable<TDuration>, IScheduler)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
Replay<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Retry<TSource>(this IObservable<TSource>, IObservable<int>)

Skip<TSource>(this IObservable<TSource>, IObservable<int>)

SkipLast<TSource>(this IObservable<TSource>, IObservable<int>)
SkipLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
SkipLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Take<TSource>(this IObservable<TSource>, IObservable<int>)
Take<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)

TakeLast<TSource>(this IObservable<TSource>, IObservable<int>)
TakeLast<TSource>(this IObservable<TSource>, IObservable<int>, IScheduler)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)
TakeLast<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler, IScheduler)

TakeLastBuffer<TSource>(this IObservable<TSource>, IObservable<int>)
TakeLastBuffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>)
TakeLastBuffer<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IScheduler)

Window<TSource>(this IObservable<TSource>, IObservable<int>)
Window<TSource>(this IObservable<TSource>, IObservable<int>, IObservable<int>)
Window<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>)
Window<TSource, TDuration>(this IObservable<TSource>, IObservable<TDuration>, IObservable<int>, IScheduler)
Window<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>)
Window<TSource, TDuration, TShift>(this IObservable<TSource>, IObservable<TDuration>, IObservable<TShift>, IScheduler)
@mattpodwysocki

Contributor

mattpodwysocki commented Oct 13, 2014

I'm thinking now that Rxx's WindowIntrospective operator may in fact be another "dynamic" variant of ObserveOn. Although the scheduler doesn't vary, the size of the internal queue does.

See also: https://rx.codeplex.com/workitem/82

@RxDave

This comment has been minimized.

@headinthebox

Contributor

headinthebox commented Nov 7, 2014

Yup, obviously missing when we went through the operators.

@onovotny onovotny added the [area] Rx label Jun 17, 2016

@akarnokd

Contributor

akarnokd commented Jul 20, 2016

I'm wondering how you can reasonably change, for example, the limit in Take while it is running? I.e., you start out with a signal of 5, then change your mind and signal 3 as the limit. Now the source produces three items and stops when you are about to signal 7 as the new take limit...

The second problem is that the parameter signals may not be reusable and/or a single hot parameter source affects all Observers of the same assembled chain:

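// Note: Take(IObservable<int>) is the overload proposed in this issue; it does not exist in Rx.NET.
BehaviorSubject<int> takeLimit = new BehaviorSubject<int>(5); // Rx.NET requires an initial value
IObservable<int> o = Observable.Range(1, 10).Delay(TimeSpan.FromSeconds(1)).Take(takeLimit);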

takeLimit.OnNext(5);
o.Subscribe(e => { });

takeLimit.OnNext(10); // will update the previous one's limit as well
o.Subscribe(e => { });

takeLimit.OnNext(1); // this too
o.Subscribe(e => { });

There is already a way of dynamically taking some items: TakeUntil (and TakeWhile if you have that).

@RxDave

Contributor

RxDave commented Jul 21, 2016

TakeUntil is the dynamic version of Take. I go into a bit more detail in my blog post linked above.

@mattpodwysocki

Contributor

mattpodwysocki commented Jul 21, 2016

@RxDave correct, just as SkipUntil is a dynamic version of Skip. So which operators should we consider here? The entire list? Note that we should backport RetryWhen and RepeatWhen from RxJava and RxJS to Rx.NET. @headinthebox thoughts?

@RxDave

Contributor

RxDave commented Jul 21, 2016

@mattpodwysocki It's tough to say which would actually be useful without scenarios in mind. Merge has a real-world scenario, so I'd say that's the most important one that I can think of.

#48

Perhaps the heuristic should be to choose those operators that are stateful and that irreversibly lose state when their arguments must change; e.g., when using Switch to re-apply the operator with new arguments. Or at least those for which avoiding the state loss forces user code to re-solve the very problem the operator was intended to solve in the first place, such as trying to dynamically adjust a Take or Skip query without the use of TakeUntil or SkipUntil, respectively.
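A minimal sketch of the state loss described above, assuming the System.Reactive package: a changing buffer size is emulated by re-applying Buffer(int) through Select and Switch. The class name SwitchStateLossSketch and the specific values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SwitchStateLossSketch
{
    // Re-applies Buffer(count) whenever a new count arrives. Switch disposes
    // the previous Buffer subscription, so its partially filled buffer is dropped.
    public static List<string> Run()
    {
        var source = new Subject<int>();
        var counts = new Subject<int>();
        var buffers = new List<string>();

        using (counts.Select(n => source.Buffer(n))
                     .Switch()
                     .Subscribe(b => buffers.Add(string.Join(",", b))))
        {
            counts.OnNext(3);
            source.OnNext(1);
            source.OnNext(2);
            source.OnNext(3);  // completes a buffer of three
            source.OnNext(4);
            source.OnNext(5);  // 4 and 5 sit in a partial buffer
            counts.OnNext(2);  // Switch unsubscribes from Buffer(3); 4 and 5 are lost
            source.OnNext(6);
            source.OnNext(7);  // completes a buffer of two
        }

        return buffers;
    }

    public static void Main()
    {
        foreach (var b in Run())
        {
            Console.WriteLine("[" + b + "]");
        }
    }
}
```

The values 4 and 5 never reach the subscriber. A Buffer overload that accepted the count as an observable could, in principle, keep the partial buffer across the change.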

@RxDave

Contributor

RxDave commented Jul 21, 2016

Note that I generated that list of operators by simply looking for operators with scalar parameters. I didn't actually pay attention to the semantics or usefulness of such dynamic operators. It's possible that most of those operators wouldn't make much sense as dynamic operators. For example, I'm not so sure that ObserveOn<T>(this IO<T>, IO<IScheduler>) makes sense. Worth considering though?

@akarnokd

Contributor

akarnokd commented May 31, 2018

Doesn't make sense to me after these years.

@danielcweber

Collaborator

danielcweber commented Jun 1, 2018

Definitely nice to have, but I have rarely felt the need; in most cases you can do pretty well with some Publish, WithLatestFrom and Switch, although this is not optimal.

@RxDave

Contributor

RxDave commented Jun 1, 2018

I agree it's rarely needed, but when it's needed it's actually necessary.

Reactive-Extensions#48

The problem is with any operator that may capture state, such that resubscribing loses that state. The current workarounds, perhaps, require us to redundantly implement the behavior of a problematic operator ourselves, defeating the entire purpose of the operator in the first place.

Operators that accept observables as arguments, especially only for some parameters, are immediately suspect.
