Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
move subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn Feldman committed Oct 6, 2015
1 parent 17586ec commit 7dceb56
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 4 deletions.
Expand Up @@ -547,12 +547,11 @@ public void call(final Subscriber<? super List<QueueMessage>> subscriber) {
//ack each message, but only if we didn't error.
ack(message);
})
)
.subscribeOn(Schedulers.newThread());
);

//start in the background

final Subscription subscription = consumer.subscribe();
final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe();

subscriptions.add(subscription);
}
Expand Down
Expand Up @@ -22,6 +22,7 @@

import rx.Observable;
import rx.Observer;
import rx.schedulers.Schedulers;


/**
Expand All @@ -31,13 +32,21 @@ public class ExceptionBehaviorTest {

//this test shows toBlocking re-throws exceptions correctly
@Test( expected = TestException.class )
public void throwOnBlocking() {
public void throwOnBlockingFirst() {

Observable.range( 0, 1 ).map( integer -> {
throw new TestException( "I throw and exception" );
} ).toBlocking().first();
}

@Test( expected = TestException.class )
public void throwOnBlockingLast() {

Observable.range( 0, 1 ).map( integer -> {
throw new TestException( "I throw and exception" );
} ).toBlocking().last();
}

//
// /**
// * This shows that no re-throw happens on subscribe. This is as designed, but not as expected
Expand Down Expand Up @@ -67,6 +76,27 @@ public void throwOnSubscribeObservable() {
exceptionObserver.checkResult();
}

/**
* Tests working with observers
*/
@Test( expected = TestException.class )
public void throwOnSubscribeObservableNewThread() throws Exception {

final ReThrowObserver exceptionObserver = new ReThrowObserver();

Observable.range( 0, 1 ).map(integer -> {
throw new TestException("I throw and exception");
})
.doOnError(t -> exceptionObserver.onError(t))
.subscribeOn(Schedulers.newThread())
.subscribe(exceptionObserver);

for(int i =0; i<5; i++) {
exceptionObserver.checkResult();
Thread.sleep(200);
}
}


private static final class TestException extends RuntimeException {

Expand Down

0 comments on commit 7dceb56

Please sign in to comment.