Skip to content

Remove Redundant protectivelyWrap Method #1015

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 3, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 12 additions & 40 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5367,17 +5367,6 @@ public final <R> Observable<R> parallel(final Func1<Observable<T>, Observable<R>
return lift(new OperatorParallel<T, R>(f, s));
}

/**
* Protects against errors being thrown from Observer implementations and ensures
* onNext/onError/onCompleted contract compliance.
* <p>
* See https://github.com/Netflix/RxJava/issues/216 for a discussion on "Guideline 6.4: Protect calls to
* user code from within an Observer"
*/
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> o) {
return subscribe(new SafeSubscriber<T>(o));
}

/**
* Returns a {@link ConnectableObservable}, which waits until its {@link ConnectableObservable#connect connect} method is called before it begins emitting items to those {@link Observer}s that
* have subscribed to it.
Expand Down Expand Up @@ -6704,7 +6693,7 @@ public final Observable<T> startWith(T[] values, Scheduler scheduler) {
* if the Observable tries to call {@code onError}
*/
public final Subscription subscribe() {
return protectivelyWrapAndSubscribe(new Subscriber<T>() {
return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
Expand Down Expand Up @@ -6743,13 +6732,7 @@ public final Subscription subscribe(final Action1<? super T> onNext) {
throw new IllegalArgumentException("onNext can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to
* user code from within an Observer"
*/
return protectivelyWrapAndSubscribe(new Subscriber<T>() {
return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
Expand Down Expand Up @@ -6793,13 +6776,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio
throw new IllegalArgumentException("onError can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
* "Guideline 6.4: Protect calls to user code from within an Observer"
*/
return protectivelyWrapAndSubscribe(new Subscriber<T>() {
return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
Expand Down Expand Up @@ -6850,12 +6827,7 @@ public final Subscription subscribe(final Action1<? super T> onNext, final Actio
throw new IllegalArgumentException("onComplete can not be null");
}

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an Observer"
*/
return protectivelyWrapAndSubscribe(new Subscriber<T>() {
return subscribe(new Subscriber<T>() {

@Override
public final void onCompleted() {
Expand Down Expand Up @@ -7011,7 +6983,7 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
* For more information see the
* <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
*
* @param observer
* @param subscriber
* the {@link Subscriber}
* @return a {@link Subscription} reference with which Subscribers that are {@link Observer}s can
* unsubscribe from the Observable
Expand All @@ -7024,11 +6996,11 @@ public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
* @throws RuntimeException
* if the {@link Subscriber}'s {@code onError} method itself threw a {@code Throwable}
*/
public final Subscription subscribe(Subscriber<? super T> observer) {
public final Subscription subscribe(Subscriber<? super T> subscriber) {
// allow the hook to intercept and/or decorate
OnSubscribe<T> onSubscribeFunction = hook.onSubscribeStart(this, onSubscribe);
// validate and proceed
if (observer == null) {
if (subscriber == null) {
throw new IllegalArgumentException("observer can not be null");
}
if (onSubscribeFunction == null) {
Expand All @@ -7044,12 +7016,12 @@ public final Subscription subscribe(Subscriber<? super T> observer) {
* to user code from within an Observer"
*/
// if not already wrapped
if (!(observer instanceof SafeSubscriber)) {
if (!(subscriber instanceof SafeSubscriber)) {
// assign to `observer` so we return the protected version
observer = new SafeSubscriber<T>(observer);
subscriber = new SafeSubscriber<T>(subscriber);
}
onSubscribeFunction.call(observer);
final Subscription returnSubscription = hook.onSubscribeReturn(observer);
onSubscribeFunction.call(subscriber);
final Subscription returnSubscription = hook.onSubscribeReturn(subscriber);
// we return it inside a Subscription so it can't be cast back to Subscriber
return Subscriptions.create(new Action0() {

Expand All @@ -7064,7 +7036,7 @@ public void call() {
Exceptions.throwIfFatal(e);
// if an unhandled error occurs executing the onSubscribe we will propagate it
try {
observer.onError(hook.onSubscribeError(e));
subscriber.onError(hook.onSubscribeError(e));
} catch (OnErrorNotImplementedException e2) {
// special handling when onError is not implemented ... we just rethrow
throw e2;
Expand Down