Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Completable.onErrorReturn[Item] #6886

Merged
merged 1 commit into from
Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -2323,6 +2323,64 @@ public final Completable onErrorResumeWith(@NonNull CompletableSource fallback)
return onErrorResumeNext(Functions.justFunction(fallback));
}

/**
* Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
* {@code Completable} instead of signaling the error via {@code onError}.
* <p>
* <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturn.png" alt="">
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the item type to return on error
* @param itemSupplier
* a function that returns a single value that will be emitted as success value
* the current {@code Completable} signals an {@code onError} event
* @return the new {@link Maybe} instance
* @throws NullPointerException if {@code itemSupplier} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
* @since 3.0.0
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? extends T> itemSupplier) {
Objects.requireNonNull(itemSupplier, "itemSupplier is null");
return RxJavaPlugins.onAssembly(new CompletableOnErrorReturn<>(this, itemSupplier));
}

/**
* Ends the flow with the given success item when the current {@code Completable}
* fails instead of signaling the error via {@code onError}.
* <p>
* <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturnItem.png" alt="">
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the item type to return on error
* @param item
* the value that is emitted as {@code onSuccess} in case the current {@code Completable} signals an {@code onError}
* @return the new {@link Maybe} instance
* @throws NullPointerException if {@code item} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
*/
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public final <T> Maybe<T> onErrorReturnItem(@NonNull T item) {
Objects.requireNonNull(item, "item is null");
return onErrorReturn(Functions.justFunction(item));
}

/**
* Nulls out references to the upstream producer and downstream {@link CompletableObserver} if
* the sequence is terminated or downstream calls {@code dispose()}.
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -4467,7 +4467,7 @@ public final Maybe<T> onErrorResumeNext(@NonNull Function<? super Throwable, ? e
* Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
* {@code Maybe} instead of signaling the error via {@code onError}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
* <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturn.png" alt="">
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
Expand All @@ -4494,7 +4494,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
/**
* Ends the flow with the given success item when the current {@code Maybe} fails instead of signaling the error via {@code onError}.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
* <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturnItem.png" alt="">
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
Expand All @@ -4504,7 +4504,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
* </dl>
*
* @param item
* the value that is emitted as {@code onSuccess} in case this {@code Maybe} signals an {@code onError}
* the value that is emitted as {@code onSuccess} in case the current {@code Maybe} signals an {@code onError}
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code item} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.completable;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;

import java.util.Objects;

/**
* Returns a value generated via a function if the main source signals an onError.
* @param <T> the value type
* @since 3.0.0
*/
public final class CompletableOnErrorReturn<T> extends Maybe<T> {

final CompletableSource source;

final Function<? super Throwable, ? extends T> valueSupplier;

public CompletableOnErrorReturn(CompletableSource source,
Function<? super Throwable, ? extends T> valueSupplier) {
this.source = source;
this.valueSupplier = valueSupplier;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
}

static final class OnErrorReturnMaybeObserver<T> implements CompletableObserver, Disposable {

final MaybeObserver<? super T> downstream;

final Function<? super Throwable, ? extends T> itemSupplier;

Disposable upstream;

OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
Function<? super Throwable, ? extends T> itemSupplier) {
this.downstream = actual;
this.itemSupplier = itemSupplier;
}

@Override
public void dispose() {
upstream.dispose();
}

@Override
public boolean isDisposed() {
return upstream.isDisposed();
}

@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;

downstream.onSubscribe(this);
}
}

@Override
public void onError(Throwable e) {
T v;

try {
v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
return;
}

downstream.onSuccess(v);
}

@Override
public void onComplete() {
downstream.onComplete();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,31 +27,31 @@
*/
public final class MaybeOnErrorReturn<T> extends AbstractMaybeWithUpstream<T, T> {

final Function<? super Throwable, ? extends T> valueSupplier;
final Function<? super Throwable, ? extends T> itemSupplier;

public MaybeOnErrorReturn(MaybeSource<T> source,
Function<? super Throwable, ? extends T> valueSupplier) {
Function<? super Throwable, ? extends T> itemSupplier) {
super(source);
this.valueSupplier = valueSupplier;
this.itemSupplier = itemSupplier;
}

@Override
protected void subscribeActual(MaybeObserver<? super T> observer) {
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, itemSupplier));
}

static final class OnErrorReturnMaybeObserver<T> implements MaybeObserver<T>, Disposable {

final MaybeObserver<? super T> downstream;

final Function<? super Throwable, ? extends T> valueSupplier;
final Function<? super Throwable, ? extends T> itemSupplier;

Disposable upstream;

OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
Function<? super Throwable, ? extends T> valueSupplier) {
this.downstream = actual;
this.valueSupplier = valueSupplier;
this.itemSupplier = valueSupplier;
}

@Override
Expand Down Expand Up @@ -83,7 +83,7 @@ public void onError(Throwable e) {
T v;

try {
v = Objects.requireNonNull(valueSupplier.apply(e), "The valueSupplier returned a null value");
v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
downstream.onError(new CompositeException(e, ex));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,16 @@

import static org.junit.Assert.assertEquals;

import java.io.IOException;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.subjects.CompletableSubject;
import io.reactivex.rxjava3.testsupport.TestHelper;

public class CompletableOnErrorXTest extends RxJavaTest {

Expand Down Expand Up @@ -46,4 +52,55 @@ public CompletableSource apply(Throwable e) throws Exception {

assertEquals(0, call[0]);
}

@Test
public void onErrorReturnConst() {
Completable.error(new TestException())
.onErrorReturnItem(1)
.test()
.assertResult(1);
}

@Test
public void onErrorReturn() {
Completable.error(new TestException())
.onErrorReturn(Functions.justFunction(1))
.test()
.assertResult(1);
}

@Test
public void onErrorReturnFunctionThrows() {
TestHelper.assertCompositeExceptions(Completable.error(new TestException())
.onErrorReturn(new Function<Throwable, Object>() {
@Override
public Object apply(Throwable v) throws Exception {
throw new IOException();
}
})
.to(TestHelper.testConsumer()), TestException.class, IOException.class);
}

@Test
public void onErrorReturnEmpty() {
Completable.complete()
.onErrorReturnItem(2)
.test()
.assertResult();
}

@Test
public void onErrorReturnDispose() {
TestHelper.checkDisposed(CompletableSubject.create().onErrorReturnItem(1));
}

@Test
public void onErrorReturnDoubleOnSubscribe() {
TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function<Completable, MaybeSource<Object>>() {
@Override
public MaybeSource<Object> apply(Completable v) throws Exception {
return v.onErrorReturnItem(1);
}
});
}
}