Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scheduler Simplification #1047

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
package rx.lang.scala

import scala.concurrent.duration.Duration
import rx.functions.Action1
import rx.functions.Action0
import rx.lang.scala.schedulers._
import scala.concurrent.duration
import rx.lang.scala.JavaConversions._
Expand All @@ -42,42 +42,8 @@ trait Scheduler {
*/
def now: Long = this.asJavaScheduler.now()

/**
* Schedules a cancelable action to be executed.
*
* @param action Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
def schedule(action: Inner => Unit): Subscription = this.asJavaScheduler.schedule(action)
def createInner: Inner = this.asJavaScheduler.createInner()

/**
* Schedules a cancelable action to be executed periodically.
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
*
* @param action
* The action to execute periodically.
* @param initialDelay
* Time to wait before executing the action for the first time.
* @param period
* The time interval to wait each time in between executing the action.
* @return A subscription to be able to unsubscribe from action.
*/
def schedulePeriodically(action: Inner => Unit, initialDelay: Duration, period: Duration): Subscription =
this.asJavaScheduler.schedulePeriodically (
new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
},
initialDelay.toNanos,
period.toNanos,
duration.NANOSECONDS
)

def scheduleRec(work: (=>Unit)=>Unit): Subscription = {
Subscription(asJavaScheduler.schedule(new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = work{ inner.schedule(this) }
}))
}
}

object Inner {
Expand All @@ -90,20 +56,20 @@ trait Inner extends Subscription {
/**
* Schedules a cancelable action to be executed in delayTime.
*/
def schedule(action: Inner => Unit, delayTime: Duration): Unit =
def schedule(action: Unit => Unit, delayTime: Duration): Subscription =
this.asJavaInner.schedule(
new Action1[rx.Scheduler.Inner] {
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
new Action0 {
override def call(): Unit = action()
},
delayTime.length,
delayTime.unit)

/**
* Schedules a cancelable action to be executed immediately.
*/
def schedule(action: Inner=>Unit): Unit = this.asJavaInner.schedule(
new Action1[rx.Scheduler.Inner]{
override def call(inner: rx.Scheduler.Inner): Unit = action(javaInnerToScalaInner(inner))
def schedule(action: Unit => Unit): Subscription = this.asJavaInner.schedule(
new Action0 {
override def call(): Unit = action()
}
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
import android.os.Handler;

/**
Expand All @@ -40,43 +42,15 @@ public HandlerThreadScheduler(Handler handler) {
this.handler = handler;
}

/**
* Calls {@link HandlerThreadScheduler#schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)} with a delay of zero milliseconds.
*
* See {@link #schedule(Object, rx.functions.Func2, long, java.util.concurrent.TimeUnit)}
*/
@Override
public Subscription schedule(Action1<Inner> action) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action);
return inner;
}

/**
* Calls {@link Handler#postDelayed(Runnable, long)} with a runnable that executes the given action.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param unit
* Time unit of the delay time.
* @return A Subscription from which one can unsubscribe from.
*/
@Override
public Subscription schedule(Action1<Inner> action, long delayTime, TimeUnit unit) {
InnerHandlerThreadScheduler inner = new InnerHandlerThreadScheduler(handler);
inner.schedule(action, delayTime, unit);
return inner;
public Inner createInner() {
return new InnerHandlerThreadScheduler(handler);
}

private static class InnerHandlerThreadScheduler extends Inner {

private final Handler handler;
private BooleanSubscription innerSubscription = new BooleanSubscription();
private Inner _inner = this;

public InnerHandlerThreadScheduler(Handler handler) {
this.handler = handler;
Expand All @@ -93,31 +67,31 @@ public boolean isUnsubscribed() {
}

@Override
public void schedule(final Action1<Inner> action, long delayTime, TimeUnit unit) {
handler.postDelayed(new Runnable() {
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
final Runnable runnable = new Runnable() {
@Override
public void run() {
if (_inner.isUnsubscribed()) {
if (isUnsubscribed()) {
return;
}
action.call(_inner);
action.call();
}
}, unit.toMillis(delayTime));
}

@Override
public void schedule(final Action1<Inner> action) {
handler.postDelayed(new Runnable() {
};
handler.postDelayed(runnable, unit.toMillis(delayTime));
return Subscriptions.create(new Action0() {

@Override
public void run() {
if (_inner.isUnsubscribed()) {
return;
}
action.call(_inner);
public void call() {
handler.removeCallbacks(runnable);

}

});
}

}, 0L);
@Override
public Subscription schedule(final Action0 action) {
return schedule(action, 0, TimeUnit.MILLISECONDS);
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ public void call() {
if (Looper.getMainLooper() == Looper.myLooper()) {
unsubscribe.call();
} else {
AndroidSchedulers.mainThread().schedule(new Action1<Inner>() {
final Inner inner = AndroidSchedulers.mainThread().createInner();
inner.schedule(new Action0() {
@Override
public void call(Inner inner) {
public void call() {
unsubscribe.call();
inner.unsubscribe();
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.functions.Action0;
import rx.functions.Action1;
import android.os.Handler;

Expand All @@ -41,35 +42,37 @@ public class HandlerThreadSchedulerTest {
public void shouldScheduleImmediateActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action0 action = mock(Action0.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action);
Inner inner = scheduler.createInner();
inner.schedule(action);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(0L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call();
}

@Test
public void shouldScheduleDelayedActionOnHandlerThread() {
final Handler handler = mock(Handler.class);
@SuppressWarnings("unchecked")
final Action1<Inner> action = mock(Action1.class);
final Action0 action = mock(Action0.class);

Scheduler scheduler = new HandlerThreadScheduler(handler);
scheduler.schedule(action, 1L, TimeUnit.SECONDS);
Inner inner = scheduler.createInner();
inner.schedule(action, 1L, TimeUnit.SECONDS);

// verify that we post to the given Handler
ArgumentCaptor<Runnable> runnable = ArgumentCaptor.forClass(Runnable.class);
verify(handler).postDelayed(runnable.capture(), eq(1000L));

// verify that the given handler delegates to our action
runnable.getValue().run();
verify(action).call(any(Inner.class));
verify(action).call();
}
}
Loading