Permalink
Browse files

Change scheduler factory to care about loopers, not handlers.

A Looper is the currency of Android-specific work scheduling that we should be exposing. Handler is only a helper through which we access the Looper.
  • Loading branch information...
JakeWharton committed May 1, 2016
1 parent fa21566 commit 0bb9b4e63f5560ca238233f1c4d692d9bc8b5ba0
@@ -4,7 +4,7 @@ Android specific bindings for [RxJava](http://github.com/ReactiveX/RxJava).

This module adds the minimum classes to RxJava that make writing reactive components in Android
applications easy and hassle-free. More specifically, it provides a `Scheduler` that schedules on
the main UI thread or any given `Handler`.
the main thread or any given `Looper`.


## Communication
@@ -52,53 +52,36 @@ Futher details on building can be found on the RxJava [Getting Started][start] p
A sample project which provides runnable code examples that demonstrate uses of the classes in this
project is available in the `sample-app/` folder.

## Observing on the UI thread
## Observing on the main thread

One of the most common operations when dealing with asynchronous tasks on Android is to observe the task's
result or outcome on the main UI thread. Using vanilla Android, this would
typically be accomplished with an `AsyncTask`. With RxJava instead you would declare your `Observable`
to be observed on the main thread:
result or outcome on the main thread. Using vanilla Android, this would typically be accomplished with an
`AsyncTask`. With RxJava instead you would declare your `Observable` to be observed on the main thread:

```java
public class ReactiveFragment extends Fragment {
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
}
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
```

This will execute the Observable on a new thread, and emit results through `onNext` on the main UI thread.
This will execute the `Observable` on a new thread, and emit results through `onNext` on the main thread.

## Observing on arbitrary threads
The previous sample is merely a specialization of a more general concept, namely binding asynchronous
communication to an Android message loop using the `Handler` class. In order to observe an `Observable`
on an arbitrary thread, create a `Handler` bound to that thread and use the `HandlerScheduler.from`
scheduler:
## Observing on arbitrary loopers

The previous sample is merely a specialization of a more general concept: binding asynchronous
communication to an Android message loop, or `Looper`. In order to observe an `Observable` on an arbitrary
`Looper`, create an associated `Scheduler` by calling `AndroidSchedulers.from`:

```java
new Thread(new Runnable() {
@Override
public void run() {
Looper.prepare();
final Handler handler = new Handler(); // bound to this thread
Observable.just("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(HandlerScheduler.from(handler))
.subscribe(/* an Observer */)
// perform work, ...
Looper.loop();
}
}, "custom-thread-1").start();
Looper backgroundLooper = // ...
Observable.just("one", "two", "three", "four", "five")
.observeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe(/* an Observer */)
```

This will execute the Observable on a new thread and emit results through `onNext` on "custom-thread-1".
(This example is contrived since you could as well call `observeOn(Schedulers.currentThread())` but it
shall suffice to illustrate the idea.)
This will execute the Observable on a new thread and emit results through `onNext` on whatever thread is
running `backgroundLooper`.


## Bugs and Feedback
@@ -13,9 +13,7 @@
*/
package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;

import rx.Scheduler;
import rx.android.plugins.RxAndroidPlugins;

@@ -28,8 +26,7 @@ private AndroidSchedulers() {
// See https://github.com/ReactiveX/RxAndroid/issues/238
// https://en.wikipedia.org/wiki/Initialization-on-demand_holder_idiom
private static class MainThreadSchedulerHolder {
static final Scheduler MAIN_THREAD_SCHEDULER =
new HandlerScheduler(new Handler(Looper.getMainLooper()));
static final Scheduler MAIN_THREAD_SCHEDULER = new LooperScheduler(Looper.getMainLooper());
}

/** A {@link Scheduler} which executes actions on the Android UI thread. */
@@ -38,4 +35,10 @@ public static Scheduler mainThread() {
RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
return scheduler != null ? scheduler : MainThreadSchedulerHolder.MAIN_THREAD_SCHEDULER;
}

/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new LooperScheduler(looper);
}
}
@@ -13,122 +13,29 @@
*/
package rx.android.schedulers;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.android.plugins.RxAndroidPlugins;
import rx.android.plugins.RxAndroidSchedulersHook;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;
import android.os.Handler;
import android.os.Message;
import android.os.Looper;
import rx.Scheduler;

