Skip to content

Commit

Permalink
Merge pull request #1324 from benjchristensen/trampoline-schedule-uns…
Browse files Browse the repository at this point in the history
…ubscribe

TrampolineScheduler & Unsubscribe
  • Loading branch information
benjchristensen committed Jun 4, 2014
2 parents 7de1f9b + 7058a76 commit fb1a806
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,6 @@ private Subscription enqueue(Action0 action, long execTime) {

if (exec) {
while (!queue.isEmpty()) {
if (innerSubscription.isUnsubscribed()) {
return Subscriptions.empty();
}
queue.poll().action.call();
}

Expand All @@ -108,7 +105,6 @@ public void call() {

@Override
public void unsubscribe() {
QUEUE.set(null); // this assumes we are calling unsubscribe from the same thread
innerSubscription.unsubscribe();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,18 @@
*/
package rx.schedulers;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Arrays;

import org.junit.Test;

import rx.Observable;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;

Expand Down Expand Up @@ -55,4 +61,62 @@ public void call(String t) {
}
});
}

@Test
public void testNestedTrampolineWithUnsubscribe() {
final ArrayList<String> workDone = new ArrayList<String>();
Worker worker = Schedulers.trampoline().createWorker();
worker.schedule(new Action0() {

@Override
public void call() {
doWorkOnNewTrampoline("A", workDone);
}

});

final Worker worker2 = Schedulers.trampoline().createWorker();
worker2.schedule(new Action0() {

@Override
public void call() {
doWorkOnNewTrampoline("B", workDone);
// we unsubscribe worker2 ... it should not affect work scheduled on a separate Trampline.Worker
worker2.unsubscribe();
}

});

assertEquals(6, workDone.size());
assertEquals(Arrays.asList("A.1", "A.B.1", "A.B.2", "B.1", "B.B.1", "B.B.2"), workDone);
}

private static void doWorkOnNewTrampoline(final String key, final ArrayList<String> workDone) {
Worker worker = Schedulers.trampoline().createWorker();
worker.schedule(new Action0() {

@Override
public void call() {
String msg = key + ".1";
workDone.add(msg);
System.out.println(msg);
Worker worker3 = Schedulers.trampoline().createWorker();
worker3.schedule(createPrintAction(key + ".B.1", workDone));
worker3.schedule(createPrintAction(key + ".B.2", workDone));
}

});
}

private static Action0 createPrintAction(final String message, final ArrayList<String> workDone) {
return new Action0() {

@Override
public void call() {
System.out.println(message);
workDone.add(message);
}

};
}
}

0 comments on commit fb1a806

Please sign in to comment.