Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OperatorOnErrorReturn #1110

Merged
merged 1 commit into from
Apr 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@
import rx.operators.OperationMergeMaxConcurrent;
import rx.operators.OperationMulticast;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationOnExceptionResumeNextViaObservable;
import rx.operators.OperationParallelMerge;
import rx.operators.OperationReplay;
Expand Down Expand Up @@ -108,6 +107,7 @@
import rx.operators.OperatorObserveOn;
import rx.operators.OperatorOnErrorFlatMap;
import rx.operators.OperatorOnErrorResumeNextViaFunction;
import rx.operators.OperatorOnErrorReturn;
import rx.operators.OperatorParallel;
import rx.operators.OperatorPivot;
import rx.operators.OperatorRepeat;
Expand Down Expand Up @@ -4547,7 +4547,7 @@ public final Observable<T> onErrorResumeNext(final Observable<? extends T> resum
* @see <a href="https://github.com/Netflix/RxJava/wiki/Error-Handling-Operators#wiki-onerrorreturn">RxJava Wiki: onErrorReturn()</a>
*/
public final Observable<T> onErrorReturn(Func1<Throwable, ? extends T> resumeFunction) {
return create(OperationOnErrorReturn.onErrorReturn(this, resumeFunction));
return lift(new OperatorOnErrorReturn<T>(resumeFunction));
}

/**
Expand Down
124 changes: 0 additions & 124 deletions rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java

This file was deleted.

79 changes: 79 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorOnErrorReturn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package rx.operators;

import java.util.Arrays;
import rx.Observable.Operator;
import rx.Subscriber;
import rx.exceptions.CompositeException;
import rx.functions.Func1;

/**
* Instruct an Observable to emit a particular item to its Observer's <code>onNext</code> method
* rather than invoking <code>onError</code> if it encounters an error.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/onErrorReturn.png">
* <p>
* By default, when an Observable encounters an error that prevents it from emitting the expected
* item to its Observer, the Observable invokes its Observer's <code>onError</code> method, and then
* quits without invoking any more of its Observer's methods. The onErrorReturn operation changes
* this behavior. If you pass a function (resumeFunction) to onErrorReturn, if the original
* Observable encounters an error, instead of invoking its Observer's <code>onError</code> method,
* it will instead pass the return value of resumeFunction to the Observer's <code>onNext</code>
* method.
* <p>
* You can use this to prevent errors from propagating or to supply fallback data should errors be
* encountered.
*
* @param <T> the value type
*/
public final class OperatorOnErrorReturn<T> implements Operator<T, T> {
final Func1<Throwable, ? extends T> resultFunction;

public OperatorOnErrorReturn(Func1<Throwable, ? extends T> resultFunction) {
this.resultFunction = resultFunction;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
return new Subscriber<T>(child) {

@Override
public void onNext(T t) {
child.onNext(t);
}

@Override
public void onError(Throwable e) {
try {
T result = resultFunction.call(e);

child.onNext(result);
} catch (Throwable x) {
child.onError(new CompositeException(Arrays.asList(e, x)));
return;
}
child.onCompleted();
}

@Override
public void onCompleted() {
child.onCompleted();
}

};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static rx.operators.OperationOnErrorReturn.onErrorReturn;

import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -30,10 +29,11 @@

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Func1;

public class OperationOnErrorReturnTest {
public class OperatorOnErrorReturnTest {

@Test
public void testResumeNext() {
Expand All @@ -42,15 +42,15 @@ public void testResumeNext() {
Observable<String> w = Observable.create(f);
final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();

Observable<String> observable = Observable.create(onErrorReturn(w, new Func1<Throwable, String>() {
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {

@Override
public String call(Throwable e) {
capturedException.set(e);
return "failure";
}

}));
});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand Down Expand Up @@ -79,15 +79,15 @@ public void testFunctionThrowsError() {
Observable<String> w = Observable.create(f);
final AtomicReference<Throwable> capturedException = new AtomicReference<Throwable>();

Observable<String> observable = Observable.create(onErrorReturn(w, new Func1<Throwable, String>() {
Observable<String> observable = w.onErrorReturn(new Func1<Throwable, String>() {

@Override
public String call(Throwable e) {
capturedException.set(e);
throw new RuntimeException("exception from function");
}

}));
});

@SuppressWarnings("unchecked")
Observer<String> observer = mock(Observer.class);
Expand All @@ -108,7 +108,7 @@ public String call(Throwable e) {
assertNotNull(capturedException.get());
}

private static class TestObservable implements Observable.OnSubscribeFunc<String> {
private static class TestObservable implements Observable.OnSubscribe<String> {

final Subscription s;
final String[] values;
Expand All @@ -120,7 +120,7 @@ public TestObservable(Subscription s, String... values) {
}

@Override
public Subscription onSubscribe(final Observer<? super String> observer) {
public void call(final Subscriber<? super String> subscriber) {
System.out.println("TestObservable subscribed to ...");
t = new Thread(new Runnable() {

Expand All @@ -130,19 +130,18 @@ public void run() {
System.out.println("running TestObservable thread");
for (String s : values) {
System.out.println("TestObservable onNext: " + s);
observer.onNext(s);
subscriber.onNext(s);
}
throw new RuntimeException("Forced Failure");
} catch (Throwable e) {
observer.onError(e);
subscriber.onError(e);
}
}

});
System.out.println("starting TestObservable thread");
t.start();
System.out.println("done starting TestObservable thread");
return s;
}
}
}