Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler scheduler fast path #228

Closed
wants to merge 2 commits into from

Conversation

laynepenney
Copy link

When HandlerScheduler action delay is zero and (current thread == target thread), call action directly instead of delegating to the handler.

@JakeWharton
Copy link
Member

Prior art @ JakeWharton/RxBinding#28.

cc @dlew @loganj @mttkay

@laynepenney
Copy link
Author

Thanks. I assumed that it was brought up before, but I didn't see it.

@laynepenney
Copy link
Author

Use case that I've seen. Assume that you want to wrap another library so that you can use Rx pattern throughout your app. This library publishes to the main thread, so if you use AndroidSchedulers.mainThread() in all of you observeOn patterns, you end up with two handler.post() calls to the same handler.

interface Callback<T> {
    void onResult(T t);
}

static class NetworkCall extends AsyncTask<String, Void, String> {
    private final Callback<String> callback;

    private NetworkCall(Callback<String> callback) {
        this.callback = callback;
    }

    @Override protected String doInBackground(String... params) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // ignore
        }
        return "finished";
    }

    @Override protected void onPostExecute(String s) {
        callback.onResult(s);
    }
}

Observable.OnSubscribe<String> someNetworkCallIWantToWrap = new Observable.OnSubscribe<String>() {
    @Override public void call(final Subscriber<? super String> subscriber) {
        new NetworkCall(new Callback<String>() {
            @Override public void onResult(String s) {
                // Calling the subscriber from the main thread
                subscriber.onNext(s);
                subscriber.onCompleted();
            }
        });
    }
};

// Sample use case:
// Some third party network library I need is using Async Tasks
// I want to wrap it in Rx, to match some other things I'm doing.
// By default, that call will post the result to the main thread.
// Fast past avoids repeat schedules to the handler thread,
// and instead posts directly if possible

Observable.create(someNetworkCallIWantToWrap)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<String>() {...);

@mttkay
Copy link
Collaborator

mttkay commented Sep 16, 2015

One option would be to make this an explicit API, i.e. a separate kind of scheduler. At least that way it's clearly communicated in code what the behavior might be. Thoughts?

@dlew
Copy link
Collaborator

dlew commented Sep 16, 2015

@akarnokd I'm wondering if you could give input here - what do you think of implementing a Scheduler such that delay(0) + already on requested thread just calls the Action directly instead of waiting?

@loganj
Copy link

loganj commented Sep 16, 2015

This keeps coming back, and there are definitely some good reasons to do it. Unfortunately I don't think it's safe without some explicit recognition of the synchronous behavior by RxJava internals. Error handling gets all screwed up without that.

I'll also admit that I've been able to migrate my app to an always-async main thread scheduler a la RxAndroid's with no noticeable ill effects.

@akarnokd
Copy link
Member

@dlew The problem with that if said action does recursive scheduling, you may end up with stack overflow or at least a much deeper stack.

@loganj
Copy link

loganj commented Sep 16, 2015

@akarnokd @dlew my last iteration of the "sometimes-posting" main thread scheduler would actually use an internal trampoline for the case where it's called from the main thread.

@akarnokd
Copy link
Member

