Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed the blocking/non-blocking first #520

Merged
merged 6 commits into from
Dec 23, 2013
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ def class ObservableTests {
assertEquals("one", s)
}

@Test(expected = IllegalStateException.class)
@Test(expected = IllegalArgumentException.class)
public void testSingle2() {
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ public class BasicKotlinTests {
assertEquals("default", Observable.from("one", "two")!!.toBlockingObservable()!!.lastOrDefault("default") { x -> x!!.length > 3 })
}

[Test(expected = javaClass<IllegalStateException>())]
[Test(expected = javaClass<IllegalArgumentException>())]
public fun testSingle() {
assertEquals("one", Observable.from("one")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 })
Observable.from("one", "two")!!.toBlockingObservable()!!.single { x -> x!!.length == 3 }
Expand Down
163 changes: 139 additions & 24 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,12 @@
import rx.operators.OperationElementAt;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationFirstOrDefault;
import rx.operators.OperationGroupBy;
import rx.operators.OperationGroupByUntil;
import rx.operators.OperationGroupJoin;
import rx.operators.OperationInterval;
import rx.operators.OperationJoin;
import rx.operators.OperationJoinPatterns;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand All @@ -78,6 +76,7 @@
import rx.operators.OperationSample;
import rx.operators.OperationScan;
import rx.operators.OperationSequenceEqual;
import rx.operators.OperationSingle;
import rx.operators.OperationSkip;
import rx.operators.OperationSkipLast;
import rx.operators.OperationSkipUntil;
Expand Down Expand Up @@ -5091,37 +5090,107 @@ public Observable<T> skip(int num) {
return create(OperationSkip.skip(this, num));
}

/**
* If the Observable completes after emitting a single item, return an
* Observable containing that item. If it emits more than one item or no
* item, throw an IllegalArgumentException.
*
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate.
* @throws IllegalArgumentException
* if the source emits more than one item or no item
*/
public Observable<T> single() {
return create(OperationSingle.<T> single(this));
}

/**
* If the Observable completes after emitting a single item that matches a
* predicate, return an Observable containing that item. If it emits more
* than one such item or no item, throw an IllegalArgumentException.
*
* @param predicate
* a predicate function to evaluate items emitted by the source
* Observable
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate.
* @throws IllegalArgumentException
* if the source emits more than one item or no item matching
* the predicate
*/
public Observable<T> single(Func1<? super T, Boolean> predicate) {
return filter(predicate).single();
}

/**
* If the Observable completes after emitting a single item, return an
* Observable containing that item. If it's empty, return an Observable
* containing the defaultValue. If it emits more than one item, throw an
* IllegalArgumentException.
*
* @param defaultValue
* a default value to return if the Observable emits no item
* @return an Observable containing the single item emitted by the source
* Observable, or an Observable containing the defaultValue if no
* item.
* @throws IllegalArgumentException
* if the source emits more than one item
*/
public Observable<T> singleOrDefault(T defaultValue) {
return create(OperationSingle.<T> singleOrDefault(this, defaultValue));
}

/**
* If the Observable completes after emitting a single item that matches a
* predicate, return an Observable containing that item. If it emits no such
* item, return an Observable containing the defaultValue. If it emits more
* than one such item, throw an IllegalArgumentException.
*
* @param defaultValue
* a default value to return if the {@link Observable} emits no
* matching items
* @param predicate
* a predicate function to evaluate items emitted by the
* Observable
* @return an Observable containing the single item emitted by the source
* Observable that matches the predicate, or an Observable
* containing the defaultValue if no item matches the predicate
* @throws IllegalArgumentException
* if the source emits more than one item matching the predicate
*/
public Observable<T> singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return filter(predicate).singleOrDefault(defaultValue);
}

/**
* Returns an Observable that emits only the very first item emitted by the
* source Observable.
* source Observable, or an <code>IllegalArgumentException</code> if the source
* {@link Observable} is empty.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item emitted by the
* source Observable, or nothing if the source Observable completes
* without emitting a single item
* @return an Observable that emits only the very first item from the
* source, or an <code>IllegalArgumentException</code> if the source {@link Observable} is empty.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first() {
return take(1);
return take(1).single();
}

/**
* Returns an Observable that emits only the very first item emitted by the
* source Observable that satisfies a given condition.
* source Observable that satisfies a given condition, or an <code>IllegalArgumentException</code>
* if no such items are emitted.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/firstN.png">
*
* @param predicate the condition any source emitted item has to satisfy
* @return an Observable that emits only the very first item satisfying the
* given condition from the source, or nothing if the source
* Observable completes without emitting a single matching item
* given condition from the source, or an <code>IllegalArgumentException</code> if no such items are emitted.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
*/
public Observable<T> first(Func1<? super T, Boolean> predicate) {
return skipWhile(not(predicate)).take(1);
return takeFirst(predicate).single();
}

