From c43229b6793a698c03293cd35b7c8516029ef02b Mon Sep 17 00:00:00 2001 From: Boris Maslakov Date: Mon, 4 Sep 2017 15:28:23 +0300 Subject: [PATCH] implement Maybe.switchIfEmpty(Single) (#5582) * implement Maybe.switchIfEmpty(Single) * switchIfEmpty(Single) returns single; remove all changes unrelated to the PR * add 'experimental' annotation --- src/main/java/io/reactivex/Maybe.java | 25 ++++ .../maybe/MaybeSwitchIfEmptySingle.java | 124 ++++++++++++++++++ .../maybe/MaybeSwitchIfEmptySingleTest.java | 108 +++++++++++++++ 3 files changed, 257 insertions(+) create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingle.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 9e468e9d6d..daff77cff4 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -3820,6 +3820,31 @@ public final Maybe switchIfEmpty(MaybeSource other) { return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmpty(this, other)); } + /** + * Returns a Single that emits the items emitted by the source Maybe or the item of an alternate + * SingleSource if the current Maybe is empty. + *

+ * + *

+ *

+ *
Scheduler:
+ *
{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param other + * the alternate SingleSource to subscribe to if the main does not emit any items + * @return a Single that emits the items emitted by the source Maybe or the item of an + * alternate SingleSource if the source Maybe is empty. + * @since 2.1.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Single switchIfEmpty(SingleSource other) { + ObjectHelper.requireNonNull(other, "other is null"); + return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmptySingle(this, other)); + } + /** * Returns a Maybe that emits the items emitted by the source Maybe until a second MaybeSource * emits an item. diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingle.java new file mode 100644 index 0000000000..89615edd03 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingle.java @@ -0,0 +1,124 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.fuseable.HasUpstreamMaybeSource; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Subscribes to the other source if the main source is empty. + * + * @param the value type + */ +public final class MaybeSwitchIfEmptySingle extends Single implements HasUpstreamMaybeSource { + + final MaybeSource source; + final SingleSource other; + + public MaybeSwitchIfEmptySingle(MaybeSource source, SingleSource other) { + this.source = source; + this.other = other; + } + + @Override + public MaybeSource source() { + return source; + } + + @Override + protected void subscribeActual(SingleObserver observer) { + source.subscribe(new SwitchIfEmptyMaybeObserver(observer, other)); + } + + static final class SwitchIfEmptyMaybeObserver + extends AtomicReference + implements MaybeObserver, Disposable { + + private static final long serialVersionUID = 4603919676453758899L; + + final SingleObserver actual; + + final SingleSource other; + + SwitchIfEmptyMaybeObserver(SingleObserver actual, SingleSource other) { + this.actual = actual; + this.other = other; + } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.setOnce(this, d)) { + actual.onSubscribe(this); + } + } + + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + + @Override + public void onError(Throwable e) { + actual.onError(e); + } + + @Override + public void onComplete() { + Disposable d = get(); + if (d != DisposableHelper.DISPOSED) { + if (compareAndSet(d, null)) { + other.subscribe(new OtherSingleObserver(actual, this)); + } + } + } + + static final class OtherSingleObserver implements SingleObserver { + + final SingleObserver actual; + + final AtomicReference parent; + OtherSingleObserver(SingleObserver actual, AtomicReference parent) { + this.actual = actual; + this.parent = parent; + } + @Override + public void onSubscribe(Disposable d) { + DisposableHelper.setOnce(parent, d); + } + @Override + public void onSuccess(T value) { + actual.onSuccess(value); + } + @Override + public void onError(Throwable e) { + actual.onError(e); + } + } + + } +} \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java new file mode 100644 index 0000000000..d21eccb211 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeSwitchIfEmptySingleTest.java @@ -0,0 +1,108 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.Maybe; +import io.reactivex.Single; +import io.reactivex.TestHelper; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.observers.TestObserver; +import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class MaybeSwitchIfEmptySingleTest { + + @Test + public void nonEmpty() { + Maybe.just(1).switchIfEmpty(Single.just(2)).test().assertResult(1); + } + + @Test + public void empty() { + Maybe.empty().switchIfEmpty(Single.just(2)).test().assertResult(2); + } + + @Test + public void error() { + Maybe.error(new TestException()).switchIfEmpty(Single.just(2)) + .test().assertFailure(TestException.class); + } + + @Test + public void errorOther() { + Maybe.empty().switchIfEmpty(Single.error(new TestException())) + .test().assertFailure(TestException.class); + } + + @Test + public void dispose() { + PublishProcessor pp = PublishProcessor.create(); + + TestObserver ts = pp.singleElement().switchIfEmpty(Single.just(2)).test(); + + assertTrue(pp.hasSubscribers()); + + ts.cancel(); + + assertFalse(pp.hasSubscribers()); + } + + + @Test + public void isDisposed() { + PublishProcessor pp = PublishProcessor.create(); + + TestHelper.checkDisposed(pp.singleElement().switchIfEmpty(Single.just(2))); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function, Single>() { + @Override + public Single apply(Maybe f) throws Exception { + return f.switchIfEmpty(Single.just(2)); + } + }); + } + + @Test + public void emptyCancelRace() { + for (int i = 0; i < 500; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestObserver ts = pp.singleElement().switchIfEmpty(Single.just(2)).test(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2, Schedulers.single()); + } + } +}