/** A {@link Scheduler} backed by a {@link Handler}. */
public final class HandlerScheduler extends Scheduler {
/** Create a {@link Scheduler} which uses {@code handler} to execute actions. */
/**
* A {@link Scheduler} backed by a {@link Handler}.
*
* @deprecated Use {@link AndroidSchedulers#from(Looper)}.
*/
@Deprecated
public final class HandlerScheduler extends LooperScheduler {
/**
* Create a {@link Scheduler} which uses {@code handler} to execute actions.
*
* @deprecated Use {@link AndroidSchedulers#from(Looper)}.
*/
@Deprecated
public static HandlerScheduler from(Handler handler) {
if (handler == null) throw new NullPointerException("handler == null");
return new HandlerScheduler(handler);
}

private final Handler handler;

HandlerScheduler(Handler handler) {
this.handler = handler;
}

@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}

static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;

HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}

@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

action = hook.onSchedule(action);

ScheduledAction scheduledAction = new ScheduledAction(action, handler);

Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.

handler.sendMessageDelayed(message, unit.toMillis(delayTime));

if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}

return scheduledAction;
}

@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}

static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;

ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}

@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}

@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}

@Override public boolean isUnsubscribed() {
return unsubscribed;
}
private HandlerScheduler(Handler handler) {
super(handler);
}
}
@@ -0,0 +1,131 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import android.os.Message;
import java.util.concurrent.TimeUnit;
import rx.Scheduler;
import rx.Subscription;
import rx.android.plugins.RxAndroidPlugins;
import rx.android.plugins.RxAndroidSchedulersHook;
import rx.exceptions.OnErrorNotImplementedException;
import rx.functions.Action0;
import rx.plugins.RxJavaPlugins;
import rx.subscriptions.Subscriptions;

class LooperScheduler extends Scheduler {
private final Handler handler;

LooperScheduler(Looper looper) {
handler = new Handler(looper);
}

LooperScheduler(Handler handler) {
this.handler = handler;
}

@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}

static class HandlerWorker extends Worker {
private final Handler handler;
private final RxAndroidSchedulersHook hook;
private volatile boolean unsubscribed;

HandlerWorker(Handler handler) {
this.handler = handler;
this.hook = RxAndroidPlugins.getInstance().getSchedulersHook();
}

@Override
public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacksAndMessages(this /* token */);
}

@Override
public boolean isUnsubscribed() {
return unsubscribed;
}

@Override
public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
if (unsubscribed) {
return Subscriptions.unsubscribed();
}

action = hook.onSchedule(action);

ScheduledAction scheduledAction = new ScheduledAction(action, handler);

Message message = Message.obtain(handler, scheduledAction);
message.obj = this; // Used as token for unsubscription operation.

handler.sendMessageDelayed(message, unit.toMillis(delayTime));

if (unsubscribed) {
handler.removeCallbacks(scheduledAction);
return Subscriptions.unsubscribed();
}

return scheduledAction;
}

@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}
}

static final class ScheduledAction implements Runnable, Subscription {
private final Action0 action;
private final Handler handler;
private volatile boolean unsubscribed;

ScheduledAction(Action0 action, Handler handler) {
this.action = action;
this.handler = handler;
}

@Override public void run() {
try {
action.call();
} catch (Throwable e) {
// nothing to do but print a System error as this is fatal and there is nowhere else to throw this
IllegalStateException ie;
if (e instanceof OnErrorNotImplementedException) {
ie = new IllegalStateException("Exception thrown on Scheduler.Worker thread. Add `onError` handling.", e);
} else {
ie = new IllegalStateException("Fatal Exception thrown on Scheduler.Worker thread.", e);
}
RxJavaPlugins.getInstance().getErrorHandler().handleError(ie);
Thread thread = Thread.currentThread();
thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
}
}

@Override public void unsubscribe() {
unsubscribed = true;
handler.removeCallbacks(this);
}

@Override public boolean isUnsubscribed() {
return unsubscribed;
}
}
}
Oops, something went wrong.

0 comments on commit 0bb9b4e

Please sign in to comment.