Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Operators: switchCase (Case), ifThen (If), doWhile (DoWhile), WhileDo (W... #635

Merged
merged 1 commit into from
Dec 23, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 123 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import rx.operators.OperationCast;
import rx.operators.OperationCombineLatest;
import rx.operators.OperationConcat;
import rx.operators.OperationConditionals;
import rx.operators.OperationDebounce;
import rx.operators.OperationDefaultIfEmpty;
import rx.operators.OperationDefer;
Expand Down Expand Up @@ -1923,6 +1924,128 @@ public static <T> Observable<T> switchOnNext(Observable<? extends Observable<? e
return create(OperationSwitch.switchDo(sequenceOfSequences));
}

/**
* Return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to an
* empty observable.
* @param <K> the case key type
* @param <R> the result value type
* @param caseSelector the function that produces a case key when an Observer subscribes
* @param mapOfCases a map that maps a case key to an observable sequence
* @return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to an
* empty observable
*/
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
Map<? super K, ? extends Observable<? extends R>> mapOfCases) {
return switchCase(caseSelector, mapOfCases, Observable.<R>empty());
}

/**
* Return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to an
* empty observable which runs on the given scheduler.
* @param <K> the case key type
* @param <R> the result value type
* @param caseSelector the function that produces a case key when an Observer subscribes
* @param mapOfCases a map that maps a case key to an observable sequence
* @param scheduler the scheduler where the empty observable is observed
* @return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to an
* empty observable which runs on the given scheduler
*/
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
Map<? super K, ? extends Observable<? extends R>> mapOfCases, Scheduler scheduler) {
return switchCase(caseSelector, mapOfCases, Observable.<R>empty(scheduler));
}
/**
* Return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to the
* default observable.
* @param <K> the case key type
* @param <R> the result value type
* @param caseSelector the function that produces a case key when an Observer subscribes
* @param mapOfCases a map that maps a case key to an observable sequence
* @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for
* the key returned by the {@case caseSelector}
* @return an Observable that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to an
* empty observable
*/
public static <K, R> Observable<R> switchCase(Func0<? extends K> caseSelector,
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
Observable<? extends R> defaultCase) {
return create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase));
}

/**
* Return an Observable that subscribes to the this Observable,
* then resubscribes only if the postCondition evaluates to true.
* @param postCondition the post condition after the source completes
* @return an Observable that subscribes to the source Observable,
* then resubscribes only if the postCondition evaluates to true.
*/
public Observable<T> doWhile(Func0<Boolean> postCondition) {
return create(OperationConditionals.doWhile(this, postCondition));
}

/**
* Return an Observable that subscribes and resubscribes to this
* Observable if the preCondition evaluates to true.
* @param preCondition the condition to evaluate before subscribing to this,
* and subscribe to source if it returns {@code true}
* @return an Observable that subscribes and resubscribes to the source
* Observable if the preCondition evaluates to true.
*/
public Observable<T> whileDo(Func0<Boolean> preCondition) {
return create(OperationConditionals.whileDo(this, preCondition));
}

/**
* Return an Observable that subscribes to the
* then Observables if the condition function evaluates to true, or to an empty
* Observable if false.
* @param <R> the result value type
* @param condition the condition to decide which Observables to subscribe to
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
* @return an Observable that subscribes to the
* then Observables if the condition function evaluates to true, or to an empty
* Observable running on the given scheduler if false
*/
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then) {
return ifThen(condition, then, Observable.<R>empty());
}

/**
* Return an Observable that subscribes to the
* then Observables if the condition function evaluates to true, or to an empty
* Observable running on the given scheduler if false.
* @param <R> the result value type
* @param condition the condition to decide which Observables to subscribe to
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
* @param scheduler the scheduler where the empty Observable is observed in case the condition returns false
* @return an Observable that subscribes to the
* then Observables if the condition function evaluates to true, or to an empty
* Observable running on the given scheduler if false
*/
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then, Scheduler scheduler) {
return ifThen(condition, then, Observable.<R>empty(scheduler));
}
/**
* Return an Observable that subscribes to either the
* then or orElse Observables depending on a condition function.
* @param <R> the result value type
* @param condition the condition to decide which Observables to subscribe to
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
* @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false}
* @return an Observable that subscribes to either the
* then or orElse Observables depending on a condition function
*/
public static <R> Observable<R> ifThen(Func0<Boolean> condition, Observable<? extends R> then,
Observable<? extends R> orElse) {
return create(OperationConditionals.ifThen(condition, then, orElse));
}

