diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 9c6e244149..34d90f160c 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -278,7 +278,7 @@ def class ObservableTests { assertEquals("one", s) } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testSingle2() { Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3}) } diff --git a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt index d826585ced..9e1fde131e 100644 --- a/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt +++ b/language-adaptors/rxjava-kotlin/src/test/kotlin/rx/lang/kotlin/BasicKotlinTests.kt @@ -255,7 +255,7 @@ public class BasicKotlinTests { assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 }) } - [Test(expected = javaClass())] + [Test(expected = javaClass())] public fun testSingle() { assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }) Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 } diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 04fa8e5132..c0027d1bf7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -52,14 +52,12 @@ import rx.operators.OperationElementAt; import rx.operators.OperationFilter; import rx.operators.OperationFinally; -import rx.operators.OperationFirstOrDefault; import rx.operators.OperationGroupBy; import rx.operators.OperationGroupByUntil; import rx.operators.OperationGroupJoin; import rx.operators.OperationInterval; import rx.operators.OperationJoin; import rx.operators.OperationJoinPatterns; -import rx.operators.OperationLast; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -78,6 +76,7 @@ import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSequenceEqual; +import rx.operators.OperationSingle; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; import rx.operators.OperationSkipUntil; @@ -5091,37 +5090,107 @@ public Observable skip(int num) { return create(OperationSkip.skip(this, num)); } + /** + * If the Observable completes after emitting a single item, return an + * Observable containing that item. If it emits more than one item or no + * item, throw an IllegalArgumentException. + * + * @return an Observable containing the single item emitted by the source + * Observable that matches the predicate. + * @throws IllegalArgumentException + * if the source emits more than one item or no item + */ + public Observable single() { + return create(OperationSingle. single(this)); + } + + /** + * If the Observable completes after emitting a single item that matches a + * predicate, return an Observable containing that item. If it emits more + * than one such item or no item, throw an IllegalArgumentException. + * + * @param predicate + * a predicate function to evaluate items emitted by the source + * Observable + * @return an Observable containing the single item emitted by the source + * Observable that matches the predicate. + * @throws IllegalArgumentException + * if the source emits more than one item or no item matching + * the predicate + */ + public Observable single(Func1 predicate) { + return filter(predicate).single(); + } + + /** + * If the Observable completes after emitting a single item, return an + * Observable containing that item. If it's empty, return an Observable + * containing the defaultValue. If it emits more than one item, throw an + * IllegalArgumentException. + * + * @param defaultValue + * a default value to return if the Observable emits no item + * @return an Observable containing the single item emitted by the source + * Observable, or an Observable containing the defaultValue if no + * item. + * @throws IllegalArgumentException + * if the source emits more than one item + */ + public Observable singleOrDefault(T defaultValue) { + return create(OperationSingle. singleOrDefault(this, defaultValue)); + } + + /** + * If the Observable completes after emitting a single item that matches a + * predicate, return an Observable containing that item. If it emits no such + * item, return an Observable containing the defaultValue. If it emits more + * than one such item, throw an IllegalArgumentException. + * + * @param defaultValue + * a default value to return if the {@link Observable} emits no + * matching items + * @param predicate + * a predicate function to evaluate items emitted by the + * Observable + * @return an Observable containing the single item emitted by the source + * Observable that matches the predicate, or an Observable + * containing the defaultValue if no item matches the predicate + * @throws IllegalArgumentException + * if the source emits more than one item matching the predicate + */ + public Observable singleOrDefault(T defaultValue, Func1 predicate) { + return filter(predicate).singleOrDefault(defaultValue); + } + /** * Returns an Observable that emits only the very first item emitted by the - * source Observable. + * source Observable, or an IllegalArgumentException if the source + * {@link Observable} is empty. *

* * - * @return an Observable that emits only the very first item emitted by the - * source Observable, or nothing if the source Observable completes - * without emitting a single item + * @return an Observable that emits only the very first item from the + * source, or an IllegalArgumentException if the source {@link Observable} is empty. * @see RxJava Wiki: first() - * @see MSDN: Observable.First */ public Observable first() { - return take(1); + return take(1).single(); } /** * Returns an Observable that emits only the very first item emitted by the - * source Observable that satisfies a given condition. + * source Observable that satisfies a given condition, or an IllegalArgumentException + * if no such items are emitted. *

* * * @param predicate the condition any source emitted item has to satisfy * @return an Observable that emits only the very first item satisfying the - * given condition from the source, or nothing if the source - * Observable completes without emitting a single matching item + * given condition from the source, or an IllegalArgumentException if no such items are emitted. * @see RxJava Wiki: first() - * @see MSDN: Observable.First */ public Observable first(Func1 predicate) { - return skipWhile(not(predicate)).take(1); + return takeFirst(predicate).single(); } /** @@ -5139,7 +5208,7 @@ public Observable first(Func1 predicate) { * @see MSDN: Observable.FirstOrDefault */ public Observable firstOrDefault(T defaultValue) { - return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue)); + return take(1).singleOrDefault(defaultValue); } /** @@ -5157,8 +5226,8 @@ public Observable firstOrDefault(T defaultValue) { * @see RxJava Wiki: firstOrDefault() * @see MSDN: Observable.FirstOrDefault */ - public Observable firstOrDefault(Func1 predicate, T defaultValue) { - return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue)); + public Observable firstOrDefault(T defaultValue, Func1 predicate) { + return takeFirst(predicate).singleOrDefault(defaultValue); } /** @@ -5245,14 +5314,15 @@ public Observable takeWhileWithIndex(final Func2 * * @return an Observable that emits only the very first item from the - * source, or none if the source Observable completes without + * source, or an empty Observable if the source Observable completes without * emitting a single item + * @deprecated Use take(1) directly. * @see RxJava Wiki: first() * @see MSDN: Observable.First - * @see #first() */ + @Deprecated public Observable takeFirst() { - return first(); + return take(1); } /** @@ -5263,14 +5333,13 @@ public Observable takeFirst() { * * @param predicate the condition any source emitted item has to satisfy * @return an Observable that emits only the very first item satisfying the - * given condition from the source, or none if the source Observable + * given condition from the source, or an empty Observable if the source Observable * completes without emitting a single matching item * @see RxJava Wiki: first() * @see MSDN: Observable.First - * @see #first(Func1) */ public Observable takeFirst(Func1 predicate) { - return first(predicate); + return filter(predicate).take(1); } /** @@ -5725,7 +5794,7 @@ public Observable groupJoin(Observable right, Func1 isEmpty() { return create(OperationAny.isEmpty(this)); } - + /** * Returns an Observable that emits the last item emitted by the source or * notifies observers of an IllegalArgumentException if the @@ -5738,7 +5807,53 @@ public Observable isEmpty() { * @see RxJava Wiki: last() */ public Observable last() { - return create(OperationLast.last(this)); + return takeLast(1).single(); + } + + /** + * Returns an Observable that emits only the last item emitted by the source + * Observable that satisfies a given condition, or an + * IllegalArgumentException if no such items are emitted. + * + * @param predicate + * the condition any source emitted item has to satisfy + * @return an Observable that emits only the last item satisfying the given + * condition from the source, or an IllegalArgumentException if no + * such items are emitted. + * @throws IllegalArgumentException + * if no such itmes are emmited + */ + public Observable last(Func1 predicate) { + return filter(predicate).takeLast(1).single(); + } + + /** + * Returns an Observable that emits only the last item emitted by the source + * Observable, or a default item if the source is empty. + * + * @param defaultValue + * the default item to emit if the source Observable is empty + * @return an Observable that emits only the last item from the source, or a + * default item if the source is empty + */ + public Observable lastOrDefault(T defaultValue) { + return takeLast(1).singleOrDefault(defaultValue); + } + + /** + * Returns an Observable that emits only the last item emitted by the source + * Observable that satisfies a given condition, or a default item otherwise. + * + * @param defaultValue + * the default item to emit if the source Observable doesn't emit + * anything that satisfies the given condition + * @param predicate + * the condition any source emitted item has to satisfy + * @return an Observable that emits only the last item from the source that + * satisfies the given condition, or a default item otherwise + */ + public Observable lastOrDefault(T defaultValue, Func1 predicate) { + return filter(predicate).takeLast(1).singleOrDefault(defaultValue); } /** diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index 8fe13c9e88..9f1d88e3f3 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -62,25 +62,6 @@ public static BlockingObservable from(final Observable o) { return new BlockingObservable(o); } - private static T _singleOrDefault(BlockingObservable source, boolean hasDefault, T defaultValue) { - Iterator it = source.toIterable().iterator(); - - if (!it.hasNext()) { - if (hasDefault) { - return defaultValue; - } - throw new IllegalStateException("Expected single entry. Actually empty stream."); - } - - T result = it.next(); - - if (it.hasNext()) { - throw new IllegalStateException("Expected single entry. Actually more than one entry."); - } - - return result; - } - /** * Used for protecting against errors being thrown from {@link Observer} implementations and * ensuring onNext/onError/onCompleted contract compliance. @@ -173,7 +154,66 @@ public Iterator getIterator() { } /** - * Returns the last item emitted by a specified {@link Observable}. + * Returns the first item emitted by a specified {@link Observable}, + * or IllegalArgumentException if source contains no elements + * + * @return the first item emitted by the source {@link Observable} + * @throws IllegalArgumentException + * if source contains no elements + * @see MSDN: Observable.First + */ + public T first() { + return from(o.first()).single(); + } + + /** + * Returns the first item emitted by a specified {@link Observable} that matches a predicate, + * or IllegalArgumentException if no such items are emitted. + * + * @param predicate + * a predicate function to evaluate items emitted by the {@link Observable} + * @return the first item emitted by the {@link Observable} that matches the predicate + * @throws IllegalArgumentException + * if no such items are emitted. + * @see MSDN: Observable.First + */ + public T first(Func1 predicate) { + return from(o.first(predicate)).single(); + } + + /** + * Returns the first item emitted by a specified {@link Observable}, or a default value if no + * items are emitted. + * + * @param defaultValue + * a default value to return if the {@link Observable} emits no items + * @return the first item emitted by the {@link Observable}, or the default value if no items + * are emitted + * @see MSDN: Observable.FirstOrDefault + */ + public T firstOrDefault(T defaultValue) { + return from(o.take(1)).singleOrDefault(defaultValue); + } + + /** + * Returns the first item emitted by a specified {@link Observable} that matches a predicate, or + * a default value if no such items are emitted. + * + * @param defaultValue + * a default value to return if the {@link Observable} emits no matching items + * @param predicate + * a predicate function to evaluate items emitted by the {@link Observable} + * @return the first item emitted by the {@link Observable} that matches the predicate, or the + * default value if no matching items are emitted + * @see MSDN: Observable.FirstOrDefault + */ + public T firstOrDefault(T defaultValue, Func1 predicate) { + return from(o.filter(predicate)).firstOrDefault(defaultValue); + } + + /** + * Returns the last item emitted by a specified {@link Observable}, or throws IllegalArgumentException + * if source contains no elements. *

* * @@ -181,20 +221,23 @@ public Iterator getIterator() { * @throws IllegalArgumentException if source contains no elements */ public T last() { - return new BlockingObservable(o.last()).single(); + return from(o.last()).single(); } /** - * Returns the last item emitted by a specified {@link Observable} that matches a predicate. + * Returns the last item emitted by a specified {@link Observable} that matches a predicate, + * or throws IllegalArgumentException if no such items are emitted. *

* * * @param predicate * a predicate function to evaluate items emitted by the {@link Observable} * @return the last item emitted by the {@link Observable} that matches the predicate + * @throws IllegalArgumentException + * if no such items are emitted. */ public T last(final Func1 predicate) { - return from(o.filter(predicate)).last(); + return from(o.last(predicate)).single(); } /** @@ -209,19 +252,7 @@ public T last(final Func1 predicate) { * are emitted */ public T lastOrDefault(T defaultValue) { - boolean found = false; - T result = null; - - for (T value : toIterable()) { - found = true; - result = value; - } - - if (!found) { - return defaultValue; - } - - return result; + return from(o.takeLast(1)).singleOrDefault(defaultValue); } /** @@ -268,19 +299,19 @@ public Iterable next() { /** * If the {@link Observable} completes after emitting a single item, return that item, - * otherwise throw an exception. + * otherwise throw an IllegalArgumentException. *

* * * @return the single item emitted by the {@link Observable} */ public T single() { - return _singleOrDefault(this, false, null); + return from(o.single()).toIterable().iterator().next(); } /** * If the {@link Observable} completes after emitting a single item that matches a given - * predicate, return that item, otherwise throw an exception. + * predicate, return that item, otherwise throw an IllegalArgumentException. *

* * @@ -289,12 +320,12 @@ public T single() { * @return the single item emitted by the source {@link Observable} that matches the predicate */ public T single(Func1 predicate) { - return _singleOrDefault(from(o.filter(predicate)), false, null); + return from(o.single(predicate)).toIterable().iterator().next(); } /** * If the {@link Observable} completes after emitting a single item, return that item; if it - * emits more than one item, throw an exception; if it emits no items, return a default value. + * emits more than one item, throw an IllegalArgumentException; if it emits no items, return a default value. *

* * @@ -304,12 +335,22 @@ public T single(Func1 predicate) { * are emitted */ public T singleOrDefault(T defaultValue) { - return _singleOrDefault(this, true, defaultValue); + Iterator it = this.toIterable().iterator(); + + if (!it.hasNext()) { + return defaultValue; + } + + T result = it.next(); + if (it.hasNext()) { + throw new IllegalArgumentException("Sequence contains too many elements"); + } + return result; } /** * If the {@link Observable} completes after emitting a single item that matches a predicate, - * return that item; if it emits more than one such item, throw an exception; if it emits no + * return that item; if it emits more than one such item, throw an IllegalArgumentException; if it emits no * items, return a default value. *

* @@ -322,7 +363,7 @@ public T singleOrDefault(T defaultValue) { * default value if no such items are emitted */ public T singleOrDefault(T defaultValue, Func1 predicate) { - return _singleOrDefault(from(o.filter(predicate)), true, defaultValue); + return from(o.filter(predicate)).singleOrDefault(defaultValue); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java b/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java deleted file mode 100644 index 1a37d68d73..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationFirstOrDefault.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static rx.util.functions.Functions.*; - -import java.util.concurrent.atomic.AtomicBoolean; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Func1; - -/** - * Returns an Observable that emits the first item emitted by the source - * Observable, or a default value if the source emits nothing. - */ -public final class OperationFirstOrDefault { - - /** - * Returns an Observable that emits the first item emitted by the source - * Observable that satisfies the given condition, - * or a default value if the source emits no items that satisfy the given condition. - * - * @param source - * The source Observable to emit the first item for. - * @param predicate - * The condition the emitted source items have to satisfy. - * @param defaultValue - * The default value to use whenever the source Observable doesn't emit anything. - * @return A subscription function for creating the target Observable. - */ - public static OnSubscribeFunc firstOrDefault(Observable source, Func1 predicate, T defaultValue) { - return new FirstOrElse(source, predicate, defaultValue); - } - - /** - * Returns an Observable that emits the first item emitted by the source - * Observable, or a default value if the source emits nothing. - * - * @param source - * The source Observable to emit the first item for. - * @param defaultValue - * The default value to use whenever the source Observable doesn't emit anything. - * @return A subscription function for creating the target Observable. - */ - public static OnSubscribeFunc firstOrDefault(Observable source, T defaultValue) { - return new FirstOrElse(source, alwaysTrue(), defaultValue); - } - - private static class FirstOrElse implements OnSubscribeFunc { - private final Observable source; - private final Func1 predicate; - private final T defaultValue; - - private FirstOrElse(Observable source, Func1 predicate, T defaultValue) { - this.source = source; - this.defaultValue = defaultValue; - this.predicate = predicate; - } - - @Override - public Subscription onSubscribe(final Observer observer) { - final Subscription sourceSub = source.subscribe(new Observer() { - private final AtomicBoolean hasEmitted = new AtomicBoolean(false); - - @Override - public void onCompleted() { - if (!hasEmitted.get()) { - observer.onNext(defaultValue); - observer.onCompleted(); - } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(T next) { - try { - if (!hasEmitted.get() && predicate.call(next)) { - hasEmitted.set(true); - observer.onNext(next); - observer.onCompleted(); - } - } catch (Throwable t) { - // may happen within the predicate call (user code) - observer.onError(t); - } - } - }); - - return Subscriptions.create(new Action0() { - @Override - public void call() { - sourceSub.unsubscribe(); - } - }); - } - } -} diff --git a/rxjava-core/src/main/java/rx/operators/OperationLast.java b/rxjava-core/src/main/java/rx/operators/OperationLast.java deleted file mode 100644 index 964afd5176..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationLast.java +++ /dev/null @@ -1,82 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Subscription; - -/** - * Emit an Observable with the last emitted item - * or onError(new IllegalArgumentException("Sequence contains no elements")) if no elements received. - */ -public class OperationLast { - - /** - * Accepts a sequence and returns a sequence that is the last emitted item - * or an error if no items are emitted (empty sequence). - * - * @param sequence - * the input sequence. - * @param - * the type of the sequence. - * @return a sequence containing the last emitted item or that has onError invoked on it if no items - */ - public static OnSubscribeFunc last(final Observable sequence) { - return new OnSubscribeFunc() { - final AtomicReference last = new AtomicReference(); - final AtomicBoolean hasLast = new AtomicBoolean(false); - - @Override - public Subscription onSubscribe(final Observer observer) { - return sequence.subscribe(new Observer() { - - @Override - public void onCompleted() { - /* - * We don't need to worry about the following being non-atomic - * since an Observable sequence is serial so we will not receive - * concurrent executions. - */ - if (hasLast.get()) { - observer.onNext(last.get()); - observer.onCompleted(); - } else { - observer.onError(new IllegalArgumentException("Sequence contains no elements")); - } - } - - @Override - public void onError(Throwable e) { - observer.onError(e); - } - - @Override - public void onNext(T value) { - last.set(value); - hasLast.set(true); - } - }); - } - - }; - } - -} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSingle.java b/rxjava-core/src/main/java/rx/operators/OperationSingle.java new file mode 100644 index 0000000000..2269cd8b44 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSingle.java @@ -0,0 +1,96 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; + +/** + * If the Observable completes after emitting a single item that matches a + * predicate, return an Observable containing that item. If it emits more than + * one such item or no item, throw an IllegalArgumentException. + */ +public class OperationSingle { + + public static OnSubscribeFunc single( + final Observable source) { + return single(source, false, null); + } + + public static OnSubscribeFunc singleOrDefault( + final Observable source, final T defaultValue) { + return single(source, true, defaultValue); + } + + private static OnSubscribeFunc single( + final Observable source, + final boolean hasDefaultValue, final T defaultValue) { + return new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + subscription.wrap(source.subscribe(new Observer() { + + private T value; + private boolean isEmpty = true; + private boolean hasTooManyElemenets; + + @Override + public void onCompleted() { + if (hasTooManyElemenets) { + // We has already sent an onError message + } else { + if (isEmpty) { + if (hasDefaultValue) { + observer.onNext(defaultValue); + observer.onCompleted(); + } else { + observer.onError(new IllegalArgumentException( + "Sequence contains no elements")); + } + } else { + observer.onNext(value); + observer.onCompleted(); + } + } + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + } + + @Override + public void onNext(T value) { + if (isEmpty) { + this.value = value; + isEmpty = false; + } else { + hasTooManyElemenets = true; + observer.onError(new IllegalArgumentException( + "Sequence contains too many elements")); + subscription.unsubscribe(); + } + } + })); + return subscription; + } + }; + } +} diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index 3b94095c27..b02afea164 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -164,9 +164,9 @@ public Subscription onSubscribe(Observer obsv) { verify(w, times(1)).onError(any(RuntimeException.class)); } - public void testFirstWithPredicateOfSome() { + public void testTakeFirstWithPredicateOfSome() { Observable observable = Observable.from(1, 3, 5, 4, 6, 3); - observable.first(IS_EVEN).subscribe(w); + observable.takeFirst(IS_EVEN).subscribe(w); verify(w, times(1)).onNext(anyInt()); verify(w).onNext(4); verify(w, times(1)).onCompleted(); @@ -174,18 +174,18 @@ public void testFirstWithPredicateOfSome() { } @Test - public void testFirstWithPredicateOfNoneMatchingThePredicate() { + public void testTakeFirstWithPredicateOfNoneMatchingThePredicate() { Observable observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1); - observable.first(IS_EVEN).subscribe(w); + observable.takeFirst(IS_EVEN).subscribe(w); verify(w, never()).onNext(anyInt()); verify(w, times(1)).onCompleted(); verify(w, never()).onError(any(Throwable.class)); } @Test - public void testFirstOfSome() { + public void testTakeFirstOfSome() { Observable observable = Observable.from(1, 2, 3); - observable.first().subscribe(w); + observable.takeFirst().subscribe(w); verify(w, times(1)).onNext(anyInt()); verify(w).onNext(1); verify(w, times(1)).onCompleted(); @@ -193,14 +193,32 @@ public void testFirstOfSome() { } @Test - public void testFirstOfNone() { + public void testTakeFirstOfNone() { Observable observable = Observable.empty(); - observable.first().subscribe(w); + observable.takeFirst().subscribe(w); verify(w, never()).onNext(anyInt()); verify(w, times(1)).onCompleted(); verify(w, never()).onError(any(Throwable.class)); } + @Test + public void testFirstOfNone() { + Observable observable = Observable.empty(); + observable.first().subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onCompleted(); + verify(w, times(1)).onError(isA(IllegalArgumentException.class)); + } + + @Test + public void testFirstWithPredicateOfNoneMatchingThePredicate() { + Observable observable = Observable.from(1, 3, 5, 7, 9, 7, 5, 3, 1); + observable.first(IS_EVEN).subscribe(w); + verify(w, never()).onNext(anyInt()); + verify(w, never()).onCompleted(); + verify(w, times(1)).onError(isA(IllegalArgumentException.class)); + } + @Test public void testReduce() { Observable observable = Observable.from(1, 2, 3, 4); diff --git a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java index f3e05189c1..d5ebbab904 100644 --- a/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java +++ b/rxjava-core/src/test/java/rx/observables/BlockingObservableTest.java @@ -127,7 +127,7 @@ public void testSingleDefault() { assertEquals("default", observable.singleOrDefault("default")); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testSingleDefaultPredicateMatchesMoreThanOne() { BlockingObservable.from(Observable.from("one", "two")).singleOrDefault("default", new Func1() { @Override @@ -149,7 +149,7 @@ public Boolean call(String args) { assertEquals("default", result); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testSingleDefaultWithMoreThanOne() { BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); observable.singleOrDefault("default"); @@ -166,13 +166,13 @@ public Boolean call(String s) { })); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testSingleWrong() { BlockingObservable observable = BlockingObservable.from(Observable.from(1, 2)); observable.single(); } - @Test(expected = IllegalStateException.class) + @Test(expected = IllegalArgumentException.class) public void testSingleWrongPredicate() { BlockingObservable observable = BlockingObservable.from(Observable.from(-1)); observable.single(new Func1() { @@ -257,6 +257,76 @@ public void call(String t1) { } } + @Test + public void testFirst() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + assertEquals("one", observable.first()); + } + + @Test(expected = IllegalArgumentException.class) + public void testFirstWithEmpty() { + BlockingObservable.from(Observable.empty()).first(); + } + + @Test + public void testFirstWithPredicate() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + String first = observable.first(new Func1() { + @Override + public Boolean call(String args) { + return args.length() > 3; + } + }); + assertEquals("three", first); + } + + @Test(expected = IllegalArgumentException.class) + public void testFirstWithPredicateAndEmpty() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + observable.first(new Func1() { + @Override + public Boolean call(String args) { + return args.length() > 5; + } + }); + } + + @Test + public void testFirstOrDefault() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + assertEquals("one", observable.firstOrDefault("default")); + } + + @Test + public void testFirstOrDefaultWithEmpty() { + BlockingObservable observable = BlockingObservable.from(Observable.empty()); + assertEquals("default", observable.firstOrDefault("default")); + } + + @Test + public void testFirstOrDefaultWithPredicate() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + String first = observable.firstOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() > 3; + } + }); + assertEquals("three", first); + } + + @Test + public void testFirstOrDefaultWithPredicateAndEmpty() { + BlockingObservable observable = BlockingObservable.from(Observable.from("one", "two", "three")); + String first = observable.firstOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() > 5; + } + }); + assertEquals("default", first); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/test/java/rx/operators/OperationFirstOrDefaultTest.java b/rxjava-core/src/test/java/rx/operators/OperationFirstOrDefaultTest.java index 026caa1319..dfa4de615f 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationFirstOrDefaultTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationFirstOrDefaultTest.java @@ -18,10 +18,10 @@ import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import static org.mockito.MockitoAnnotations.*; -import static rx.operators.OperationFirstOrDefault.*; import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.Mock; import rx.Observable; @@ -48,7 +48,7 @@ public void before() { @Test public void testFirstOrElseOfNone() { Observable src = Observable.empty(); - Observable.create(firstOrDefault(src, "default")).subscribe(w); + src.firstOrDefault("default").subscribe(w); verify(w, times(1)).onNext(anyString()); verify(w, times(1)).onNext("default"); @@ -59,7 +59,7 @@ public void testFirstOrElseOfNone() { @Test public void testFirstOrElseOfSome() { Observable src = Observable.from("a", "b", "c"); - Observable.create(firstOrDefault(src, "default")).subscribe(w); + src.firstOrDefault("default").subscribe(w); verify(w, times(1)).onNext(anyString()); verify(w, times(1)).onNext("a"); @@ -70,7 +70,7 @@ public void testFirstOrElseOfSome() { @Test public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() { Observable src = Observable.from("a", "b", "c"); - Observable.create(firstOrDefault(src, IS_D, "default")).subscribe(w); + src.firstOrDefault("default", IS_D).subscribe(w); verify(w, times(1)).onNext(anyString()); verify(w, times(1)).onNext("default"); @@ -81,11 +81,222 @@ public void testFirstOrElseWithPredicateOfNoneMatchingThePredicate() { @Test public void testFirstOrElseWithPredicateOfSome() { Observable src = Observable.from("a", "b", "c", "d", "e", "f"); - Observable.create(firstOrDefault(src, IS_D, "default")).subscribe(w); + src.firstOrDefault("default", IS_D).subscribe(w); verify(w, times(1)).onNext(anyString()); verify(w, times(1)).onNext("d"); verify(w, never()).onError(any(Throwable.class)); verify(w, times(1)).onCompleted(); } + + @Test + public void testFirst() { + Observable observable = Observable.from(1, 2, 3).first(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstWithOneElement() { + Observable observable = Observable.from(1).first(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstWithEmpty() { + Observable observable = Observable. empty().first(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstWithPredicate() { + Observable observable = Observable.from(1, 2, 3, 4, 5, 6) + .first(new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstWithPredicateAndOneElement() { + Observable observable = Observable.from(1, 2).first( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstWithPredicateAndEmpty() { + Observable observable = Observable.from(1).first( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefault() { + Observable observable = Observable.from(1, 2, 3) + .firstOrDefault(4); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefaultWithOneElement() { + Observable observable = Observable.from(1).firstOrDefault(2); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefaultWithEmpty() { + Observable observable = Observable. empty() + .firstOrDefault(1); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefaultWithPredicate() { + Observable observable = Observable.from(1, 2, 3, 4, 5, 6) + .firstOrDefault(8, new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefaultWithPredicateAndOneElement() { + Observable observable = Observable.from(1, 2).firstOrDefault( + 4, new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testFirstOrDefaultWithPredicateAndEmpty() { + Observable observable = Observable.from(1).firstOrDefault(2, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationLastTest.java b/rxjava-core/src/test/java/rx/operators/OperationLastTest.java index 006de63060..cfd292cff2 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationLastTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationLastTest.java @@ -16,28 +16,35 @@ package rx.operators; import static org.junit.Assert.*; +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import org.junit.Test; +import org.mockito.InOrder; import rx.Observable; +import rx.Observer; +import rx.util.functions.Func1; public class OperationLastTest { @Test public void testLastWithElements() { - Observable last = Observable.create(OperationLast.last(Observable.from(1, 2, 3))); + Observable last = Observable.from(1, 2, 3).last(); assertEquals(3, last.toBlockingObservable().single().intValue()); } @Test(expected = IllegalArgumentException.class) public void testLastWithNoElements() { - Observable last = Observable.create(OperationLast.last(Observable.empty())); + Observable last = Observable.empty().last(); last.toBlockingObservable().single(); } @Test public void testLastMultiSubscribe() { - Observable last = Observable.create(OperationLast.last(Observable.from(1, 2, 3))); + Observable last = Observable.from(1, 2, 3).last(); assertEquals(3, last.toBlockingObservable().single().intValue()); assertEquals(3, last.toBlockingObservable().single().intValue()); } @@ -46,4 +53,215 @@ public void testLastMultiSubscribe() { public void testLastViaObservable() { Observable.from(1, 2, 3).last(); } + + @Test + public void testLast() { + Observable observable = Observable.from(1, 2, 3).last(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(3); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastWithOneElement() { + Observable observable = Observable.from(1).last(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastWithEmpty() { + Observable observable = Observable. empty().last(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastWithPredicate() { + Observable observable = Observable.from(1, 2, 3, 4, 5, 6) + .last(new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(6); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastWithPredicateAndOneElement() { + Observable observable = Observable.from(1, 2).last( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastWithPredicateAndEmpty() { + Observable observable = Observable.from(1).last( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefault() { + Observable observable = Observable.from(1, 2, 3) + .lastOrDefault(4); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(3); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefaultWithOneElement() { + Observable observable = Observable.from(1).lastOrDefault(2); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefaultWithEmpty() { + Observable observable = Observable. empty() + .lastOrDefault(1); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefaultWithPredicate() { + Observable observable = Observable.from(1, 2, 3, 4, 5, 6) + .lastOrDefault(8, new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(6); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefaultWithPredicateAndOneElement() { + Observable observable = Observable.from(1, 2).lastOrDefault(4, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testLastOrDefaultWithPredicateAndEmpty() { + Observable observable = Observable.from(1).lastOrDefault(2, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSingleTest.java b/rxjava-core/src/test/java/rx/operators/OperationSingleTest.java new file mode 100644 index 0000000000..b89864b1e4 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationSingleTest.java @@ -0,0 +1,242 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.isA; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.util.functions.Func1; + +public class OperationSingleTest { + + @Test + public void testSingle() { + Observable observable = Observable.from(1).single(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleWithTooManyElements() { + Observable observable = Observable.from(1, 2).single(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleWithEmpty() { + Observable observable = Observable. empty().single(); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleWithPredicate() { + Observable observable = Observable.from(1, 2).single( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleWithPredicateAndTooManyElements() { + Observable observable = Observable.from(1, 2, 3, 4).single( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleWithPredicateAndEmpty() { + Observable observable = Observable.from(1).single( + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefault() { + Observable observable = Observable.from(1).singleOrDefault(2); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefaultWithTooManyElements() { + Observable observable = Observable.from(1, 2).singleOrDefault( + 3); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefaultWithEmpty() { + Observable observable = Observable. empty() + .singleOrDefault(1); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(1); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefaultWithPredicate() { + Observable observable = Observable.from(1, 2).singleOrDefault( + 4, new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefaultWithPredicateAndTooManyElements() { + Observable observable = Observable.from(1, 2, 3, 4) + .singleOrDefault(6, new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onError( + isA(IllegalArgumentException.class)); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSingleOrDefaultWithPredicateAndEmpty() { + Observable observable = Observable.from(1).singleOrDefault(2, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 % 2 == 0; + } + }); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + observable.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext(2); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +}