From 7dceb563eeb836a7dd09280b07af78c69ebb1793 Mon Sep 17 00:00:00 2001 From: Shawn Feldman Date: Tue, 6 Oct 2015 08:39:21 -0600 Subject: [PATCH] move subscriber --- .../asyncevents/AmazonAsyncEventService.java | 5 ++- .../core/rx/ExceptionBehaviorTest.java | 32 ++++++++++++++++++- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index e16de05824..14d37b55af 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -547,12 +547,11 @@ public void call(final Subscriber> subscriber) { //ack each message, but only if we didn't error. ack(message); }) - ) - .subscribeOn(Schedulers.newThread()); + ); //start in the background - final Subscription subscription = consumer.subscribe(); + final Subscription subscription = consumer.subscribeOn(Schedulers.newThread()).subscribe(); subscriptions.add(subscription); } diff --git a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java index be10d0aaaf..8e4f4c4c6e 100644 --- a/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java +++ b/stack/corepersistence/common/src/test/java/org/apache/usergrid/persistence/core/rx/ExceptionBehaviorTest.java @@ -22,6 +22,7 @@ import rx.Observable; import rx.Observer; +import rx.schedulers.Schedulers; /** @@ -31,13 +32,21 @@ public class ExceptionBehaviorTest { //this test shows toBlocking re-throws exceptions correctly @Test( expected = TestException.class ) - public void throwOnBlocking() { + public void throwOnBlockingFirst() { Observable.range( 0, 1 ).map( integer -> { throw new TestException( "I throw and exception" ); } ).toBlocking().first(); } + @Test( expected = TestException.class ) + public void throwOnBlockingLast() { + + Observable.range( 0, 1 ).map( integer -> { + throw new TestException( "I throw and exception" ); + } ).toBlocking().last(); + } + // // /** // * This shows that no re-throw happens on subscribe. This is as designed, but not as expected @@ -67,6 +76,27 @@ public void throwOnSubscribeObservable() { exceptionObserver.checkResult(); } + /** + * Tests working with observers + */ + @Test( expected = TestException.class ) + public void throwOnSubscribeObservableNewThread() throws Exception { + + final ReThrowObserver exceptionObserver = new ReThrowObserver(); + + Observable.range( 0, 1 ).map(integer -> { + throw new TestException("I throw and exception"); + }) + .doOnError(t -> exceptionObserver.onError(t)) + .subscribeOn(Schedulers.newThread()) + .subscribe(exceptionObserver); + + for(int i =0; i<5; i++) { + exceptionObserver.checkResult(); + Thread.sleep(200); + } + } + private static final class TestException extends RuntimeException {