Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handler scheduler fast path #228

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,27 @@ private AndroidSchedulers() {
throw new AssertionError("No instances");
}

private static final Handler MAIN_HANDLER = new Handler(Looper.getMainLooper());
private static final Scheduler MAIN_THREAD_SCHEDULER =
new HandlerScheduler(new Handler(Looper.getMainLooper()));
new HandlerScheduler(MAIN_HANDLER);
private static final Scheduler MAIN_THREAD_FAST_PATH_SCHEDULER =
new FastPathHandlerScheduler(MAIN_HANDLER);

/** A {@link Scheduler} which executes actions on the Android UI thread. */
public static Scheduler mainThread() {
Scheduler scheduler =
RxAndroidPlugins.getInstance().getSchedulersHook().getMainThreadScheduler();
return scheduler != null ? scheduler : MAIN_THREAD_SCHEDULER;
}

// TODO: Do we need a plugin hook for this one?
// Also, should it be created with the scheduler returned from mainThread()?
/**
* A {@link rx.Scheduler} which executes actions on the Android UI thread. This scheduler could
* possibly operate synchronously if the action is immediate and is scheduled from the target
* thread.
*/
public static Scheduler mainThreadFastPath() {
return MAIN_THREAD_FAST_PATH_SCHEDULER;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.android.plugins.RxAndroidPlugins;
import rx.functions.Action0;
import rx.subscriptions.Subscriptions;

/**
* A {@link Scheduler} backed by a {@link Handler}. This scheduler is optimized to call the action
* directly if immediate and scheduled from the target thread. Because of this, it could operate
* synchronously.
*/
public final class FastPathHandlerScheduler extends Scheduler {
private final Scheduler actual;
private final Handler handler;

/** Create a {@link FastPathHandlerScheduler} which uses {@code handler} to execute actions. */
public static FastPathHandlerScheduler from(Handler handler) {
return new FastPathHandlerScheduler(handler);
}

FastPathHandlerScheduler(Handler handler) {
this.actual = HandlerScheduler.from(handler);
this.handler = handler;
}

@Override public Worker createWorker() {
return new HandlerWorker(actual.createWorker(), handler);
}

private static final class HandlerWorker extends Worker {
private final Worker actual;
private final Handler handler;

private HandlerWorker(Worker actual, Handler handler) {
this.actual = actual;
this.handler = handler;
}

@Override public Subscription schedule(Action0 action) {
return schedule(action, 0L, TimeUnit.MILLISECONDS);
}

@Override public Subscription schedule(Action0 action, long delayTime, TimeUnit unit) {
// Check if unsubscribed
if (actual.isUnsubscribed()) {
return Subscriptions.unsubscribed();
}

// Fast path if action is immediate and we are on the target thread
if (delayTime <= 0L && Looper.myLooper() == handler.getLooper()) {
// Schedulers hook on action, only for fast path. If actual worker handles, this is done already.
action = RxAndroidPlugins.getInstance().getSchedulersHook().onSchedule(action);
action.call();
return Subscriptions.unsubscribed();
}

return actual.schedule(action, delayTime, unit);
}

@Override public void unsubscribe() {
actual.unsubscribe();
}

@Override public boolean isUnsubscribed() {
return actual.isUnsubscribed();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package rx.android.schedulers;

import android.os.Handler;
import android.os.Looper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -38,15 +39,18 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@RunWith(RobolectricTestRunner.class)
@Config(manifest=Config.NONE)
Expand Down Expand Up @@ -220,4 +224,48 @@ public void call() {

assertTrue(neverCalled.get());
}

@Test
public void shouldPerformFastPathForImmediateActionIsAlreadyOnTargetThread() {
// Explicitly create handler with same looper as current thread
Looper myLooper = Looper.myLooper();
assertNotNull(myLooper);
Handler handler = spy(new Handler(myLooper));
Observable.OnSubscribe<Integer> onSubscribe = mock(Observable.OnSubscribe.class);

Subscription subscription = Observable.create(onSubscribe)
.subscribeOn(FastPathHandlerScheduler.from(handler))
.subscribe();

// Verify onSubscribe is called
verify(onSubscribe).call(any(Subscriber.class));

subscription.unsubscribe();

// Verify action was called directly, and not through handler
verify(handler, never()).postDelayed(any(Runnable.class), anyLong());
verify(handler, never()).removeCallbacks(any(Runnable.class));
}

@Test
public void shouldNotPerformFastPathForDelayedAction() {
// Explicitly create handler with same looper as current thread
Looper myLooper = Looper.myLooper();
assertNotNull(myLooper);
Handler handler = spy(new Handler(myLooper));
@SuppressWarnings("unchecked")
Action0 action = mock(Action0.class);

Scheduler scheduler = FastPathHandlerScheduler.from(handler);
Worker inner = scheduler.createWorker();
inner.schedule(action, 1, SECONDS);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(1000L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call();
}
}
91 changes: 91 additions & 0 deletions sample-app/src/main/java/rx/android/samples/MainActivity.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,29 @@
import android.os.Bundle;
import android.os.Handler;
import android.os.HandlerThread;
import android.os.Looper;
import android.util.Log;
import android.view.View;
import android.widget.Toast;

import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Scheduler;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.android.schedulers.HandlerScheduler;
import rx.exceptions.OnErrorThrowable;
import rx.functions.Action0;
import rx.functions.Func0;
import rx.schedulers.Schedulers;

import static android.os.Process.THREAD_PRIORITY_BACKGROUND;

public class MainActivity extends Activity {
private static final String TAG = "RxAndroidSamples";

private Handler backgroundHandler;
private int counter;

@Override protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
Expand All @@ -35,6 +42,13 @@ public class MainActivity extends Activity {
onRunSchedulerExampleButtonClicked();
}
});
findViewById(R.id.fast_path_example).setOnClickListener(new View.OnClickListener() {
@Override public void onClick(View v) {
int count = counter++;
onRunFastPathExample(count, false);
onRunFastPathExample(count, true);
}
});
}

void onRunSchedulerExampleButtonClicked() {
Expand All @@ -58,6 +72,33 @@ void onRunSchedulerExampleButtonClicked() {
});
}

void onRunFastPathExample(final int count, final boolean fastPathEnabled) {
final String name = count + "-" + (fastPathEnabled ? "Fast path" : "Non-fast path");
final Scheduler observeOn = fastPathEnabled
? AndroidSchedulers.mainThreadFastPath()
: AndroidSchedulers.mainThread();
fastPathSampleObservable()
// Run on a background thread
.subscribeOn(HandlerScheduler.from(backgroundHandler))
// Be notified on the main thread
.observeOn(observeOn)
.subscribe(new Subscriber<String>() {
@Override public void onCompleted() {
Log.d(TAG, String.format("onCompleted(%s)", name));
}

@Override public void onError(Throwable e) {
Log.e(TAG, "onError()", e);
}

@Override public void onNext(String string) {
string = String.format("%s posted at: %s, now is: %d", name, string,
System.currentTimeMillis());
Log.d(TAG, "onNext(" + string + ")");
}
});
}

static Observable<String> sampleObservable() {
return Observable.defer(new Func0<Observable<String>>() {
@Override public Observable<String> call() {
Expand All @@ -72,9 +113,59 @@ static Observable<String> sampleObservable() {
});
}

static Observable<String> fastPathSampleObservable() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(final Subscriber<? super String> subscriber) {
new SomeThirdPartyNetworkCall(new Callback<String>() {
@Override public void onResult(String s) {
// Calling the subscriber from the main thread
subscriber.onNext(s);
subscriber.onCompleted();
}
}).execute();
}
});
}

