From 2c915ad03757349d6b559dbab8233e3270f44f16 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 21 Aug 2015 18:30:27 +0200 Subject: [PATCH] Blocking subscribe methods for convenience --- .../rx/observables/BlockingObservable.java | 245 +++++++++++++++++- .../observables/BlockingObservableTest.java | 164 ++++++++++-- 2 files changed, 376 insertions(+), 33 deletions(-) diff --git a/src/main/java/rx/observables/BlockingObservable.java b/src/main/java/rx/observables/BlockingObservable.java index 7eced68981c..de5583af526 100644 --- a/src/main/java/rx/observables/BlockingObservable.java +++ b/src/main/java/rx/observables/BlockingObservable.java @@ -15,23 +15,19 @@ */ package rx.observables; -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import rx.*; import rx.Observable; -import rx.Subscriber; -import rx.Subscription; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.internal.operators.BlockingOperatorLatest; -import rx.internal.operators.BlockingOperatorMostRecent; -import rx.internal.operators.BlockingOperatorNext; -import rx.internal.operators.BlockingOperatorToFuture; -import rx.internal.operators.BlockingOperatorToIterator; +import rx.Observer; +import rx.annotations.Experimental; +import rx.exceptions.OnErrorNotImplementedException; +import rx.functions.*; +import rx.internal.operators.*; import rx.internal.util.UtilityFunctions; +import rx.subscriptions.Subscriptions; /** * {@code BlockingObservable} is a variety of {@link Observable} that provides blocking operators. It can be @@ -477,4 +473,227 @@ private void awaitForComplete(CountDownLatch latch, Subscription subscription) { throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); } } + + /** + * Runs the source observable to a terminal event, ignoring any values and rethrowing any exception. + */ + @Experimental + public void run() { + final CountDownLatch cdl = new CountDownLatch(1); + final Throwable[] error = { null }; + Subscription s = o.subscribe(new Subscriber() { + @Override + public void onNext(T t) { + + } + @Override + public void onError(Throwable e) { + error[0] = e; + cdl.countDown(); + } + + @Override + public void onCompleted() { + cdl.countDown(); + } + }); + + awaitForComplete(cdl, s); + Throwable e = error[0]; + if (e != null) { + if (e instanceof RuntimeException) { + throw (RuntimeException)e; + } else { + throw new RuntimeException(e); + } + } + } + + /** + * Subscribes to the source and calls back the Observer methods on the current thread. + * @param observer the observer to call event methods on + */ + @Experimental + public void subscribe(Observer observer) { + final NotificationLite nl = NotificationLite.instance(); + final BlockingQueue queue = new LinkedBlockingQueue(); + + Subscription s = o.subscribe(new Subscriber() { + @Override + public void onNext(T t) { + queue.offer(nl.next(t)); + } + @Override + public void onError(Throwable e) { + queue.offer(nl.error(e)); + } + @Override + public void onCompleted() { + queue.offer(nl.completed()); + } + }); + + try { + for (;;) { + Object o = queue.poll(); + if (o == null) { + o = queue.take(); + } + if (nl.accept(observer, o)) { + return; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + observer.onError(e); + } finally { + s.unsubscribe(); + } + } + + /** Constant to indicate the onStart method should be called. */ + private static final Object ON_START = new Object(); + + /** Constant indicating the setProducer method should be called. */ + private static final Object SET_PRODUCER = new Object(); + + /** Indicates an unsubscripton happened */ + private static final Object UNSUBSCRIBE = new Object(); + + /** + * Subscribes to the source and calls the Subscriber methods on the current thread. + *

+ * The unsubscription and backpressure is composed through. + * @param subscriber the subscriber to forward events and calls to in the current thread + */ + @Experimental + public void subscribe(Subscriber subscriber) { + final NotificationLite nl = NotificationLite.instance(); + final BlockingQueue queue = new LinkedBlockingQueue(); + final Producer[] theProducer = { null }; + + Subscriber s = new Subscriber() { + @Override + public void onNext(T t) { + queue.offer(nl.next(t)); + } + @Override + public void onError(Throwable e) { + queue.offer(nl.error(e)); + } + @Override + public void onCompleted() { + queue.offer(nl.completed()); + } + + @Override + public void setProducer(Producer p) { + theProducer[0] = p; + queue.offer(SET_PRODUCER); + } + + @Override + public void onStart() { + queue.offer(ON_START); + } + }; + + subscriber.add(s); + subscriber.add(Subscriptions.create(new Action0() { + @Override + public void call() { + queue.offer(UNSUBSCRIBE); + } + })); + + o.subscribe(s); + + try { + for (;;) { + if (subscriber.isUnsubscribed()) { + break; + } + Object o = queue.poll(); + if (o == null) { + o = queue.take(); + } + if (subscriber.isUnsubscribed() || o == UNSUBSCRIBE) { + break; + } + if (o == ON_START) { + subscriber.onStart(); + } else + if (o == SET_PRODUCER) { + subscriber.setProducer(theProducer[0]); + } else + if (nl.accept(subscriber, o)) { + return; + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + subscriber.onError(e); + } finally { + s.unsubscribe(); + } + } + + /** + * Runs the source observable to a terminal event, ignoring any values and rethrowing any exception. + */ + @Experimental + public void subscribe() { + run(); + } + + /** + * Subscribes to the source and calls the given action on the current thread and rethrows any exception wrapped + * into OnErrorNotImplementedException. + * @param onNext the callback action for each source value + */ + @Experimental + public void subscribe(final Action1 onNext) { + subscribe(onNext, new Action1() { + @Override + public void call(Throwable t) { + throw new OnErrorNotImplementedException(t); + } + }, Actions.empty()); + } + + /** + * Subscribes to the source and calls the given actions on the current thread. + * @param onNext the callback action for each source value + * @param onError the callback action for an error event + */ + @Experimental + public void subscribe(final Action1 onNext, final Action1 onError) { + subscribe(onNext, onError, Actions.empty()); + } + + /** + * Subscribes to the source and calls the given actions on the current thread. + * @param onNext the callback action for each source value + * @param onError the callback action for an error event + * @param onCompleted the callback action for the completion event. + */ + @Experimental + public void subscribe(final Action1 onNext, final Action1 onError, final Action0 onCompleted) { + subscribe(new Observer() { + @Override + public void onNext(T t) { + onNext.call(t); + } + + @Override + public void onError(Throwable e) { + onError.call(e); + } + + @Override + public void onCompleted() { + onCompleted.call(); + } + }); + } } diff --git a/src/test/java/rx/observables/BlockingObservableTest.java b/src/test/java/rx/observables/BlockingObservableTest.java index 4328461d804..72963f76aed 100644 --- a/src/test/java/rx/observables/BlockingObservableTest.java +++ b/src/test/java/rx/observables/BlockingObservableTest.java @@ -15,34 +15,25 @@ */ package rx.observables; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.util.Iterator; -import java.util.NoSuchElementException; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import org.junit.*; +import org.mockito.*; + import rx.Observable; import rx.Observable.OnSubscribe; import rx.Subscriber; -import rx.exceptions.TestException; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.exceptions.*; +import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.Schedulers; import rx.subscriptions.Subscriptions; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - public class BlockingObservableTest { @Mock @@ -641,4 +632,137 @@ private InterruptedException getInterruptedExceptionOrNull() { } + @Test + public void testRun() { + Observable.just(1).observeOn(Schedulers.computation()).toBlocking().run(); + } + + @Test(expected = TestException.class) + public void testRunException() { + Observable.error(new TestException()).observeOn(Schedulers.computation()).toBlocking().run(); + } + + @Test + public void testRunIOException() { + try { + Observable.error(new IOException()).observeOn(Schedulers.computation()).toBlocking().run(); + fail("No exception thrown"); + } catch (RuntimeException ex) { + if (ex.getCause() instanceof IOException) { + return; + } + fail("Bad exception type: " + ex + ", " + ex.getCause()); + } + } + + @Test + public void testSubscriberBackpressure() { + TestSubscriber ts = new TestSubscriber() { + @Override + public void onStart() { + request(2); + } + + @Override + public void onNext(Integer t) { + super.onNext(t); + unsubscribe(); + } + }; + + Observable.range(1, 10).observeOn(Schedulers.computation()).toBlocking().subscribe(ts); + + ts.assertNoErrors(); + ts.assertNotCompleted(); + ts.assertValue(1); + } + + @Test(expected = OnErrorNotImplementedException.class) + public void testOnErrorNotImplemented() { + Observable.error(new TestException()).observeOn(Schedulers.computation()).toBlocking().subscribe(Actions.empty()); + } + + @Test + public void testSubscribeCallback1() { + final boolean[] valueReceived = { false }; + Observable.just(1).observeOn(Schedulers.computation()).toBlocking().subscribe(new Action1() { + @Override + public void call(Integer t) { + valueReceived[0] = true; + assertEquals((Integer)1, t); + } + }); + + assertTrue(valueReceived[0]); + } + + @Test + public void testSubscribeCallback2() { + final boolean[] received = { false }; + Observable.error(new TestException()).observeOn(Schedulers.computation()).toBlocking() + .subscribe(new Action1() { + @Override + public void call(Object t) { + fail("Value emitted: " + t); + } + }, new Action1() { + @Override + public void call(Throwable t) { + received[0] = true; + assertEquals(TestException.class, t.getClass()); + } + }); + + assertTrue(received[0]); + } + + @Test + public void testSubscribeCallback3() { + final boolean[] received = { false, false }; + Observable.just(1).observeOn(Schedulers.computation()).toBlocking().subscribe(new Action1() { + @Override + public void call(Integer t) { + received[0] = true; + assertEquals((Integer)1, t); + } + }, new Action1() { + @Override + public void call(Throwable t) { + t.printStackTrace(); + fail("Exception received!"); + } + }, new Action0() { + @Override + public void call() { + received[1] = true; + } + }); + + assertTrue(received[0]); + assertTrue(received[1]); + } + @Test + public void testSubscribeCallback3Error() { + final TestSubscriber ts = TestSubscriber.create(); + Observable.error(new TestException()).observeOn(Schedulers.computation()).toBlocking().subscribe(new Action1() { + @Override + public void call(Object t) { + ts.onNext(t); + } + }, new Action1() { + @Override + public void call(Throwable t) { + ts.onError(t); + } + }, new Action0() { + @Override + public void call() { + ts.onCompleted(); + } + }); + + ts.assertNoValues(); + ts.assertNotCompleted(); + ts.assertError(TestException.class); + } }