Skip to content

Commit

Permalink
Change onError(Exception) to onError(Throwable)
Browse files Browse the repository at this point in the history
See issue "Observer#onError should use Throwable" for background => ReactiveX#296
  • Loading branch information
benjchristensen committed Aug 1, 2013
1 parent 52595d3 commit 5b92876
Show file tree
Hide file tree
Showing 57 changed files with 403 additions and 388 deletions.
14 changes: 7 additions & 7 deletions rxjava-core/src/main/java/rx/Notification.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
public class Notification<T> {

private final Kind kind;
private final Exception exception;
private final Throwable exception;
private final T value;

/**
Expand All @@ -44,7 +44,7 @@ public Notification(T value) {
* @param exception
* The exception passed to the onError notification.
*/
public Notification(Exception exception) {
public Notification(Throwable exception) {
this.exception = exception;
this.value = null;
this.kind = Kind.OnError;
Expand All @@ -62,9 +62,9 @@ public Notification() {
/**
* Retrieves the exception associated with an onError notification.
*
* @return The exception associated with an onError notification.
* @return Throwable associated with an onError notification.
*/
public Exception getException() {
public Throwable getThrowable() {
return exception;
}

Expand Down Expand Up @@ -126,7 +126,7 @@ public String toString() {
if (hasValue())
str.append(" ").append(getValue());
if (hasException())
str.append(" ").append(getException().getMessage());
str.append(" ").append(getThrowable().getMessage());
str.append("]");
return str.toString();
}
Expand All @@ -137,7 +137,7 @@ public int hashCode() {
if (hasValue())
hash = hash * 31 + getValue().hashCode();
if (hasException())
hash = hash * 31 + getException().hashCode();
hash = hash * 31 + getThrowable().hashCode();
return hash;
}

Expand All @@ -154,7 +154,7 @@ public boolean equals(Object obj) {
return false;
if (hasValue() && !getValue().equals(notification.getValue()))
return false;
if (hasException() && !getException().equals(notification.getException()))
if (hasException() && !getThrowable().equals(notification.getThrowable()))
return false;
return true;
}
Expand Down
76 changes: 38 additions & 38 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -205,14 +205,14 @@ public Subscription subscribe(Observer<T> observer) {
} catch (OnErrorNotImplementedException e) {
// special handling when onError is not implemented ... we just rethrow
throw e;
} catch (Exception e) {
} catch (Throwable e) {
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
observer.onError(hook.onSubscribeError(this, e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
} catch (Exception e2) {
} catch (Throwable e2) {
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new RuntimeException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
Expand Down Expand Up @@ -295,7 +295,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
Object onError = callbacks.get("onError");
if (onError != null) {
Expand Down Expand Up @@ -344,7 +344,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
throw new OnErrorNotImplementedException(e);
}
Expand Down Expand Up @@ -379,7 +379,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
throw new OnErrorNotImplementedException(e);
}
Expand Down Expand Up @@ -421,7 +421,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
Functions.from(onError).call(e);
}
Expand All @@ -438,7 +438,7 @@ public Subscription subscribe(final Object onNext, final Object onError, Schedul
return subscribeOn(scheduler).subscribe(onNext, onError);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {
public Subscription subscribe(final Action1<T> onNext, final Action1<Throwable> onError) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Expand All @@ -459,7 +459,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
onError.call(e);
}
Expand All @@ -472,7 +472,7 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, Scheduler scheduler) {
public Subscription subscribe(final Action1<T> onNext, final Action1<Throwable> onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

Expand Down Expand Up @@ -504,7 +504,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
Functions.from(onError).call(e);
}
Expand All @@ -521,7 +521,7 @@ public Subscription subscribe(final Object onNext, final Object onError, final O
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {
public Subscription subscribe(final Action1<T> onNext, final Action1<Throwable> onError, final Action0 onComplete) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Expand All @@ -545,7 +545,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
handleError(e);
onError.call(e);
}
Expand All @@ -558,7 +558,7 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete, Scheduler scheduler) {
public Subscription subscribe(final Action1<T> onNext, final Action1<Throwable> onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

Expand All @@ -583,7 +583,7 @@ public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
*
* @param e
*/
private void handleError(Exception e) {
private void handleError(Throwable e) {
// onError should be rare so we'll only fetch when needed
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
}
Expand Down Expand Up @@ -618,7 +618,7 @@ public Subscription call(Observer<T> t1) {
*/
private static class ThrowObservable<T> extends Observable<T> {

public ThrowObservable(final Exception exception) {
public ThrowObservable(final Throwable exception) {
super(new Func1<Observer<T>, Subscription>() {

/**
Expand Down Expand Up @@ -956,7 +956,7 @@ public static <T> Observable<T> empty() {
* @return an Observable that invokes the {@link Observer}'s
* {@link Observer#onError onError} method when the Observer subscribes to it
*/
public static <T> Observable<T> error(Exception exception) {
public static <T> Observable<T> error(Throwable exception) {
return new ThrowObservable<T>(exception);
}

Expand Down Expand Up @@ -1767,7 +1767,7 @@ public static <T> Observable<T> never() {
* encounters an error
* @return an Observable, identical to the source Observable with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Func1<Exception, Observable<T>> resumeFunction) {
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Func1<Throwable, Observable<T>> resumeFunction) {
return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
}

Expand Down Expand Up @@ -1800,11 +1800,11 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(that, new Func1<Exception, Observable<T>>() {
return onErrorResumeNext(that, new Func1<Throwable, Observable<T>>() {

@SuppressWarnings("unchecked")
@Override
public Observable<T> call(Exception e) {
public Observable<T> call(Throwable e) {
return (Observable<T>) _f.call(e);
}
});
Expand Down Expand Up @@ -1866,7 +1866,7 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
* would otherwise cause it to invoke {@link Observer#onError onError}
* @return an Observable, identical to the source Observable with its behavior modified as described
*/
public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Exception, T> resumeFunction) {
public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Throwable, T> resumeFunction) {
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
}

Expand Down Expand Up @@ -3537,7 +3537,7 @@ public Observable<T> observeOn(Scheduler scheduler) {
* @return an Observable that emits the items and notifications embedded in the
* {@link Notification} objects emitted by the source Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229047(v=vs.103).aspx">MSDN: Observable.dematerialize</a>
* @throws Exception
* @throws Throwable
* if the source Observable is not of type {@code Observable<Notification<T>>}.
*/
@SuppressWarnings("unchecked")
Expand Down Expand Up @@ -3571,7 +3571,7 @@ public <T2> Observable<T2> dematerialize() {
* encounters an error
* @return the original Observable, with appropriately modified behavior
*/
public Observable<T> onErrorResumeNext(final Func1<Exception, Observable<T>> resumeFunction) {
public Observable<T> onErrorResumeNext(final Func1<Throwable, Observable<T>> resumeFunction) {
return onErrorResumeNext(this, resumeFunction);
}

Expand Down Expand Up @@ -3601,11 +3601,11 @@ public Observable<T> onErrorResumeNext(final Func1<Exception, Observable<T>> res
public Observable<T> onErrorResumeNext(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorResumeNext(this, new Func1<Exception, Observable<T>>() {
return onErrorResumeNext(this, new Func1<Throwable, Observable<T>>() {

@Override
@SuppressWarnings("unchecked")
public Observable<T> call(Exception e) {
public Observable<T> call(Throwable e) {
return (Observable<T>) _f.call(e);
}
});
Expand Down Expand Up @@ -3664,7 +3664,7 @@ public Observable<T> onErrorResumeNext(final Observable<T> resumeSequence) {
* Observable encounters an error
* @return the original Observable with appropriately modified behavior
*/
public Observable<T> onErrorReturn(Func1<Exception, T> resumeFunction) {
public Observable<T> onErrorReturn(Func1<Throwable, T> resumeFunction) {
return onErrorReturn(this, resumeFunction);
}

Expand Down Expand Up @@ -3694,11 +3694,11 @@ public Observable<T> onErrorReturn(Func1<Exception, T> resumeFunction) {
public Observable<T> onErrorReturn(final Object resumeFunction) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(resumeFunction);
return onErrorReturn(this, new Func1<Exception, T>() {
return onErrorReturn(this, new Func1<Throwable, T>() {

@Override
@SuppressWarnings("unchecked")
public T call(Exception e) {
public T call(Throwable e) {
return (T) _f.call(e);
}
});
Expand Down Expand Up @@ -4376,7 +4376,7 @@ public Subscription call(Observer<String> Observer) {
verify(aObserver, times(1)).onNext("one");
verify(aObserver, times(1)).onNext("two");
verify(aObserver, times(1)).onNext("three");
verify(aObserver, Mockito.never()).onError(any(Exception.class));
verify(aObserver, Mockito.never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

Expand Down Expand Up @@ -4453,7 +4453,7 @@ public void testMaterializeDematerializeChaining() {

verify(observer, times(1)).onNext(1);
verify(observer, times(1)).onCompleted();
verify(observer, times(0)).onError(any(Exception.class));
verify(observer, times(0)).onError(any(Throwable.class));
}

/**
Expand All @@ -4467,7 +4467,7 @@ public void testMaterializeDematerializeChaining() {
public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
Expand Down Expand Up @@ -4499,7 +4499,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
Expand Down Expand Up @@ -4533,7 +4533,7 @@ public void onNext(String v) {
@Test
public void testCustomObservableWithErrorInObserverSynchronous() {
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
Expand All @@ -4553,7 +4553,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
Expand Down Expand Up @@ -4584,7 +4584,7 @@ public void onNext(String v) {
@Test
public void testCustomObservableWithErrorInObservableSynchronous() {
final AtomicInteger count = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
Expand All @@ -4601,7 +4601,7 @@ public void onCompleted() {
}

@Override
public void onError(Exception e) {
public void onError(Throwable e) {
error.set(e);
System.out.println("error");
e.printStackTrace();
Expand Down Expand Up @@ -4803,7 +4803,7 @@ public void call(Object t1) {

});
fail("expected exception");
} catch (Exception e) {
} catch (Throwable e) {
assertEquals("failure", e.getMessage());
}
}
Expand All @@ -4822,7 +4822,7 @@ public void call(Object t1) {
@Test
public void testErrorThrownWithoutErrorHandlerAsynchronous() throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<Exception> exception = new AtomicReference<Exception>();
final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
Expand All @@ -4833,7 +4833,7 @@ public Subscription call(final Observer<String> observer) {
public void run() {
try {
observer.onError(new RuntimeException("failure"));
} catch (Exception e) {
} catch (Throwable e) {
// without an onError handler it has to just throw on whatever thread invokes it
exception.set(e);
}
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Observer.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public interface Observer<T> {
*
* @param e
*/
public void onError(Exception e);
public void onError(Throwable e);

/**
* Provides the Observer with new data.
Expand Down

0 comments on commit 5b92876

Please sign in to comment.