/**
* Accepts an Observable and wraps it in another Observable that ensures
* that the resulting Observable is chronologically well-behaved.
Expand Down
242 changes: 242 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationConditionals.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.Map;
import rx.Observable;
import rx.Observable.OnSubscribeFunc;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.SerialSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func0;

/**
* Implementation of conditional-based operations such as Case, If, DoWhile and While.
*/
public final class OperationConditionals {
/** Utility class. */
private OperationConditionals() { throw new IllegalStateException("No instances!"); }
/**
* Return a subscription function that subscribes to an observable sequence
* chosen from a map of observables via a selector function or to the
* default observable.
* @param <K> the case key type
* @param <R> the result value type
* @param caseSelector the function that produces a case key when an Observer subscribes
* @param mapOfCases a map that maps a case key to an observable sequence
* @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for
* the key returned by the {@case caseSelector}
* @return a subscription function
*/
public static <K, R> OnSubscribeFunc<R> switchCase(
Func0<? extends K> caseSelector,
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
Observable<? extends R> defaultCase) {
return new SwitchCase<K, R>(caseSelector, mapOfCases, defaultCase);
}
/**
* Return a subscription function that subscribes to either the
* then or orElse Observables depending on a condition function.
* @param <R> the result value type
* @param condition the condition to decide which Observables to subscribe to
* @param then the Observable sequence to subscribe to if {@code condition} is {@code true}
* @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false}
* @return a subscription function
*/
public static <R> OnSubscribeFunc<R> ifThen(
Func0<Boolean> condition,
Observable<? extends R> then,
Observable<? extends R> orElse) {
return new IfThen<R>(condition, then, orElse);
}
/**
* Return a subscription function that subscribes to the source Observable,
* then resubscribes only if the postCondition evaluates to true.
* @param <T> the result value type
* @param source the source Observable
* @param postCondition the post condition after the source completes
* @return a subscription function.
*/
public static <T> OnSubscribeFunc<T> doWhile(Observable<? extends T> source, Func0<Boolean> postCondition) {
return new WhileDoWhile<T>(source, TRUE, postCondition);
}
/**
* Return a subscription function that subscribes and resubscribes to the source
* Observable if the preCondition evaluates to true.
* @param <T> the result value type
* @param source the source Observable
* @param preCondition the condition to evaluate before subscribing to source,
* and subscribe to source if it returns {@code true}
* @return a subscription function.
*/
public static <T> OnSubscribeFunc<T> whileDo(Observable<? extends T> source, Func0<Boolean> preCondition) {
return new WhileDoWhile<T>(source, preCondition, preCondition);
}
/**
* Select an observable from a map based on a case key returned by a selector
* function when an observer subscribes.
* @param <K> the case key type
* @param <R> the result value type
*/
private static final class SwitchCase<K, R> implements OnSubscribeFunc<R> {
final Func0<? extends K> caseSelector;
final Map<? super K, ? extends Observable<? extends R>> mapOfCases;
final Observable<? extends R> defaultCase;
public SwitchCase(Func0<? extends K> caseSelector,
Map<? super K, ? extends Observable<? extends R>> mapOfCases,
Observable<? extends R> defaultCase) {
this.caseSelector = caseSelector;
this.mapOfCases = mapOfCases;
this.defaultCase = defaultCase;
}

@Override
public Subscription onSubscribe(Observer<? super R> t1) {
Observable<? extends R> target;
try {
K caseKey = caseSelector.call();
if (mapOfCases.containsKey(caseKey)) {
target = mapOfCases.get(caseKey);
} else {
target = defaultCase;
}
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
return target.subscribe(t1);
}
}
/** Returns always true. */
private static final class Func0True implements Func0<Boolean> {
@Override
public Boolean call() {
return true;
}
}
/** Returns always true function. */
private static final Func0True TRUE = new Func0True();
/**
* Given a condition, subscribe to one of the observables when an Observer
* subscribes.
* @param <R> the result value type
*/
private static final class IfThen<R> implements OnSubscribeFunc<R> {
final Func0<Boolean> condition;
final Observable<? extends R> then;
final Observable<? extends R> orElse;
public IfThen(Func0<Boolean> condition, Observable<? extends R> then, Observable<? extends R> orElse) {
this.condition = condition;
this.then = then;
this.orElse = orElse;
}
@Override
public Subscription onSubscribe(Observer<? super R> t1) {
Observable<? extends R> target;
try {
if (condition.call()) {
target = then;
} else {
target = orElse;
}
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
return target.subscribe(t1);
}
}
/**
* Repeatedly subscribes to the source observable if the pre- or
* postcondition is true.
* <p>
* This combines the While and DoWhile into a single operation through
* the conditions.
* @param <T> the result value type
*/
private static final class WhileDoWhile<T> implements OnSubscribeFunc<T> {
final Func0<Boolean> preCondition;
final Func0<Boolean> postCondition;
final Observable<? extends T> source;
public WhileDoWhile(Observable<? extends T> source,
Func0<Boolean> preCondition, Func0<Boolean> postCondition
) {
this.source = source;
this.preCondition = preCondition;
this.postCondition = postCondition;
}

@Override
public Subscription onSubscribe(Observer<? super T> t1) {
boolean first;
try {
first = preCondition.call();
} catch (Throwable t) {
t1.onError(t);
return Subscriptions.empty();
}
if (first) {
SerialSubscription ssub = new SerialSubscription();

ssub.setSubscription(source.subscribe(new SourceObserver(t1, ssub)));

return ssub;
} else {
t1.onCompleted();
}
return Subscriptions.empty();
}
/** Observe the source. */
final class SourceObserver implements Observer<T> {
final SerialSubscription cancel;
final Observer<? super T> observer;
public SourceObserver(Observer<? super T> observer, SerialSubscription cancel) {
this.observer = observer;
this.cancel = cancel;
}

@Override
public void onNext(T args) {
observer.onNext(args);
}

@Override
public void onError(Throwable e) {
observer.onError(e);
cancel.unsubscribe();
}

@Override
public void onCompleted() {
boolean next;
try {
next = postCondition.call();
} catch (Throwable t) {
observer.onError(t);
return;
}
if (next) {
cancel.setSubscription(source.subscribe(this));
} else {
observer.onCompleted();
cancel.unsubscribe();
}
}

}
}
}
Loading