diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5d98df26b9..cf428ce3de 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -41,6 +41,7 @@ import rx.operators.OperationCast; import rx.operators.OperationCombineLatest; import rx.operators.OperationConcat; +import rx.operators.OperationConditionals; import rx.operators.OperationDebounce; import rx.operators.OperationDefaultIfEmpty; import rx.operators.OperationDefer; @@ -1923,6 +1924,128 @@ public static Observable switchOnNext(Observable the case key type + * @param the result value type + * @param caseSelector the function that produces a case key when an Observer subscribes + * @param mapOfCases a map that maps a case key to an observable sequence + * @return an Observable that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to an + * empty observable + */ + public static Observable switchCase(Func0 caseSelector, + Map> mapOfCases) { + return switchCase(caseSelector, mapOfCases, Observable.empty()); + } + + /** + * Return an Observable that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to an + * empty observable which runs on the given scheduler. + * @param the case key type + * @param the result value type + * @param caseSelector the function that produces a case key when an Observer subscribes + * @param mapOfCases a map that maps a case key to an observable sequence + * @param scheduler the scheduler where the empty observable is observed + * @return an Observable that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to an + * empty observable which runs on the given scheduler + */ + public static Observable switchCase(Func0 caseSelector, + Map> mapOfCases, Scheduler scheduler) { + return switchCase(caseSelector, mapOfCases, Observable.empty(scheduler)); + } + /** + * Return an Observable that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to the + * default observable. + * @param the case key type + * @param the result value type + * @param caseSelector the function that produces a case key when an Observer subscribes + * @param mapOfCases a map that maps a case key to an observable sequence + * @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for + * the key returned by the {@case caseSelector} + * @return an Observable that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to an + * empty observable + */ + public static Observable switchCase(Func0 caseSelector, + Map> mapOfCases, + Observable defaultCase) { + return create(OperationConditionals.switchCase(caseSelector, mapOfCases, defaultCase)); + } + + /** + * Return an Observable that subscribes to the this Observable, + * then resubscribes only if the postCondition evaluates to true. + * @param postCondition the post condition after the source completes + * @return an Observable that subscribes to the source Observable, + * then resubscribes only if the postCondition evaluates to true. + */ + public Observable doWhile(Func0 postCondition) { + return create(OperationConditionals.doWhile(this, postCondition)); + } + + /** + * Return an Observable that subscribes and resubscribes to this + * Observable if the preCondition evaluates to true. + * @param preCondition the condition to evaluate before subscribing to this, + * and subscribe to source if it returns {@code true} + * @return an Observable that subscribes and resubscribes to the source + * Observable if the preCondition evaluates to true. + */ + public Observable whileDo(Func0 preCondition) { + return create(OperationConditionals.whileDo(this, preCondition)); + } + + /** + * Return an Observable that subscribes to the + * then Observables if the condition function evaluates to true, or to an empty + * Observable if false. + * @param the result value type + * @param condition the condition to decide which Observables to subscribe to + * @param then the Observable sequence to subscribe to if {@code condition} is {@code true} + * @return an Observable that subscribes to the + * then Observables if the condition function evaluates to true, or to an empty + * Observable running on the given scheduler if false + */ + public static Observable ifThen(Func0 condition, Observable then) { + return ifThen(condition, then, Observable.empty()); + } + + /** + * Return an Observable that subscribes to the + * then Observables if the condition function evaluates to true, or to an empty + * Observable running on the given scheduler if false. + * @param the result value type + * @param condition the condition to decide which Observables to subscribe to + * @param then the Observable sequence to subscribe to if {@code condition} is {@code true} + * @param scheduler the scheduler where the empty Observable is observed in case the condition returns false + * @return an Observable that subscribes to the + * then Observables if the condition function evaluates to true, or to an empty + * Observable running on the given scheduler if false + */ + public static Observable ifThen(Func0 condition, Observable then, Scheduler scheduler) { + return ifThen(condition, then, Observable.empty(scheduler)); + } + /** + * Return an Observable that subscribes to either the + * then or orElse Observables depending on a condition function. + * @param the result value type + * @param condition the condition to decide which Observables to subscribe to + * @param then the Observable sequence to subscribe to if {@code condition} is {@code true} + * @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false} + * @return an Observable that subscribes to either the + * then or orElse Observables depending on a condition function + */ + public static Observable ifThen(Func0 condition, Observable then, + Observable orElse) { + return create(OperationConditionals.ifThen(condition, then, orElse)); + } + /** * Accepts an Observable and wraps it in another Observable that ensures * that the resulting Observable is chronologically well-behaved. diff --git a/rxjava-core/src/main/java/rx/operators/OperationConditionals.java b/rxjava-core/src/main/java/rx/operators/OperationConditionals.java new file mode 100644 index 0000000000..06e14e4328 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationConditionals.java @@ -0,0 +1,242 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.Map; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.SerialSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func0; + +/** + * Implementation of conditional-based operations such as Case, If, DoWhile and While. + */ +public final class OperationConditionals { + /** Utility class. */ + private OperationConditionals() { throw new IllegalStateException("No instances!"); } + /** + * Return a subscription function that subscribes to an observable sequence + * chosen from a map of observables via a selector function or to the + * default observable. + * @param the case key type + * @param the result value type + * @param caseSelector the function that produces a case key when an Observer subscribes + * @param mapOfCases a map that maps a case key to an observable sequence + * @param defaultCase the default observable if the {@code mapOfCases} doesn't contain a value for + * the key returned by the {@case caseSelector} + * @return a subscription function + */ + public static OnSubscribeFunc switchCase( + Func0 caseSelector, + Map> mapOfCases, + Observable defaultCase) { + return new SwitchCase(caseSelector, mapOfCases, defaultCase); + } + /** + * Return a subscription function that subscribes to either the + * then or orElse Observables depending on a condition function. + * @param the result value type + * @param condition the condition to decide which Observables to subscribe to + * @param then the Observable sequence to subscribe to if {@code condition} is {@code true} + * @param orElse the Observable sequence to subscribe to if {@code condition} is {@code false} + * @return a subscription function + */ + public static OnSubscribeFunc ifThen( + Func0 condition, + Observable then, + Observable orElse) { + return new IfThen(condition, then, orElse); + } + /** + * Return a subscription function that subscribes to the source Observable, + * then resubscribes only if the postCondition evaluates to true. + * @param the result value type + * @param source the source Observable + * @param postCondition the post condition after the source completes + * @return a subscription function. + */ + public static OnSubscribeFunc doWhile(Observable source, Func0 postCondition) { + return new WhileDoWhile(source, TRUE, postCondition); + } + /** + * Return a subscription function that subscribes and resubscribes to the source + * Observable if the preCondition evaluates to true. + * @param the result value type + * @param source the source Observable + * @param preCondition the condition to evaluate before subscribing to source, + * and subscribe to source if it returns {@code true} + * @return a subscription function. + */ + public static OnSubscribeFunc whileDo(Observable source, Func0 preCondition) { + return new WhileDoWhile(source, preCondition, preCondition); + } + /** + * Select an observable from a map based on a case key returned by a selector + * function when an observer subscribes. + * @param the case key type + * @param the result value type + */ + private static final class SwitchCase implements OnSubscribeFunc { + final Func0 caseSelector; + final Map> mapOfCases; + final Observable defaultCase; + public SwitchCase(Func0 caseSelector, + Map> mapOfCases, + Observable defaultCase) { + this.caseSelector = caseSelector; + this.mapOfCases = mapOfCases; + this.defaultCase = defaultCase; + } + + @Override + public Subscription onSubscribe(Observer t1) { + Observable target; + try { + K caseKey = caseSelector.call(); + if (mapOfCases.containsKey(caseKey)) { + target = mapOfCases.get(caseKey); + } else { + target = defaultCase; + } + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + return target.subscribe(t1); + } + } + /** Returns always true. */ + private static final class Func0True implements Func0 { + @Override + public Boolean call() { + return true; + } + } + /** Returns always true function. */ + private static final Func0True TRUE = new Func0True(); + /** + * Given a condition, subscribe to one of the observables when an Observer + * subscribes. + * @param the result value type + */ + private static final class IfThen implements OnSubscribeFunc { + final Func0 condition; + final Observable then; + final Observable orElse; + public IfThen(Func0 condition, Observable then, Observable orElse) { + this.condition = condition; + this.then = then; + this.orElse = orElse; + } + @Override + public Subscription onSubscribe(Observer t1) { + Observable target; + try { + if (condition.call()) { + target = then; + } else { + target = orElse; + } + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + return target.subscribe(t1); + } + } + /** + * Repeatedly subscribes to the source observable if the pre- or + * postcondition is true. + *

+ * This combines the While and DoWhile into a single operation through + * the conditions. + * @param the result value type + */ + private static final class WhileDoWhile implements OnSubscribeFunc { + final Func0 preCondition; + final Func0 postCondition; + final Observable source; + public WhileDoWhile(Observable source, + Func0 preCondition, Func0 postCondition + ) { + this.source = source; + this.preCondition = preCondition; + this.postCondition = postCondition; + } + + @Override + public Subscription onSubscribe(Observer t1) { + boolean first; + try { + first = preCondition.call(); + } catch (Throwable t) { + t1.onError(t); + return Subscriptions.empty(); + } + if (first) { + SerialSubscription ssub = new SerialSubscription(); + + ssub.setSubscription(source.subscribe(new SourceObserver(t1, ssub))); + + return ssub; + } else { + t1.onCompleted(); + } + return Subscriptions.empty(); + } + /** Observe the source. */ + final class SourceObserver implements Observer { + final SerialSubscription cancel; + final Observer observer; + public SourceObserver(Observer observer, SerialSubscription cancel) { + this.observer = observer; + this.cancel = cancel; + } + + @Override + public void onNext(T args) { + observer.onNext(args); + } + + @Override + public void onError(Throwable e) { + observer.onError(e); + cancel.unsubscribe(); + } + + @Override + public void onCompleted() { + boolean next; + try { + next = postCondition.call(); + } catch (Throwable t) { + observer.onError(t); + return; + } + if (next) { + cancel.setSubscription(source.subscribe(this)); + } else { + observer.onCompleted(); + cancel.unsubscribe(); + } + } + + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java b/rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java new file mode 100644 index 0000000000..3daec59297 --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationConditionalsTest.java @@ -0,0 +1,453 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mock; +import static org.mockito.Mockito.*; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; +import rx.util.functions.Func0; + +public class OperationConditionalsTest { + @Mock + Observer observer; + TestScheduler scheduler; + Func0 func; + Func0 funcError; + Func0 condition; + Func0 conditionError; + int numRecursion = 250; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + scheduler = new TestScheduler(); + func = new Func0() { + int count = 1; + @Override + public Integer call() { + return count++; + } + }; + funcError = new Func0() { + int count = 1; + @Override + public Integer call() { + if (count == 2) { + throw new RuntimeException("Forced failure!"); + } + return count++; + } + }; + condition = new Func0() { + boolean r; + @Override + public Boolean call() { + r = !r; + return r; + } + + }; + conditionError = new Func0() { + boolean r; + @Override + public Boolean call() { + r = !r; + if (!r) { + throw new RuntimeException("Forced failure!"); + } + return r; + } + + }; + } + Func0 just(final T value) { + return new Func0() { + @Override + public T call() { + return value; + } + }; + } + + @SuppressWarnings("unchecked") + void observe(Observable source, T... values) { + Observer o = mock(Observer.class); + + Subscription s = source.subscribe(o); + + InOrder inOrder = inOrder(o); + + for (T v : values) { + inOrder.verify(o, times(1)).onNext(v); + } + inOrder.verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + s.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + + @SuppressWarnings("unchecked") + void observeSequence(Observable source, Iterable values) { + Observer o = mock(Observer.class); + + Subscription s = source.subscribe(o); + + InOrder inOrder = inOrder(o); + + for (T v : values) { + inOrder.verify(o, times(1)).onNext(v); + } + inOrder.verify(o, times(1)).onCompleted(); + verify(o, never()).onError(any(Throwable.class)); + + s.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + + @SuppressWarnings("unchecked") + void observeError(Observable source, Class error, T... valuesBeforeError) { + Observer o = mock(Observer.class); + + Subscription s = source.subscribe(o); + + InOrder inOrder = inOrder(o); + + for (T v : valuesBeforeError) { + inOrder.verify(o, times(1)).onNext(v); + } + inOrder.verify(o, times(1)).onError(any(error)); + verify(o, never()).onCompleted(); + + s.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + + @SuppressWarnings("unchecked") + void observeSequenceError(Observable source, Class error, Iterable valuesBeforeError) { + Observer o = mock(Observer.class); + + Subscription s = source.subscribe(o); + + InOrder inOrder = inOrder(o); + + for (T v : valuesBeforeError) { + inOrder.verify(o, times(1)).onNext(v); + } + inOrder.verify(o, times(1)).onError(any(error)); + verify(o, never()).onCompleted(); + + s.unsubscribe(); + + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testSimple() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5, 6); + + Map> map = new HashMap>(); + map.put(1, source1); + map.put(2, source2); + + Observable result = Observable.switchCase(func, map); + + observe(result, 1, 2, 3); + observe(result, 4, 5, 6); + observe(result); + } + @Test + public void testDefaultCase() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5, 6); + + Map> map = new HashMap>(); + map.put(1, source1); + + Observable result = Observable.switchCase(func, map, source2); + + observe(result, 1, 2, 3); + observe(result, 4, 5, 6); + observe(result, 4, 5, 6); + } + @Test + public void testCaseSelectorThrows() { + Observable source1 = Observable.from(1, 2, 3); + + Map> map = new HashMap>(); + map.put(1, source1); + + Observable result = Observable.switchCase(funcError, map); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + @Test + public void testMapGetThrows() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5, 6); + + Map> map = new HashMap>() { + + @Override + public Observable get(Object key) { + if (key.equals(2)) { + throw new RuntimeException("Forced failure!"); + } + return super.get(key); + } + + }; + map.put(1, source1); + map.put(2, source2); + + Observable result = Observable.switchCase(func, map); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + @Test + public void testMapContainsKeyThrows() { + Observable source1 = Observable.from(1, 2, 3); + + Map> map = new HashMap>() { + + @Override + public boolean containsKey(Object key) { + if (key.equals(2)) { + throw new RuntimeException("Forced failure!"); + } + return super.containsKey(key); + } + + }; + map.put(1, source1); + + Observable result = Observable.switchCase(func, map); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + @Test + public void testChosenObservableThrows() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.error(new RuntimeException("Forced failure")); + + Map> map = new HashMap>(); + map.put(1, source1); + map.put(2, source2); + + Observable result = Observable.switchCase(func, map); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + @Test + public void testIfThen() { + Observable source1 = Observable.from(1, 2, 3); + + Observable result = Observable.ifThen(condition, source1); + + observe(result, 1, 2, 3); + observe(result); + observe(result, 1, 2, 3); + observe(result); + } + + @Test + public void testIfThenElse() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.from(4, 5, 6); + + Observable result = Observable.ifThen(condition, source1, source2); + + observe(result, 1, 2, 3); + observe(result, 4, 5, 6); + observe(result, 1, 2, 3); + observe(result, 4, 5, 6); + } + + @Test + public void testIfThenConditonThrows() { + Observable source1 = Observable.from(1, 2, 3); + + Observable result = Observable.ifThen(conditionError, source1); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + + @Test + public void testIfThenObservableThrows() { + Observable source1 = Observable.error(new RuntimeException("Forced failure!")); + + Observable result = Observable.ifThen(condition, source1); + + observeError(result, RuntimeException.class); + observe(result); + + observeError(result, RuntimeException.class); + observe(result); + } + @Test + public void testIfThenElseObservableThrows() { + Observable source1 = Observable.from(1, 2, 3); + Observable source2 = Observable.error(new RuntimeException("Forced failure!")); + + Observable result = Observable.ifThen(condition, source1, source2); + + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + observe(result, 1, 2, 3); + observeError(result, RuntimeException.class); + } + + @Test + public void testDoWhile() { + Observable source1 = Observable.from(1, 2, 3); + + Observable result = source1.doWhile(condition); + + observe(result, 1, 2, 3, 1, 2, 3); + } + @Test + public void testDoWhileOnce() { + Observable source1 = Observable.from(1, 2, 3); + + condition.call(); // toggle to false + Observable result = source1.doWhile(condition); + + observe(result, 1, 2, 3); + } + @Test + public void testDoWhileConditionThrows() { + Observable source1 = Observable.from(1, 2, 3); + Observable result = source1.doWhile(conditionError); + + observeError(result, RuntimeException.class, 1, 2, 3); + } + @Test + public void testDoWhileSourceThrows() { + Observable source1 = Observable.concat(Observable.from(1, 2, 3), + Observable.error(new RuntimeException("Forced failure!"))); + + Observable result = source1.doWhile(condition); + + observeError(result, RuntimeException.class, 1, 2, 3); + } + Func0 countdown(final int n) { + return new Func0() { + int count = n; + @Override + public Boolean call() { + return count-- > 0; + } + }; + } + @Test + public void testDoWhileManyTimes() { + Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread()); + + List expected = new ArrayList(numRecursion * 3); + for (int i = 0; i < numRecursion; i++) { + expected.add(1); + expected.add(2); + expected.add(3); + } + + Observable result = source1.doWhile(countdown(numRecursion)); + + observeSequence(result, expected); + } + @Test + public void testWhileDo() { + Observable source1 = Observable.from(1, 2, 3); + Observable result = source1.whileDo(countdown(2)); + + observe(result, 1, 2, 3, 1, 2, 3); + } + @Test + public void testWhileDoOnce() { + Observable source1 = Observable.from(1, 2, 3); + Observable result = source1.whileDo(countdown(1)); + + observe(result, 1, 2, 3); + } + @Test + public void testWhileDoZeroTimes() { + Observable source1 = Observable.from(1, 2, 3); + Observable result = source1.whileDo(countdown(0)); + + observe(result); + } + @Test + public void testWhileDoManyTimes() { + Observable source1 = Observable.from(1, 2, 3).subscribeOn(Schedulers.currentThread()); + + List expected = new ArrayList(numRecursion * 3); + for (int i = 0; i < numRecursion; i++) { + expected.add(1); + expected.add(2); + expected.add(3); + } + + Observable result = source1.whileDo(countdown(numRecursion)); + + observeSequence(result, expected); + } + @Test + public void testWhileDoConditionThrows() { + Observable source1 = Observable.from(1, 2, 3); + Observable result = source1.whileDo(conditionError); + + observeError(result, RuntimeException.class, 1, 2, 3); + } + @Test + public void testWhileDoConditionThrowsImmediately() { + Observable source1 = Observable.from(1, 2, 3); + conditionError.call(); + Observable result = source1.whileDo(conditionError); + + observeError(result, RuntimeException.class); + } + @Test + public void testWhileDoSourceThrows() { + Observable source1 = Observable.concat(Observable.from(1, 2, 3), + Observable.error(new RuntimeException("Forced failure!"))); + + Observable result = source1.whileDo(condition); + + observeError(result, RuntimeException.class, 1, 2, 3); + } +}