Proposed Scheduler Interface Change for 0.18 (yes, again) #997
Here is the state of the scheduler in rxcpp v2. In order to allow tight tail-recursion, the rxcpp (v2) scheduler has a type currently called schedulable:
class schedulable : public schedulable_base
{
composite_subscription lifetime;
scheduler controller;
action activity;
recursed_scope_type recursed_scope;
public:
schedulable()
{
}
schedulable(composite_subscription cs, scheduler q, action a)
: lifetime(std::move(cs))
, controller(std::move(q))
, activity(std::move(a))
{
}
inline composite_subscription& get_subscription() ;
inline scheduler& get_scheduler() ;
inline action& get_action() ;
inline auto set_recursed(const recurse& r) const;
// recursed - request tail-recursion
//
inline void operator()() const ;
// composite_subscription
//
inline bool is_subscribed() const ;
inline weak_subscription add(shared_subscription s) const ;
inline weak_subscription add(dynamic_subscription s) const ;
inline void remove(weak_subscription w) const ;
inline void clear() const ;
inline void unsubscribe() const ;
// scheduler
//
inline clock_type::time_point now() const ;
inline void schedule() const ;
inline void schedule(clock_type::duration when) const ;
inline void schedule(clock_type::time_point when) const ;
// action
//
inline action_duration::type get_duration() const;
inline void operator()(const schedulable& scbl, const recurse& r) const;
};
Tail-recursion can only be requested from inside the callback made by the scheduler. So in this implementation, if you transport the schedulable out of the callback and then request tail-recursion, the process will exit. If I run into the Inner issue in observe_on, I will just make schedulable extract it from the scheduler in the constructor through a private or detail:: accessor. A scheduler takes a schedulable and ignores the scheduler it contains.
class scheduler : public scheduler_base
{
public:
typedef scheduler_base::clock_type clock_type;
inline clock_type::time_point now() const ;
inline void schedule(const schedulable& scbl) const ;
inline void schedule(clock_type::duration when, const schedulable& scbl) const ;
inline void schedule(clock_type::time_point when, const schedulable& scbl) const ;
};
The range operator requests tail-recursion:
template<class T>
struct range : public source_base<T>
{
//...
template<class Subscriber>
void on_subscribe(Subscriber o) {
auto state = std::make_shared<state_type>(init);
state->sc.schedule(make_schedulable(
o, // share the same subscription with the scheduler
[=](const rxsc::schedulable& self){
if (state->remaining == 0) {
o.on_completed();
// o is unsubscribed
}
if (!o.is_subscribed()) {
// terminate loop
return;
}
// send next value
--state->remaining;
o.on_next(state->next);
state->next = static_cast<T>(state->step + state->next);
// tail recurse this same action to continue loop
self();
}));
}
};
In C++ it is essential to keep virtual function calls out of an inner loop. To make tail-recursion efficient, the recursion objects create space on the stack inside the virtual function call in the actor; this lets the callback and the scheduler share stack space that records the request and the allowance, without any virtual calls in the loop.
template<class F>
inline action make_action(F&& f, action_duration::type d = action_duration::runs_short) {
auto fn = std::forward<F>(f);
return action(std::make_shared<detail::action_type>(
d,
// tail-recurse inside of the virtual function call
// until a new action, lifetime or scheduler is returned
[fn](const schedulable& s, const recurse& r) {
auto scope = s.set_recursed(r);
while (s.is_subscribed()) {
r.reset();
fn(s);
if (!r.is_allowed() || !r.is_requested()) {
if (r.is_requested()) {
s.schedule();
}
break;
}
}
}));
}
The current_thread scheduler uses a thread-local recursion to indicate that tail-recursion is only allowed when the queue is empty:
//...
const auto& recursor = queue::get_recursion().get_recurse();
// loop until queue is empty
for (
auto when = queue::top().when;
std::this_thread::sleep_until(when), true;
when = queue::top().when
)
{
auto what = queue::top().what;
queue::pop();
what(recursor);
if (queue::empty()) {
break;
}
}
//... |
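The make_action loop and the current_thread rule above can be approximated in Java to show just the control flow. This is a minimal, single-threaded sketch with hypothetical names (Trampoline, Recurse.reschedule()) that are not part of rxcpp or RxJava. It omits subscriptions, time, and the thread-local, and in Java the lambda call is itself an interface dispatch, so it illustrates the request/allowance logic rather than the virtual-call savings.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical sketch: a current-thread trampoline in which an action may
// request tail-recursion. The request is honored in a plain loop only while
// no other work is queued; otherwise it falls back to a normal enqueue.
final class Trampoline {
    // Per-dispatch flag shared by the loop and the action, loosely analogous
    // to the rxcpp recurse object that lives on the stack of the dispatch call.
    static final class Recurse {
        private boolean requested;

        void reschedule() {
            requested = true;
        }
    }

    private final Deque<Consumer<Recurse>> queue = new ArrayDeque<>();
    private boolean draining; // not thread-safe: single-threaded use only

    void schedule(Consumer<Recurse> action) {
        queue.addLast(action);
        if (draining) {
            return; // the drain loop already running on this thread picks it up
        }
        draining = true;
        try {
            while (!queue.isEmpty()) {
                Consumer<Recurse> what = queue.pollFirst();
                Recurse r = new Recurse();
                while (true) {
                    r.requested = false;
                    what.accept(r);
                    if (!r.requested) {
                        break; // action finished without asking to recurse
                    }
                    if (!queue.isEmpty()) {
                        // Recursion not allowed while other work waits.
                        queue.addLast(what);
                        break;
                    }
                    // Queue is empty: tail-recurse in place.
                }
            }
        } finally {
            draining = false;
        }
    }
}
```

A rescheduled action keeps looping in place while nothing else is queued; as soon as other work is waiting, the request degrades to an ordinary enqueue so that the other work is not starved.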
Based on these implementation requirements for C++, would you recommend a different signature from this?
class Scheduler {
public final Subscription schedule(Action1<Recurse> action);
public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
public abstract Inner createInner(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Recurse {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
public final void schedule(Action1<Recurse> action);
public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
}
// now mostly an implementation detail except for advanced use cases
public abstract static class Inner implements Subscription {
public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Recurse> action);
public long now();
}
}
@headinthebox Does this change your perspective on the Java design at all? |
I have submitted a pull request with these changes. Usage looks like this:
import java.util.concurrent.TimeUnit;
import rx.Scheduler.Inner;
import rx.Scheduler.Recurse;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
public class Test {
public static void main(String args[]) {
Schedulers.newThread().schedule(new Action1<Recurse>() {
@Override
public void call(Recurse r) {
System.out.println("do stuff");
// recurse
r.schedule(this);
}
});
Schedulers.newThread().schedule(recurse -> {
System.out.println("do stuff");
recurse.schedule();
});
Schedulers.newThread().schedule(recurse -> {
System.out.println("do stuff");
recurse.schedule(1000, TimeUnit.MILLISECONDS);
});
Schedulers.newThread().schedule(recurse -> {
recurse.schedule(re -> {
System.out.println("do more stuff");
});
});
Inner inner = Schedulers.newThread().createInner();
inner.schedule(re -> {
System.out.println("do stuff");
re.schedule(r -> {
System.out.println("do more stuff");
});
});
}
}
Code outline: |
This is the time to voice opinions and effect change. Once we make these changes we're headed towards 1.0 and really do not want to change these signatures again. I welcome bikeshedding and arguing over names, signatures, etc. for the next couple of days, but would like to wrap it up and move forward by the weekend unless a glaring issue is found. I would appreciate suggestions for better names than |
@kirkshoop If you have a chance to answer my previous question, I'd appreciate it so we make sure these changes address broad use cases and are not bound to a specific language (since even though we're on the JVM, there are several different languages we support, and it would be preferable for Rx across platforms to be similar).
|
Oh. My. Well, I had a long reply but it just got lost. I know better than to use the web interface. :( I can't write it again, so here is the summary. RxCpp took an insight from a comment (by @akarnokd, I think) on another RxJava issue: the parameter to subscribe is like a subscriber, which is in turn like a subject, in that they are all objects whose single concern is to bind other single-concern objects together. In this case the parameter would be a type that binds together the scheduler (with private access to its inner), the action, and the subscription, and then exposes the functionality of Recurse, Inner, Subscription, and Action1. The subscription would be removed from the Action1 in this case. A name that comes to mind is ScheduleAction. I can still count the lines of Java that I have written, so be gentle :)
|
Thank you @kirkshoop for the feedback. If I understand correctly, the primary change would be hiding the The problem I've had with combining them ( The reason for this scenario is that retrieving an
protected void schedule() {
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
recursiveScheduler = scheduler.createInner();
add(recursiveScheduler);
}
recursiveScheduler.schedule(new Action1<Recurse>() {
@Override
public void call(Recurse inner) {
pollQueue();
}
});
}
}
The reason is that the recursion happens externally (the operator is doing it) rather than internally (inside the Therefore, we have use cases where Thus, if we have
protected void schedule() {
if (counter.getAndIncrement() == 0) {
if (recursiveScheduler == null) {
add(scheduler.schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
recursiveScheduler = inner;
pollQueue();
}
}));
} else {
recursiveScheduler.schedule(new Action1<Inner>() {
@Override
public void call(Inner inner) {
pollQueue();
}
});
}
}
}
Is solving this use case gracefully a good enough reason for exposing The |
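For comparison, here is a rough sketch of the same observeOn-style drain when the inner can be obtained up front, as the proposed createInner() allows, so no volatile reference has to be captured during a first scheduling. It assumes a single-thread ExecutorService as a stand-in for Inner; ObserveOnSketch, pollQueue, and shutdownAndAwait are hypothetical names, not RxJava code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Hypothetical sketch of "outer recursion": notifications arrive from outside,
// and one inner (a single-thread executor standing in for Inner) is created
// eagerly and reused for every drain, so no lazy volatile field is needed.
final class ObserveOnSketch<T> {
    private final ExecutorService inner = Executors.newSingleThreadExecutor();
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicLong counter = new AtomicLong();
    private final Consumer<T> downstream;

    ObserveOnSketch(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T t) {
        queue.offer(t);
        // Only the 0 -> 1 transition schedules a drain; later values are
        // picked up by the drain that is already pending or running.
        if (counter.getAndIncrement() == 0) {
            inner.execute(this::pollQueue);
        }
    }

    private void pollQueue() {
        do {
            downstream.accept(queue.poll());
        } while (counter.decrementAndGet() > 0);
    }

    // Stops the inner after queued work finishes; true if it did so in time.
    boolean shutdownAndAwait(long millis) {
        inner.shutdown();
        try {
            return inner.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Because offer() happens before the counter increment, every decrement the drain loop observes has a matching queued item, so poll() does not return null.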
Thanks @benjchristensen! What prevents this?
|
The |
Also, by doing it that way it makes the This is why the |
I would like to pick up these particular points in person at React, if possible :) EDIT: I am 'kirk dot shoop at microsoft com' |
I had the chance to meet @kirkshoop in person while in London, and we reviewed the needs of CPP and Java and came to agreement on the proposed model. We are considering slightly changed names, though, to improve the semantic meaning:
class Scheduler {
public final Subscription schedule(Action1<Schedulable> action);
public final Subscription schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Schedulable> action, long initialDelay, long period, TimeUnit unit);
public abstract EventLoop createEventLoop(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Schedulable {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
public final void schedule(Action1<Schedulable> action);
public final void schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
}
// now mostly an implementation detail except for advanced use cases
public abstract static class EventLoop implements Subscription {
public abstract void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Schedulable> action);
public long now();
}
}
Neither of us is sold on |
There are 3 types of usage patterns to address with this design:
1) Single Action
The simplest case, where only a single action is scheduled, which results in a new inner EventLoop being created.
Use Cases: subscribeOn, unsubscribeOn, timeout, interval, timer
Example:
scheduler.schedule(new Action1<Schedulable>() {
@Override
public void call(final Schedulable re) {
// do work here
}
});
2) Inner Recursion
Simple recursion, where the work to be repeated is derived from inside the initially scheduled Action, generally just repeating itself.
Use Cases: retry, repeat
Example:
scheduler.schedule(new Action1<Schedulable>() {
@Override
public void call(final Schedulable re) {
// do work here then recursively reschedule
re.schedule();
}
});
Beyond just rescheduling itself, it can reschedule with a delay, or schedule a different Action on the inner EventLoop it has access to via the
3) Outer Recursion
The more complex, but actually quite common, requirement is outer recursion. This use case exists because the scheduling of work is being driven by outer notifications, such as via
Use Cases: observeOn, repeat ... and it should also cover things like debounce, which are currently implemented incorrectly.
Example:
final EventLoop eventLoop = scheduler.createEventLoop();
public void onNext(T t) {
eventLoop.schedule(new Action1<Schedulable>() {
@Override
public void call(final Schedulable re) {
// do work here
}
});
}
A single EventLoop (Inner) needs to be created and reused for all notifications received; thus a reference to it must exist and be used for scheduling. Each
Current Design
The current design is:
class Scheduler {
public final Subscription schedule(Action1<Schedulable> action);
public final Subscription schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Schedulable> action, long initialDelay, long period, TimeUnit unit);
public abstract EventLoop createEventLoop(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Schedulable {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
public final void schedule(Action1<Schedulable> action);
public final void schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public long now();
}
// now mostly an implementation detail except for advanced use cases
public abstract static class EventLoop implements Subscription {
public abstract void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Schedulable> action);
public long now();
}
}
This design achieves all of the goals, but it duplicates signatures to favor a simple model for use cases (1) and (2) while still addressing (3).
Alternate Design
We could reduce some of the duplication in the Scheduler API by accepting use case (3) as the primary case and having all others work within it.
class Scheduler {
public abstract EventLoop createEventLoop(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Schedulable {
public final void schedule();
public final void schedule(long delay, TimeUnit unit);
public final void schedule(Action1<Schedulable> action);
public final void schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public long now();
}
// now mostly an implementation detail except for advanced use cases
public abstract static class EventLoop implements Subscription {
public abstract void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Schedulable> action);
public long now();
}
}
The drawback of this for usage is that now the
// instead of this
subscriber.add(scheduler.schedule(new Action1<Schedulable>() {
@Override
public void call(final Schedulable re) {
// do work here
}
}));
// it would now be this
EventLoop loop = scheduler.createEventLoop();
subscriber.add(loop);
loop.schedule(new Action1<Schedulable>() {
@Override
public void call(final Schedulable re) {
// do work here
}
});
Bikeshed
Now is the time to bikeshed on this and argue over the design and names while accounting for these 3 use cases. I personally think we should leave the 3 redundant methods on |
Reviewing with @headinthebox ...
Option 1
class Scheduler {
public final Subscription schedule(Action1<Schedulable> action);
public final Subscription schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Schedulable> action, long initialDelay, long period, TimeUnit unit);
public abstract EventLoop getEventLoop(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Schedulable {
public final void reschedule();
public final void reschedule(long delay, TimeUnit unit);
public final void schedule(Action1<Schedulable> action);
public final void schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public long now();
}
// now mostly an implementation detail except for advanced use cases
public abstract static class EventLoop implements Subscription {
public abstract void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Schedulable> action);
public long now();
}
}
Option 2
class Scheduler {
public final Subscription schedule(Action1<Schedulable> action);
public final Subscription schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Schedulable> action, long initialDelay, long period, TimeUnit unit);
public abstract Inner getInner(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Schedulable {
public final void reschedule();
public final void reschedule(long delay, TimeUnit unit);
public final void schedule(Action1<Schedulable> action);
public final void schedule(Action1<Schedulable> action, final long delayTime, final TimeUnit unit);
public long now();
}
// now mostly an implementation detail except for advanced use cases
public abstract static class Inner implements Subscription {
public abstract void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Schedulable> action);
public long now();
}
}
Option 3
class Scheduler {
public final Subscription schedule(Action1<Recurse> action);
public final Subscription schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
public final Subscription schedulePeriodically(Action1<Recurse> action, long initialDelay, long period, TimeUnit unit);
public abstract Inner getInner(); // for advanced use cases like `observeOn`
public int degreeOfParallelism();
public long now();
// now the primary interface
public static final class Recurse {
public final void reschedule();
public final void reschedule(long delay, TimeUnit unit);
public final void schedule(Action1<Recurse> action);
public final void schedule(Action1<Recurse> action, final long delayTime, final TimeUnit unit);
public long now();
}
// now mostly an implementation detail except for advanced use cases
public abstract static class Inner implements Subscription {
public abstract void schedule(Action1<Recurse> action, long delayTime, TimeUnit unit);
public abstract void schedule(Action1<Recurse> action);
public long now();
}
} |
Preference by @headinthebox is option 2 above which changes |
We plan on making the final decision tomorrow, April 15th, now that this has been available for debate for over a week. |
I would like to see something descriptive in place of inner. If EventLoop is not descriptive, then use SerialQueue or anything else that specifies the behavior of scheduling on the returned object. Kirk |
just a gut feeling, but *able sounds like an interface to me. In fact, you could almost extract the different |
and just wondering (sorry, it's late and I should be going to bed, so I might be rambling): what happens if the schedule methods on |
It means that work must be scheduled and executed just to get the reference. This in turn means that volatile mutable references must be used (and set on first work such as an
Good feedback.
What do you suggest calling the I don't understand what "you could almost extract the different schedule() methods" is referring to as far as API design. Being an interface is not helpful in this case. The point of the |
By the way, we could always swing in the opposite direction, make the user worry about the use cases, and only expose the most basic functional aspects:
class Scheduler {
public abstract InnerClassOfSomeName getInnerClassOfSomeName();
public int degreeOfParallelism();
public long now();
public abstract static class InnerClassOfSomeName implements Subscription {
public abstract void schedule(Action0 action, long delayTime, TimeUnit unit);
public abstract void schedule(Action0 action);
public final void schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);
public long now();
}
}
1) Single Action
final InnerClassOfSomeName is = scheduler.getInnerClassOfSomeName();
is.schedule(new Action0() {
@Override
public void call() {
// do work here
}
});
is.unsubscribe();
With lambda:
final InnerClassOfSomeName is = scheduler.getInnerClassOfSomeName();
is.schedule(() -> {
// do work here
});
is.unsubscribe();
2) Inner Recursion
final InnerClassOfSomeName is = scheduler.getInnerClassOfSomeName();
is.schedule(new Action0() {
@Override
public void call() {
// do work here then recursively reschedule
is.schedule(this); // this will NOT work with lambdas, only anonymous inner classes
}
});
is.unsubscribe();
3) Outer Recursion
final InnerClassOfSomeName is = scheduler.getInnerClassOfSomeName();
public void onNext(T t) {
is.schedule(new Action0() {
@Override
public void call() {
// do work here
}
});
}
is.unsubscribe();
With lambdas:
final InnerClassOfSomeName is = scheduler.getInnerClassOfSomeName();
public void onNext(T t) {
is.schedule(() -> {
// do work here
});
}
is.unsubscribe();
I don't like this approach, but it's an option. |
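The note on the recursion example ("this will NOT work with lambdas") follows from Java's scoping rules: inside an anonymous class, this is the anonymous instance, while inside a lambda it is the enclosing object. A small sketch, using a hypothetical LambdaThisDemo class and a plain Runnable queue instead of any Rx type, shows both cases and the holder workaround a lambda needs to reschedule itself.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for an Action0-style scheduler: schedule() only
// queues the action, and drainAll() runs everything on the calling thread.
final class LambdaThisDemo {
    private final Deque<Runnable> queue = new ArrayDeque<>();
    int runs;

    void schedule(Runnable action) {
        queue.addLast(action);
    }

    void drainAll() {
        Runnable next;
        while ((next = queue.pollFirst()) != null) {
            next.run();
        }
    }

    // Anonymous class: "this" is the Runnable itself, so it can reschedule itself.
    void scheduleAnonymous(int times) {
        schedule(new Runnable() {
            @Override
            public void run() {
                if (++runs < times) {
                    schedule(this);
                }
            }
        });
    }

    // Lambda: "this" would be the LambdaThisDemo instance, not the lambda,
    // so the lambda needs an explicit holder to refer to itself.
    void scheduleLambda(int times) {
        Runnable[] self = new Runnable[1];
        self[0] = () -> {
            if (++runs < times) {
                schedule(self[0]);
            }
        };
        schedule(self[0]);
    }

    // Reports the class that "this" denotes inside a lambda body.
    Class<?> thisInsideLambda() {
        Class<?>[] seen = new Class<?>[1];
        Runnable r = () -> seen[0] = this.getClass();
        r.run();
        return seen[0];
    }
}
```

The array holder works but is exactly the kind of boilerplate the Recurse/Schedulable handle is meant to remove.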
Right you are, scratch that. I was thinking of something like this:
final InnerClassOfSomeName is = scheduler.schedule(new Action0() {
@Override
public void call() {
// do work here then reschedule
is.reschedule();
}
});
is.unsubscribe();
Obviously, that won't work. Sorry for wasting your time.
That was a knee-jerk reaction because both
void schedule(Action1<Schedulable> action, long delayTime, TimeUnit unit);
void schedule(Action1<Schedulable> action);
long now();
You're probably correct that this doesn't define an interface.
My objection was that in the JDK, adjectives are often (usually?) interfaces or annotations ( I don't really have a better suggestion either, and Please don't let me hold you up. I just ran across a tweet by @headinthebox and started bikeshedding on an impulse. |
@kirkshoop, I actually think Inner should have a less prescriptive name. |
@ccmtaylor not a problem at all! I appreciate your involvement. Can't you see how many breaking changes to the API have happened due to mistakes I've either written or merged! I only know about this particular use case because I've implemented
good point
If the JVM supported extension methods
That's what we asked for on this one, so thank you! The craziest ideas or simplest questions can spark a thought and change a design, or just prove an API isn't clear. |
I vote for |
There's nothing about a
I'm okay with this option. I also like |
@benjchristensen I'd also vote for |
I like Worker. |
Worker, +1. |
Worker looks like it's going to win ... @headinthebox, are you okay with this? The signature will look like this:
class Scheduler {
public abstract Worker createWorker();
public int parallelism();
public long now();
public abstract static class Worker implements Subscription {
public abstract Subscription schedule(Action0 action, long delayTime, TimeUnit unit);
public abstract Subscription schedule(Action0 action);
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);
public long now();
}
} |
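To make the agreed shape concrete, a Worker can be backed by a single-threaded ScheduledExecutorService. This is a sketch under stated assumptions: Subscription and Action0 are simplified local stand-ins (not the rx.* types), ExecutorWorker is a hypothetical name, parallelism() is omitted, and unsubscribing the worker is modeled as shutdownNow().

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Simplified local stand-in, not rx.Subscription.
interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

// Simplified local stand-in, not rx.functions.Action0.
interface Action0 {
    void call();
}

// Hypothetical Worker in the shape agreed on in this thread.
final class ExecutorWorker implements Subscription {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    public Subscription schedule(Action0 action) {
        return schedule(action, 0, TimeUnit.MILLISECONDS);
    }

    public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
        Runnable task = action::call;
        return cancelling(executor.schedule(task, delayTime, unit));
    }

    public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
        Runnable task = action::call;
        return cancelling(executor.scheduleAtFixedRate(task, initialDelay, period, unit));
    }

    public long now() {
        return System.currentTimeMillis();
    }

    // Unsubscribing the worker stops everything scheduled on it.
    @Override
    public void unsubscribe() {
        executor.shutdownNow();
    }

    @Override
    public boolean isUnsubscribed() {
        return executor.isShutdown();
    }

    // Wraps one scheduled task so it can be cancelled on its own.
    private static Subscription cancelling(Future<?> future) {
        return new Subscription() {
            @Override
            public void unsubscribe() {
                future.cancel(false);
            }

            @Override
            public boolean isUnsubscribed() {
                return future.isCancelled();
            }
        };
    }
}
```

Each schedule call returns its own Subscription that cancels just that task, while unsubscribing the Worker itself tears down everything scheduled on it.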
I can live with that. Guess we have a winner then. |
I will make the change to |
Reviewing the Scheduler interface changes of 0.17 with @headinthebox revealed that we're not 100% happy with the outcome, particularly after learning that a Java 8 lambda cannot refer to itself through this (inside a lambda, this refers to the enclosing instance).
The Scheduler interface as of 0.17 is:
We have determined two problems with this:
In practice we have found that usage is always one of two things: either you just interact with the outer and don't care about the Inner, or you immediately need the Inner and have to do an awkward first scheduling just to get access to the Inner. (See here and weep.)
The Action1<Scheduler.Inner> signature was chosen and put on both outer and inner so that an inner class could refer to itself using this to simply reschedule itself from the outer onto the inner.
It was assumed this would work in Java 8 lambdas, but unfortunately we did not prove it.
This works with anonymous classes:
but this does not with lambdas:
So we end up with this:
At that point it's clear that Inner is not working well, and we have Recurse to fix the problem.
Thus, the proposed changes (breaking again) are:
The name of Recurse is up for debate. It may be possible to merge Recurse and Inner, but I haven't figured it out yet. The reason is that Inner is a single instance representing a thread or event loop, whereas Recurse represents an Action, or unit of work. Thus a given Inner could have multiple Recurse actions scheduled onto it. Being an Action is what allows Recurse to recurse: invoking schedule() simply reschedules itself.
This would make it better support Java 8 lambdas and simple recursion, while also better supporting (via the createInner() method) the more complicated use cases like observeOn, where the current code is very awkward.
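The idea that a Recurse is bound to one Action, so that a bare schedule() reschedules that same action on the same Inner, can be sketched without any Rx types. This assumes a hypothetical RecurseSketch class whose single ArrayDeque stands in for one Inner and is drained on the calling thread; the method names mirror the proposal, but the code is illustrative only.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Hypothetical sketch of the proposed Recurse handle: each scheduled
// Consumer<Recurse> receives a Recurse bound to itself, so the action never
// needs "this" to reschedule, which keeps it usable as a Java 8 lambda.
final class RecurseSketch {
    private final Deque<Runnable> inner = new ArrayDeque<>(); // stands in for one Inner

    final class Recurse {
        private final Consumer<Recurse> action;

        Recurse(Consumer<Recurse> action) {
            this.action = action;
        }

        // Reschedule this same action on the same inner.
        void schedule() {
            RecurseSketch.this.schedule(action);
        }

        // Schedule a different action on the same inner.
        void schedule(Consumer<Recurse> other) {
            RecurseSketch.this.schedule(other);
        }
    }

    void schedule(Consumer<Recurse> action) {
        Recurse r = new Recurse(action);
        inner.addLast(() -> action.accept(r));
    }

    // Runs queued work on the calling thread until the inner is idle.
    void drain() {
        Runnable next;
        while ((next = inner.pollFirst()) != null) {
            next.run();
        }
    }
}
```

One Inner can hold many such handles at once, which matches the distinction drawn above between Inner (the event loop) and Recurse (a unit of work).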
where current code is very awkward.This needs to be the last refactor of this so we nail it down and stop breaking things and can get to 1.0.
Let the discussion begin ...