Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reimplement the ElementAt operator and add it to rxjava-scala #966

Merged
merged 3 commits into from
Mar 25, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,13 @@ class RxScalaDemo extends JUnitSuite {
println(result)
}

@Test def elementAtExample(): Unit = {
val o = List("red", "green", "blue").toObservable
println(o(2).toBlockingObservable.single)
}

@Test def elementAtOrDefaultExample(): Unit = {
val o : Observable[Seq[Char]] = List("red".toList, "green".toList, "blue".toList).toObservable.elementAtOrDefault(3, "black".toSeq)
println(o.toBlockingObservable.single)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2305,6 +2305,65 @@ trait Observable[+T]
def delaySubscription(delay: Duration, scheduler: Scheduler): Observable[T] = {
toScalaObservable[T](asJavaObservable.delaySubscription(delay.length, delay.unit, scheduler))
}

/**
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
* source Observbable.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAt.png">
*
* @param index
* the zero-based index of the item to retrieve
* @return an Observable that emits a single item: the item at the specified position in the sequence of
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
* @see `Observable.elementAt`
*/
def apply(index: Int): Observable[T] = elementAt(index)

/**
* Returns an Observable that emits the single item at a specified index in a sequence of emissions from a
* source Observbable.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAt.png">
*
* @param index
* the zero-based index of the item to retrieve
* @return an Observable that emits a single item: the item at the specified position in the sequence of
* those emitted by the source Observable
* @throws IndexOutOfBoundsException
* if index is greater than or equal to the number of items emitted by the source
* Observable
* @throws IndexOutOfBoundsException
* if index is less than 0
*/
def elementAt(index: Int): Observable[T] = {
toScalaObservable[T](asJavaObservable.elementAt(index))
}

/**
* Returns an Observable that emits the item found at a specified index in a sequence of emissions from a
* source Observable, or a default item if that index is out of range.
*
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/elementAtOrDefault.png">
*
* @param index
* the zero-based index of the item to retrieve
* @param defaultValue
* the default item
* @return an Observable that emits the item at the specified position in the sequence emitted by the source
* Observable, or the default item if that index is outside the bounds of the source sequence
* @throws IndexOutOfBoundsException
* if {@code index} is less than 0
*/
def elementAtOrDefault[U >: T](index: Int, default: U): Observable[U] = {
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]]
toScalaObservable[U](thisJava.elementAtOrDefault(index, default))
}
}

/**
Expand Down
6 changes: 3 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
import rx.operators.OperationDematerialize;
import rx.operators.OperationDistinct;
import rx.operators.OperationDistinctUntilChanged;
import rx.operators.OperationElementAt;
import rx.operators.OperatorElementAt;
import rx.operators.OperationFinally;
import rx.operators.OperationFlatMap;
import rx.operators.OperationGroupByUntil;
Expand Down Expand Up @@ -4466,7 +4466,7 @@ public final void onNext(T args) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementat">RxJava Wiki: elementAt()</a>
*/
public final Observable<T> elementAt(int index) {
return create(OperationElementAt.elementAt(this, index));
return lift(new OperatorElementAt<T>(index));
}

/**
Expand All @@ -4486,7 +4486,7 @@ public final Observable<T> elementAt(int index) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Filtering-Observables#wiki-elementatordefault">RxJava Wiki: elementAtOrDefault()</a>
*/
public final Observable<T> elementAtOrDefault(int index, T defaultValue) {
return create(OperationElementAt.elementAtOrDefault(this, index, defaultValue));
return lift(new OperatorElementAt<T>(index, defaultValue));
}

/**
Expand Down
137 changes: 0 additions & 137 deletions rxjava-core/src/main/java/rx/operators/OperationElementAt.java

This file was deleted.

82 changes: 82 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorElementAt.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Subscriber;

/**
* Returns the element at a specified index in a sequence.
*/
public final class OperatorElementAt<T> implements Operator<T, T> {

private final int index;
private final boolean hasDefault;
private final T defaultValue;

public OperatorElementAt(int index) {
this(index, null, false);
}

public OperatorElementAt(int index, T defaultValue) {
this(index, defaultValue, true);
}

private OperatorElementAt(int index, T defaultValue, boolean hasDefault) {
if (index < 0) {
throw new IndexOutOfBoundsException(index + " is out of bounds");
}
this.index = index;
this.defaultValue = defaultValue;
this.hasDefault = hasDefault;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {

private int currentIndex = 0;

@Override
public void onNext(T value) {
if (currentIndex == index) {
subscriber.onNext(value);
subscriber.onCompleted();
}
currentIndex++;
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onCompleted() {
if (currentIndex <= index) {
// If "subscriber.onNext(value)" is called, currentIndex must be greater than index
if (hasDefault) {
subscriber.onNext(defaultValue);
subscriber.onCompleted();
} else {
subscriber.onError(new IndexOutOfBoundsException(index + " is out of bounds"));
}
}
}
};
}

}
Loading