static class BackgroundThread extends HandlerThread {
BackgroundThread() {
super("SchedulerSample-BackgroundThread", THREAD_PRIORITY_BACKGROUND);
}
}

interface Callback<T> {
void onResult(T t);
}

static class SomeThirdPartyNetworkCall {
private final Handler handler = new Handler(Looper.getMainLooper());
private final Callback<String> callback;

private SomeThirdPartyNetworkCall(Callback<String> callback) {
this.callback = callback;
}

void execute() {
Schedulers.io().createWorker().schedule(new Action0() {
@Override public void call() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
handler.post(new Runnable() {
@Override public void run() {
// Simulate taking a bit of time on handler thread
try {
Thread.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
callback.onResult(Long.toString(System.currentTimeMillis()));
}
});
}
});
}
}
}
12 changes: 9 additions & 3 deletions sample-app/src/main/res/layout/main_activity.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?xml version="1.0" encoding="utf-8"?>

<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
Expand All @@ -9,11 +8,18 @@
android:paddingRight="@dimen/activity_horizontal_margin"
android:paddingLeft="@dimen/activity_horizontal_margin">

<TextView
<Button
android:id="@+id/scheduler_example"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="Run Schedulers Example"
/>

</LinearLayout>
<Button
android:id="@+id/fast_path_example"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:text="Run Fast Path Example"
/>

</LinearLayout>