/**
Expand All @@ -5139,7 +5208,7 @@ public Observable<T> first(Func1<? super T, Boolean> predicate) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229320.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, defaultValue));
return take(1).singleOrDefault(defaultValue);
}

/**
Expand All @@ -5157,8 +5226,8 @@ public Observable<T> firstOrDefault(T defaultValue) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#firstordefault">RxJava Wiki: firstOrDefault()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229759.aspx">MSDN: Observable.FirstOrDefault</a>
*/
public Observable<T> firstOrDefault(Func1<? super T, Boolean> predicate, T defaultValue) {
return create(OperationFirstOrDefault.firstOrDefault(this, predicate, defaultValue));
public Observable<T> firstOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return takeFirst(predicate).singleOrDefault(defaultValue);
}

/**
Expand Down Expand Up @@ -5245,14 +5314,15 @@ public Observable<T> takeWhileWithIndex(final Func2<? super T, ? super Integer,
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/first.png">
*
* @return an Observable that emits only the very first item from the
* source, or none if the source Observable completes without
* source, or an empty Observable if the source Observable completes without
* emitting a single item
* @deprecated Use <code>take(1)</code> directly.
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
* @see #first()
*/
@Deprecated
public Observable<T> takeFirst() {
return first();
return take(1);
}

/**
Expand All @@ -5263,14 +5333,13 @@ public Observable<T> takeFirst() {
*
* @param predicate the condition any source emitted item has to satisfy
* @return an Observable that emits only the very first item satisfying the
* given condition from the source, or none if the source Observable
* given condition from the source, or an empty Observable if the source Observable
* completes without emitting a single matching item
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#first">RxJava Wiki: first()</a>
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229177.aspx">MSDN: Observable.First</a>
* @see #first(Func1)
*/
public Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
return first(predicate);
return filter(predicate).take(1);
}

/**
Expand Down Expand Up @@ -5725,7 +5794,7 @@ public <T2, D1, D2, R> Observable<R> groupJoin(Observable<T2> right, Func1<? sup
public Observable<Boolean> isEmpty() {
return create(OperationAny.isEmpty(this));
}

/**
* Returns an Observable that emits the last item emitted by the source or
* notifies observers of an <code>IllegalArgumentException</code> if the
Expand All @@ -5738,7 +5807,53 @@ public Observable<Boolean> isEmpty() {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observable-Operators#last">RxJava Wiki: last()</a>
*/
public Observable<T> last() {
return create(OperationLast.last(this));
return takeLast(1).single();
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable that satisfies a given condition, or an
* IllegalArgumentException if no such items are emitted.
*
* @param predicate
* the condition any source emitted item has to satisfy
* @return an Observable that emits only the last item satisfying the given
* condition from the source, or an IllegalArgumentException if no
* such items are emitted.
* @throws IllegalArgumentException
* if no such itmes are emmited
*/
public Observable<T> last(Func1<? super T, Boolean> predicate) {
return filter(predicate).takeLast(1).single();
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable, or a default item if the source is empty.
*
* @param defaultValue
* the default item to emit if the source Observable is empty
* @return an Observable that emits only the last item from the source, or a
* default item if the source is empty
*/
public Observable<T> lastOrDefault(T defaultValue) {
return takeLast(1).singleOrDefault(defaultValue);
}

/**
* Returns an Observable that emits only the last item emitted by the source
* Observable that satisfies a given condition, or a default item otherwise.
*
* @param defaultValue
* the default item to emit if the source Observable doesn't emit
* anything that satisfies the given condition
* @param predicate
* the condition any source emitted item has to satisfy
* @return an Observable that emits only the last item from the source that
* satisfies the given condition, or a default item otherwise
*/
public Observable<T> lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
return filter(predicate).takeLast(1).singleOrDefault(defaultValue);
}

/**
Expand Down
Loading