diff --git a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj index dd6698af7f..2eae2329ed 100644 --- a/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj +++ b/language-adaptors/rxjava-clojure/src/test/clojure/rx/lang/clojure/interop_test.clj @@ -120,14 +120,16 @@ (.onNext o 99) (.onCompleted o) (rx.subscriptions.Subscriptions/empty))) - (BlockingObservable/single))))) + .toBlockingObservable + .single)))) (testing "can pass rx/fn to map and friends" (is (= (+ 1 4 9) (-> (Observable/from [1 2 3]) (.map (rx/fn [v] (* v v))) (.reduce (rx/fn* +)) - (BlockingObservable/single))))) + .toBlockingObservable + .single)))) (testing "can pass rx/action to subscribe and friends" (let [finally-called (atom nil) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 28666cf48a..edb13655a7 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -324,8 +324,12 @@ def class ObservableTests { return Observable.from(1, 3, 2, 5, 4); } - public TestObservable getObservable() { - return new TestObservable(counter++); + public TestOnSubscribe getOnSubscribe() { + return new TestOnSubscribe(counter++); + } + + public Observable getObservable() { + return Observable.create(getOnSubscribe()); } } @@ -335,14 +339,14 @@ def class ObservableTests { public void received(Object o); } - def class TestObservable extends Observable { + def class TestOnSubscribe implements OnSubscribeFunc { private final int count; - public TestObservable(int count) { + public TestOnSubscribe(int count) { this.count = count; } - public Subscription subscribe(Observer observer) { + public Subscription onSubscribe(Observer observer) { observer.onNext("hello_" + count); observer.onCompleted(); diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 4732d5fcad..419a7e0c9d 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -28,6 +28,7 @@ import org.mockito.MockitoAnnotations; import rx.Observable; +import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; import rx.operators.OperationMostRecent; @@ -44,9 +45,7 @@ /** * An extension of {@link Observable} that provides blocking operators. *

- * You construct a BlockingObservable from an Observable with {@link #from(Observable)} or - * {@link Observable#toBlockingObservable()} - *

+ * You construct a BlockingObservable from an Observable with {@link #from(Observable)} or {@link Observable#toBlockingObservable()}

* The documentation for this interface makes use of a form of marble diagram that has been * modified to illustrate blocking operators. The following legend explains these marble diagrams: *

@@ -58,145 +57,19 @@ * * @param */ -public class BlockingObservable extends Observable { - - protected BlockingObservable(OnSubscribeFunc onSubscribe) { - super(onSubscribe); - } - - /** - * Convert an Observable into a BlockingObservable. - */ - public static BlockingObservable from(final Observable o) { - return new BlockingObservable(new OnSubscribeFunc() { - - @Override - public Subscription onSubscribe(Observer observer) { - return o.subscribe(observer); - } - }); - } - - /** - * Returns an {@link Iterator} that iterates over all items emitted by a specified - * {@link Observable}. - *

- * - * - * @param source - * the source {@link Observable} - * @param - * the type of items emitted by the source {@link Observable} - * @return an {@link Iterator} that can iterate over the items emitted by the {@link Observable} - */ - public static Iterator toIterator(Observable source) { - return OperationToIterator.toIterator(source); - } +public class BlockingObservable { - /** - * Returns the last item emitted by a specified {@link Observable}. - *

- * - * - * @param source - * the source {@link Observable} - * @return the last item emitted by the source {@link Observable} - */ - public static T last(final Observable source) { - return from(source).last(); - } + private final Observable o; - /** - * Returns the last item emitted by an {@link Observable} that matches a given predicate. - *

- * - * - * @param source - * the source {@link Observable} - * @param predicate - * a predicate function to evaluate items emitted by the {@link Observable} - * @return the last item emitted by the {@link Observable} for which the predicate function - * returns true - */ - public static T last(final Observable source, final Func1 predicate) { - return last(source.filter(predicate)); + private BlockingObservable(Observable o) { + this.o = o; } /** - * Returns the last item emitted by an {@link Observable}, or a default value if no item is - * emitted. - *

- * - * - * @param source - * the source {@link Observable} - * @param defaultValue - * a default value to return if the {@link Observable} emits no items - * @param - * the type of items emitted by the {@link Observable} - * @return the last item emitted by an {@link Observable}, or the default value if no item is - * emitted - */ - public static T lastOrDefault(Observable source, T defaultValue) { - return from(source).lastOrDefault(defaultValue); - } - - /** - * Returns the last item emitted by an {@link Observable} that matches a given predicate, or a - * default value if no such item is emitted. - *

- * - * - * @param source - * the source {@link Observable} - * @param defaultValue - * a default value to return if the {@link Observable} emits no matching items - * @param predicate - * a predicate function to evaluate items emitted by the {@link Observable} - * @param - * the type of items emitted by the {@link Observable} - * @return the last item emitted by an {@link Observable} that matches the predicate, or the - * default value if no matching item is emitted - */ - public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) { - return lastOrDefault(source.filter(predicate), defaultValue); - } - - /** - * Returns an {@link Iterable} that always returns the item most recently emitted by an - * {@link Observable}. - *

- * - * - * @param source - * the source {@link Observable} - * @param - * the type of items emitted by the {@link Observable} - * @param initialValue - * the initial value that will be yielded by the {@link Iterable} sequence if the - * {@link Observable} has not yet emitted an item - * @return an {@link Iterable} that on each iteration returns the item that the - * {@link Observable} has most recently emitted - */ - public static Iterable mostRecent(Observable source, T initialValue) { - return OperationMostRecent.mostRecent(source, initialValue); - } - - /** - * Returns an {@link Iterable} that blocks until the {@link Observable} emits another item, - * then returns that item. - *

- * - * - * @param items - * the source {@link Observable} - * @param - * the type of items emitted by the {@link Observable} - * @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} - * emits a new item, whereupon the Iterable returns that item + * Convert an Observable into a BlockingObservable. */ - public static Iterable next(Observable items) { - return OperationNext.next(items); + public static BlockingObservable from(final Observable o) { + return new BlockingObservable(o); } private static T _singleOrDefault(BlockingObservable source, boolean hasDefault, T defaultValue) { @@ -218,107 +91,6 @@ private static T _singleOrDefault(BlockingObservable source, bo return result; } - /** - * If the {@link Observable} completes after emitting a single item, return that item, - * otherwise throw an exception. - *

- * - * - * @param source - * the source {@link Observable} - * @return the single item emitted by the {@link Observable} - * @throws IllegalStateException - * if the {@link Observable} does not emit exactly one item - */ - public static T single(Observable source) { - return from(source).single(); - } - - /** - * If the {@link Observable} completes after emitting a single item that matches a given - * predicate, return that item, otherwise throw an exception. - *

- * - * - * @param source - * the source {@link Observable} - * @param predicate - * a predicate function to evaluate items emitted by the {@link Observable} - * @return the single item emitted by the source {@link Observable} that matches the predicate - * @throws IllegalStateException - * if the {@link Observable} does not emit exactly one item that matches the - * predicate - */ - public static T single(Observable source, Func1 predicate) { - return from(source).single(predicate); - } - - /** - * If the {@link Observable} completes after emitting a single item, return that item, otherwise - * return a default value. - *

- * - * - * @param source - * the source {@link Observable} - * @param defaultValue - * a default value to return if the {@link Observable} emits no items - * @return the single item emitted by the source {@link Observable}, or a default value if no - * value is emitted - */ - public static T singleOrDefault(Observable source, T defaultValue) { - return from(source).singleOrDefault(defaultValue); - } - - /** - * If the {@link Observable} completes after emitting a single item that matches a given - * predicate, return that item, otherwise return a default value. - *

- * - * - * @param source - * the source {@link Observable} - * @param defaultValue - * a default value to return if the {@link Observable} emits no matching items - * @param predicate - * a predicate function to evaluate items emitted by the {@link Observable} - * @return the single item emitted by the source {@link Observable} that matches the predicate, - * or a default value if no such value is emitted - */ - public static T singleOrDefault(Observable source, T defaultValue, Func1 predicate) { - return from(source).singleOrDefault(defaultValue, predicate); - } - - /** - * Returns a {@link Future} representing the single value emitted by an {@link Observable}. - *

- * toFuture() throws an exception if the {@link Observable} emits more than one - * item. If the Observable may emit more than item, use - * {@link Observable#toList toList()}.toFuture(). - *

- * - * - * @param source - * the source {@link Observable} - * @return a Future that expects a single item to be emitted by the source {@link Observable} - */ - public static Future toFuture(final Observable source) { - return OperationToFuture.toFuture(source); - } - - /** - * Converts an {@link Observable} into an {@link Iterable}. - *

- * - * - * @param source - * the source {@link Observable} - * @return an {@link Iterable} version of the underlying {@link Observable} - */ - public static Iterable toIterable(final Observable source) { - return from(source).toIterable(); - } - /** * Used for protecting against errors being thrown from {@link Observer} implementations and * ensuring onNext/onError/onCompleted contract compliance. @@ -326,11 +98,11 @@ public static Iterable toIterable(final Observable source) { * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect * calls to user code from within an operator" */ - private Subscription protectivelyWrapAndSubscribe(Observer o) { + private Subscription protectivelyWrapAndSubscribe(Observer observer) { SafeObservableSubscription subscription = new SafeObservableSubscription(); - return subscription.wrap(subscribe(new SafeObserver(subscription, o))); + return subscription.wrap(o.subscribe(new SafeObserver(subscription, observer))); } - + /** * Invoke a method on each item emitted by the {@link Observable}; block until the Observable * completes. @@ -400,15 +172,14 @@ public void onNext(T args) { } /** - * Returns an {@link Iterator} that iterates over all items emitted by a specified - * {@link Observable}. + * Returns an {@link Iterator} that iterates over all items emitted by a specified {@link Observable}. *

* * * @return an {@link Iterator} that can iterate over the items emitted by the {@link Observable} */ public Iterator getIterator() { - return OperationToIterator.toIterator(this); + return OperationToIterator.toIterator(o); } /** @@ -436,7 +207,7 @@ public T last() { * @return the last item emitted by the {@link Observable} that matches the predicate */ public T last(final Func1 predicate) { - return last(this, predicate); + return from(o.filter(predicate)).last(); } /** @@ -480,23 +251,20 @@ public T lastOrDefault(T defaultValue) { * default value if no matching items are emitted */ public T lastOrDefault(T defaultValue, Func1 predicate) { - return lastOrDefault(this, defaultValue, predicate); + return from(o.filter(predicate)).lastOrDefault(defaultValue); } /** - * Returns an {@link Iterable} that always returns the item most recently emitted by an - * {@link Observable}. + * Returns an {@link Iterable} that always returns the item most recently emitted by an {@link Observable}. *

* * * @param initialValue - * the initial value that will be yielded by the {@link Iterable} sequence if the - * {@link Observable} has not yet emitted an item - * @return an {@link Iterable} that on each iteration returns the item that the - * {@link Observable} has most recently emitted + * the initial value that will be yielded by the {@link Iterable} sequence if the {@link Observable} has not yet emitted an item + * @return an {@link Iterable} that on each iteration returns the item that the {@link Observable} has most recently emitted */ public Iterable mostRecent(T initialValue) { - return mostRecent(this, initialValue); + return OperationMostRecent.mostRecent(o, initialValue); } /** @@ -505,11 +273,10 @@ public Iterable mostRecent(T initialValue) { *

* * - * @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} - * emits a new item, whereupon the Iterable returns that item + * @return an {@link Iterable} that blocks upon each iteration until the {@link Observable} emits a new item, whereupon the Iterable returns that item */ public Iterable next() { - return next(this); + return OperationNext.next(o); } /** @@ -535,7 +302,7 @@ public T single() { * @return the single item emitted by the source {@link Observable} that matches the predicate */ public T single(Func1 predicate) { - return _singleOrDefault(from(this.filter(predicate)), false, null); + return _singleOrDefault(from(o.filter(predicate)), false, null); } /** @@ -568,23 +335,21 @@ public T singleOrDefault(T defaultValue) { * default value if no such items are emitted */ public T singleOrDefault(T defaultValue, Func1 predicate) { - return _singleOrDefault(from(this.filter(predicate)), true, defaultValue); + return _singleOrDefault(from(o.filter(predicate)), true, defaultValue); } /** * Returns a {@link Future} representing the single value emitted by an {@link Observable}. *

* toFuture() throws an exception if the Observable emits more than one item. If - * the Observable may emit more than item, use - * {@link Observable#toList toList()}.toFuture(). + * the Observable may emit more than item, use {@link Observable#toList toList()}.toFuture(). *

* * - * @return a {@link Future} that expects a single item to be emitted by the source - * {@link Observable} + * @return a {@link Future} that expects a single item to be emitted by the source {@link Observable} */ public Future toFuture() { - return toFuture(this); + return OperationToFuture.toFuture(o); } /** @@ -629,7 +394,7 @@ public void testLastEmptyObservable() { @Test public void testLastOrDefault() { - BlockingObservable observable = BlockingObservable.from(from(1, 0, -1)); + BlockingObservable observable = BlockingObservable.from(Observable.from(1, 0, -1)); int last = observable.lastOrDefault(-100, new Func1() { @Override public Boolean call(Integer args) { @@ -641,19 +406,19 @@ public Boolean call(Integer args) { @Test public void testLastOrDefault1() { - BlockingObservable observable = BlockingObservable.from(from("one", "two", "three")); + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); assertEquals("three", observable.lastOrDefault("default")); } @Test public void testLastOrDefault2() { - BlockingObservable observable = BlockingObservable.from(from()); + BlockingObservable observable = BlockingObservable.from(Observable.from()); assertEquals("default", observable.lastOrDefault("default")); } @Test public void testLastOrDefaultWithPredicate() { - BlockingObservable observable = BlockingObservable.from(from(1, 0, -1)); + BlockingObservable observable = BlockingObservable.from(Observable.from(1, 0, -1)); int last = observable.lastOrDefault(0, new Func1() { @Override public Boolean call(Integer args) { @@ -666,7 +431,7 @@ public Boolean call(Integer args) { @Test public void testLastOrDefaultWrongPredicate() { - BlockingObservable observable = BlockingObservable.from(from(-1, -2, -3)); + BlockingObservable observable = BlockingObservable.from(Observable.from(-1, -2, -3)); int last = observable.lastOrDefault(0, new Func1() { @Override public Boolean call(Integer args) { @@ -689,19 +454,19 @@ public Boolean call(String s) { } public void testSingle() { - BlockingObservable observable = BlockingObservable.from(from("one")); + BlockingObservable observable = BlockingObservable.from(Observable.from("one")); assertEquals("one", observable.single()); } @Test public void testSingleDefault() { - BlockingObservable observable = BlockingObservable.from(from()); + BlockingObservable observable = BlockingObservable.from(Observable.from()); assertEquals("default", observable.singleOrDefault("default")); } @Test(expected = IllegalStateException.class) public void testSingleDefaultPredicateMatchesMoreThanOne() { - BlockingObservable.from(from("one", "two")).singleOrDefault("default", new Func1() { + BlockingObservable.from(Observable.from("one", "two")).singleOrDefault("default", new Func1() { @Override public Boolean call(String args) { return args.length() == 3; @@ -711,7 +476,7 @@ public Boolean call(String args) { @Test public void testSingleDefaultPredicateMatchesNothing() { - BlockingObservable observable = BlockingObservable.from(from("one", "two")); + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two")); String result = observable.singleOrDefault("default", new Func1() { @Override public Boolean call(String args) { @@ -723,13 +488,13 @@ public Boolean call(String args) { @Test(expected = IllegalStateException.class) public void testSingleDefaultWithMoreThanOne() { - BlockingObservable observable = BlockingObservable.from(from("one", "two", "three")); + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); observable.singleOrDefault("default"); } @Test public void testSingleWithPredicateDefault() { - BlockingObservable observable = BlockingObservable.from(from("one", "two", "four")); + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "four")); assertEquals("four", observable.single(new Func1() { @Override public Boolean call(String s) { @@ -740,13 +505,13 @@ public Boolean call(String s) { @Test(expected = IllegalStateException.class) public void testSingleWrong() { - BlockingObservable observable = BlockingObservable.from(from(1, 2)); + BlockingObservable observable = BlockingObservable.from(Observable.from(1, 2)); observable.single(); } @Test(expected = IllegalStateException.class) public void testSingleWrongPredicate() { - BlockingObservable observable = BlockingObservable.from(from(-1)); + BlockingObservable observable = BlockingObservable.from(Observable.from(-1)); observable.single(new Func1() { @Override public Boolean call(Integer args) { @@ -757,7 +522,7 @@ public Boolean call(Integer args) { @Test public void testToIterable() { - BlockingObservable obs = BlockingObservable.from(from("one", "two", "three")); + BlockingObservable obs = BlockingObservable.from(Observable.from("one", "two", "three")); Iterator it = obs.toIterable().iterator(); @@ -776,7 +541,7 @@ public void testToIterable() { @Test(expected = TestException.class) public void testToIterableWithException() { - BlockingObservable obs = BlockingObservable.from(create(new OnSubscribeFunc() { + BlockingObservable obs = BlockingObservable.from(Observable.create(new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { @@ -795,7 +560,7 @@ public Subscription onSubscribe(Observer observer) { it.next(); } - + @Test public void testForEachWithError() { try {