Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement Maybe.switchIfEmpty(Single) #5582

Merged
merged 3 commits into from
Sep 4, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3820,6 +3820,31 @@ public final Maybe<T> switchIfEmpty(MaybeSource<? extends T> other) {
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmpty<T>(this, other));
}

/**
* Returns a Single that emits the items emitted by the source Maybe or the item of an alternate
* SingleSource if the current Maybe is empty.
* <p>
* <img width="640" height="445" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/switchifempty.m.png" alt="">
* <p/>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code switchIfEmpty} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param other
* the alternate SingleSource to subscribe to if the main does not emit any items
* @return a Single that emits the items emitted by the source Maybe or the item of an
* alternate SingleSource if the source Maybe is empty.
* @since 2.1.4 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Single<T> switchIfEmpty(SingleSource<? extends T> other) {
ObjectHelper.requireNonNull(other, "other is null");
return RxJavaPlugins.onAssembly(new MaybeSwitchIfEmptySingle<T>(this, other));
}

/**
* Returns a Maybe that emits the items emitted by the source Maybe until a second MaybeSource
* emits an item.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.fuseable.HasUpstreamMaybeSource;

import java.util.concurrent.atomic.AtomicReference;

/**
* Subscribes to the other source if the main source is empty.
*
* @param <T> the value type
*/
public final class MaybeSwitchIfEmptySingle<T> extends Single<T> implements HasUpstreamMaybeSource<T> {

final MaybeSource<T> source;
final SingleSource<? extends T> other;

public MaybeSwitchIfEmptySingle(MaybeSource<T> source, SingleSource<? extends T> other) {
this.source = source;
this.other = other;
}

@Override
public MaybeSource<T> source() {
return source;
}

@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
source.subscribe(new SwitchIfEmptyMaybeObserver<T>(observer, other));
}

static final class SwitchIfEmptyMaybeObserver<T>
extends AtomicReference<Disposable>
implements MaybeObserver<T>, Disposable {

private static final long serialVersionUID = 4603919676453758899L;

final SingleObserver<? super T> actual;

final SingleSource<? extends T> other;

SwitchIfEmptyMaybeObserver(SingleObserver<? super T> actual, SingleSource<? extends T> other) {
this.actual = actual;
this.other = other;
}

@Override
public void dispose() {
DisposableHelper.dispose(this);
}

@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
actual.onSubscribe(this);
}
}

@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}

@Override
public void onError(Throwable e) {
actual.onError(e);
}

@Override
public void onComplete() {
Disposable d = get();
if (d != DisposableHelper.DISPOSED) {
if (compareAndSet(d, null)) {
other.subscribe(new OtherSingleObserver<T>(actual, this));
}
}
}

static final class OtherSingleObserver<T> implements SingleObserver<T> {

final SingleObserver<? super T> actual;

final AtomicReference<Disposable> parent;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OtherSingleObserver can extend AtomicReference to save field, should we do that @akarnokd?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you look at L95, you'll see that parent is SwitchIfEmptyMaybeObserver which is declared as AtomicReference<Disposable> thus this is not a new instace of an AtomicReference here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I saw that, but wouldn't it save field anyway? Though it's super nit

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The downstream can interact with only one replaceable reference to the original or the alternative disposable, so you'll need an extra field either way.

OtherSingleObserver(SingleObserver<? super T> actual, AtomicReference<Disposable> parent) {
this.actual = actual;
this.parent = parent;
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(parent, d);
}
@Override
public void onSuccess(T value) {
actual.onSuccess(value);
}
@Override
public void onError(Throwable e) {
actual.onError(e);
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.internal.operators.maybe;

import io.reactivex.Maybe;
import io.reactivex.Single;
import io.reactivex.TestHelper;
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Function;
import io.reactivex.observers.TestObserver;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.schedulers.Schedulers;
import org.junit.Test;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class MaybeSwitchIfEmptySingleTest {

@Test
public void nonEmpty() {
Maybe.just(1).switchIfEmpty(Single.just(2)).test().assertResult(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertValuesOnly() @vanniktech?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

onSuccess translates to onNext + onComplete in TestObserver so this requires assertResult.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AssertValuesOnly is mostly for Observable & Flowable

}

@Test
public void empty() {
Maybe.<Integer>empty().switchIfEmpty(Single.just(2)).test().assertResult(2);
}

@Test
public void error() {
Maybe.<Integer>error(new TestException()).switchIfEmpty(Single.just(2))
.test().assertFailure(TestException.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be great to also check that no values were emitted to check that it didn't switch to fallback Single

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assertFailure does check values:

public final U assertFailure(Class<? extends Throwable> error, T... values) {
    return assertSubscribed()
            .assertValues(values)
            .assertError(error)
            .assertNotComplete();
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes assertFailure does the job under the hood

}

@Test
public void errorOther() {
Maybe.empty().switchIfEmpty(Single.<Integer>error(new TestException()))
.test().assertFailure(TestException.class);
}

@Test
public void dispose() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();

assertTrue(pp.hasSubscribers());

ts.cancel();

assertFalse(pp.hasSubscribers());
}


@Test
public void isDisposed() {
PublishProcessor<Integer> pp = PublishProcessor.create();

TestHelper.checkDisposed(pp.singleElement().switchIfEmpty(Single.just(2)));
}

@Test
public void doubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function<Maybe<Integer>, Single<Integer>>() {
@Override
public Single<Integer> apply(Maybe<Integer> f) throws Exception {
return f.switchIfEmpty(Single.just(2));
}
});
}

@Test
public void emptyCancelRace() {
for (int i = 0; i < 500; i++) {
final PublishProcessor<Integer> pp = PublishProcessor.create();

final TestObserver<Integer> ts = pp.singleElement().switchIfEmpty(Single.just(2)).test();

Runnable r1 = new Runnable() {
@Override
public void run() {
pp.onComplete();
}
};

Runnable r2 = new Runnable() {
@Override
public void run() {
ts.cancel();
}
};

TestHelper.race(r1, r2, Schedulers.single());
}
}
}