Skip to content

Commit

Permalink
Fix Unit Tests related to SubscribeOn
Browse files Browse the repository at this point in the history
- timeout test could be interrupted when unsubscribed
- groupBy.subscribeOn needs blocking buffer
  • Loading branch information
benjchristensen committed Feb 14, 2014
1 parent 43437fe commit 54b19be
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,14 +300,15 @@ public void call() {

});
} else {
return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 0))
.delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {

@Override
public String call(Integer t1) {
return "last group: " + t1;
}
@Override
public String call(Integer t1) {
return "last group: " + t1;
}

});
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import rx.Observable.OnSubscribe;
import rx.Observer;
import rx.Subscriber;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -334,26 +335,31 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr
public Observable<Integer> call(Integer t1) {
if (t1 == 1) {
// Force "unsubscribe" run on another thread
return Observable.create(new OnSubscribe<Integer>(){
return Observable.create(new OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.add(Subscriptions.create(new Action0(){
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
try {
// emulate "unsubscribe" is busy and finishes after timeout.onNext(1)
timeoutEmittedOne.await();
} catch (InterruptedException e) {
// if we are interrupted then we complete (as this can happen when unsubscribed)
observerCompleted.countDown();
e.printStackTrace();
}
}}));
}
}));
// force the timeout message be sent after observer.onNext(2)
try {
observerReceivedTwo.await();
} catch (InterruptedException e) {
// if we are interrupted then we complete (as this can happen when unsubscribed)
observerCompleted.countDown();
e.printStackTrace();
}
if(!subscriber.isUnsubscribed()) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(1);
timeoutEmittedOne.countDown();
}
Expand Down Expand Up @@ -386,12 +392,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable {

}).when(o).onCompleted();

final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(o);

new Thread(new Runnable() {

@Override
public void run() {
PublishSubject<Integer> source = PublishSubject.create();
source.timeout(timeoutFunc, Observable.from(3)).subscribe(o);
source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts);
source.onNext(1); // start timeout
source.onNext(2); // disable timeout
try {
Expand Down

0 comments on commit 54b19be

Please sign in to comment.