Yes, trampolining in this case is a solution. I proposed something like this for RxSwing (ReactiveX/RxSwing#24) but it was closed without any good reason...

@loganj
Copy link

loganj commented Sep 16, 2015

IIRC this block in OperatorObserveOn was problematic during error handling. I don't recall the details, it was a pretty hairy debugging session:

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorObserveOn.java#L57-L67

@akarnokd
Copy link
Member

ObserveOn is pretty robust. What was the problem? Was it the fact that errors cut ahead of values by design?

@loganj
Copy link

loganj commented Sep 16, 2015

I think an unsubscribe was executed on the scheduler with the expectation that it would occur after an error was handled/dispatched, but because my scheduler looked asynchronous to ObserveOn but actually was synchronous in that case, the unsubscribe occurred first.

@artem-zinnatullin
Copy link
Contributor

Some additional input:

This PR brings kind of undefined behavior to the RxAndroid, because your code after observeOn() may or may not fall into synchronous blocking execution.

Current implementation gives you strong guarantees that work is scheduled and will be done in some future. Also, current behavior is good for Android App's UI in general, because almost all other work such as user interactions with the device, intents and so on in the Android Framework go through Handler & Looper and your code will become part of this queue with respect to the Android Framework and user of the device.

Also, just curious: is this really frequent case when you use observeOn(HandlerScheduler.from(...)) and Observable subscribed on the same thread? I don't see this in our projects.

@loganj

This keeps coming back, and there are definitely some good reasons to do it.

What are these good reasons to do it?

@laynepenney in your example you don't need neither subscribeOn() nor observeOn() since AsyncTask won't respect your subscribeOn() (you just wasting processing resources of the device) and observeOn(AndroidSchedulers.mainThread()) does nothing useful in your case.

@mttkay
Copy link
Collaborator

mttkay commented Sep 16, 2015

What are these good reasons to do it?

Because the Android main looper becomes a bottleneck very quickly. All draw calls go through it, for instance. This means that anything you schedule using mainThread() will compete with other random stuff for the main event queue. What happens in practice is that at some point you will get MissingBackpressureExceptions because your observables emit items faster than your subscribers can catch up with them, because those notifications are waiting in line behind 500 draw calls.

@laynepenney
Copy link
Author

I like the idea of having this functionality in a separate scheduler, or at least with some options in the create method that defaults to current behavior.

@artem-zinnatullin Yes, my example explicitly uses an Async Task where both subscribeOn and observeOn are unnecessary. My intended case is supposing that I don't know exactly how the third party call behaves (not my code). Suppose I have various types of source observables with different implementations (some mine, some third party that I wrapped). When I subscribe for the data in the UI, I want to establish the same contract for everything, explicitly observing on the main thread while sometimes ignorant of the actual source implementation.

@artem-zinnatullin
Copy link
Contributor

What happens in practice is that at some point you will get MissingBackpressureExceptions because your observables emit items faster than your subscribers can catch up with them, because those notifications are waiting in line behind 500 draw calls.

I'd say that it's more problem of the RxJava itself (backpressure and buffers instead of just pure event processing, yes, RxJava v2, I'm looking at you), because as user of the device you want 60 fps and as fast responses to your interactions as possible.

And again, I am curious how often do you have observeOn(mainThread()) on the Observable which already emits data on the Main Thread? I understand theoretical MissingBackpressureExceptions problem, but I am just not sure that this PR will fix it.

In my opinion, synchronous blocking execution is the same kind of evil as Handler.postAtFrontOfQueue().

@laynepenney
Copy link
Author

I'll admit, though, in my use case the optimization would be pretty minor.

@benjchristensen
Copy link
Member

@artem-zinnatullin If getting a MissingBackpressureException, that means the consumer is slower than your producer. There really is no such thing as "just pure event processing" once you are doing thread-hopping and using event loops - you must deal with buffers. Purity doesn't really exist in our imperative, multi-threaded, interrupt-driven, time-sliced environment, unless you stay on a single thread. RxJava v1 makes it explicit that you have to deal with this by not allowing unbounded buffer growth. Thus, if you didn't receive the MissingBackpressureException, then it would default to unbounded buffering instead. If you wish to opt-in to larger buffers (effectively allowing "unbounded" buffer growth), then you can do so either by setting the default buffer size (instead of the default 16 on Android) or by using onBackpressureBuffer which will allow unbounded buffer growth (until you run out of memory, or decide the latency spikes warrant a better approach).

When hopping threads (such as when using observeOn), some of the available options are:

  • blocking the calling thread (callstack blocking)
  • buffer the data between threads
  • apply flow control such as sampling, dropping, throttling, etc
  • use pull instead of push (the request(n) semantics with a pullable data source)

The RxJava community has chosen to never do callstack blocking since that would make RxJava unusable in environments with event loops. It has also decided to default to bounded buffers to cap memory and latency growth.

Considering that, what would you do differently when passing over threads via a buffer with different speed producers and consumers?

@artem-zinnatullin
Copy link
Contributor

@benjchristensen

There really is no such thing as "just pure event processing" once you are doing thread-hopping and using event loops - you must deal with buffers.

Yeah, I mean, that it will be nice to see an "Observable" whose Operators won't care about backpressure and observeOn() will just use kind of Queue for scheduled work. AFAIK @headinthebox is working on RxMobile with kind of same idea in mind. But it's off-topic.

Considering that, what would you do differently when passing over threads via a buffer with different speed producers and consumers?

Current behavior of RxAndroid's HandlerScheduler is correct, it works with respect to the Android Framework. I am not sure that this PR will improve common usage cases, instead it gives undefined execution scheduling.

And again, I am curious how often do you have observeOn(mainThread()) on the Observable which already emits data on the Main Thread?

Still no answer :) I guess, this kind of HandlerScheduler can be implemented as a separate HandlerScheduler* so everybody who needs it could use it.

Just as a reminder for everybody who interested in this issue: RxAndroid's Plugin system allows you to override AndroidSchedulers.mainThread() and provide custom implementation without changing existent code that uses AndroidSchedulers.mainThread().

@laynepenney
Copy link
Author

If it's in everyone's best interests, I can update this PR to be a different scheduler keeping the existing functionality in place.

@mttkay
Copy link
Collaborator

mttkay commented Sep 18, 2015

+1

On Thu, Sep 17, 2015 at 6:30 PM, Layne Penney notifications@github.com
wrote:

If it's in everyone's best interests, I can update this PR to be a
different scheduler keeping the existing functionality in place.


Reply to this email directly or view it on GitHub
#228 (comment).

Matthias Käppler

Engineer

Twitter: https://twitter.com/mttkay

Skype: matthias-sc

SoundCloud Ltd. | Rheinsberger Str. 76/77, 10115 Berlin, Germany | +49
(0)172 2345679

Managing Director: Alexander Ljung | Incorporated in England & Wales
with Company
No. 6343600 | Local Branch Office | AG Charlottenburg | HRB 110657B

Capture and share your music & audio on SoundCloud
http://soundcloud.com/creators

…or direct action optimization. Create sample for fastPath scheduler, with a sample use case. Update tests.
@laynepenney
Copy link
Author

I updated the pull request. I made a separate scheduler, added a sample, and updated tests. I have a TODO in there, not sure if this scheduler should have a plugin hook. I assume not, but TODO to verify.

I'm also not sure on the name. I called it FastPathHandlerScheduler, but I'm not sure that is a great name. Any other ideas? I appreciate any and all feedback.

@JakeWharton
Copy link
Member

Calling it "fast path" will (potentially erroneously) steer people to using
it without understanding why. We'd need a better name if it were to be
merged.

The Javadoc for this implementation needs an unambiguous rule as to when
this version should be preferred. Right now it only describes the
implementation which isn't really helpful to a consumer. It doesn't inform
my decision beyond making me think it might sometimes be faster but who
knows when.

Without some rule or decision tree for users, we're just creating decision
paralysis with two implementations that only differ in subtle ways.

On Sat, Sep 19, 2015, 3:09 PM Layne Penney notifications@github.com wrote:

I updated the pull request. I made a separate scheduler, added a sample,
and updated tests. I have a TODO in there, not sure if this scheduler
should have a plugin hook. I assume not, but TODO to verify.

I'm also not sure on the name. I called it FastPathHandlerScheduler, but
I'm not sure that is a great name. Any other ideas? I appreciate any and
all feedback.


Reply to this email directly or view it on GitHub
#228 (comment).

@loganj
Copy link

loganj commented Sep 19, 2015

Really pretty sure this is going to result in swallowed errors, due to out-of-order unsubscribe and downstream onError. My impl of the same idea did.

I'll try to put together a demonstration, haven't bothered to in the past. Just avoided the problem in app code.

IIRC it was an exception thrown in a subscriber's onError (or possibly onNext?) that did it.

@laynepenney
Copy link
Author

@JakeWharton Noted. I haven't yet thought of a better name, but I'll keep thinking. I guess it might be helpful in the javadoc to have some sample use cases that people have brought up, steering people to use this handler only in certain cases, when it's needed?

@loganj If you can post some code replicating something that you've seen, that would help. With that, possibly, we can come up with a way to prevent these things from occurring.

@JakeWharton
Copy link
Member

Going to close this because the PR itself has gone stale. Can we move to an issue if you still want to pursue this?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

8 participants