Skip to content

Commit

Permalink
Split SubscribeOn into SubscribeOn/UnsubscribeOn
Browse files Browse the repository at this point in the history
Working with @headinthebox based on discussions at ReactiveX#869 and ReactiveX#880 (comment) we determined that there are times when `unsubscribeOn` behavior is needed.

The `subscribeOn` operator can not mix `subscribe` and `unsubscribe` scheduling behavior without breaking the `lift`/`Subscriber` behavior that allows unsubscribing synchronous sources. The newly added `unsubscribeOn` operator will not work with synchronous unsubscribes, but it will work for the targeted use cases such as UI event handlers.
  • Loading branch information
benjchristensen committed Feb 17, 2014
1 parent 18f545a commit 9be8468
Show file tree
Hide file tree
Showing 7 changed files with 812 additions and 334 deletions.
18 changes: 15 additions & 3 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
import rx.operators.OperatorTimestamp;
import rx.operators.OperatorToObservableList;
import rx.operators.OperatorToObservableSortedList;
import rx.operators.OperatorUnsubscribeOn;
import rx.operators.OperatorZip;
import rx.operators.OperatorZipIterable;
import rx.plugins.RxJavaObservableExecutionHook;
Expand Down Expand Up @@ -7074,14 +7075,14 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
}

/**
* Asynchronously subscribes and unsubscribes Observers to this Observable on the specified
* Asynchronously subscribes Observers to this Observable on the specified
* {@link Scheduler}.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/subscribeOn.png">
*
* @param scheduler
* the {@link Scheduler} to perform subscription and unsubscription actions on
* @return the source Observable modified so that its subscriptions and unsubscriptions happen on the
* the {@link Scheduler} to perform subscription actions on
* @return the source Observable modified so that its subscriptions happen on the
* specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
* @see #subscribeOn(rx.Scheduler, int)
Expand Down Expand Up @@ -8204,6 +8205,17 @@ public final Observable<List<T>> toSortedList(Func2<? super T, ? super T, Intege
return lift(new OperatorToObservableSortedList<T>(sortFunction));
}

/**
* Asynchronously unsubscribes on the specified {@link Scheduler}.
*
* @param scheduler
* the {@link Scheduler} to perform subscription and unsubscription actions on
* @return the source Observable modified so that its unsubscriptions happen on the specified {@link Scheduler}
*/
public final Observable<T> unsubscribeOn(Scheduler scheduler) {
return lift(new OperatorUnsubscribeOn<T>(scheduler));
}

/**
* Returns an Observable that represents a filtered version of the source Observable.
* <p>
Expand Down
64 changes: 8 additions & 56 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,46 +23,16 @@
import rx.util.functions.Action1;

/**
* Subscribes and unsubscribes Observers on the specified Scheduler.
* Subscribes Observers on the specified Scheduler.
* <p>
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
* <p>
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
* subscribe is solving.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
/**
* Indicate that events fired between the original subscription time and
* the actual subscription time should not get lost.
*/
private final boolean dontLoseEvents;
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
private final int bufferSize;

public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
this.dontLoseEvents = false;
this.bufferSize = -1;
}

/**
* Construct a SubscribeOn operator.
*
* @param scheduler
* the target scheduler
* @param bufferSize
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*/
public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.dontLoseEvents = true;
this.bufferSize = bufferSize;
}

@Override
Expand All @@ -71,41 +41,23 @@ public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscr

@Override
public void onCompleted() {
// ignore
// ignore because this is a nested Observable and we expect only 1 Observable<T> emitted to onNext
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

boolean checkNeedBuffer(Observable<?> o) {
return dontLoseEvents;
}

@Override
public void onNext(final Observable<T> o) {
if (checkNeedBuffer(o)) {
// use buffering (possibly blocking) for a possibly synchronous subscribe
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
o.subscribe(bus);
subscriber.add(scheduler.schedule(new Action1<Inner>() {
@Override
public void call(final Inner inner) {
bus.enterPassthroughMode();
}
}));
return;
} else {
// no buffering (async subscribe)
subscriber.add(scheduler.schedule(new Action1<Inner>() {
subscriber.add(scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
o.subscribe(subscriber);
}
}));
}
@Override
public void call(final Inner inner) {
o.subscribe(subscriber);
}
}));
}

};
Expand Down
113 changes: 113 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOnBounded.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable;
import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.util.functions.Action1;

/**
* Subscribes and unsubscribes Observers on the specified Scheduler.
* <p>
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
* <p>
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
* subscribe is solving.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
*/
public class OperatorSubscribeOnBounded<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
/**
* Indicate that events fired between the original subscription time and
* the actual subscription time should not get lost.
*/
private final boolean dontLoseEvents;
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
private final int bufferSize;

public OperatorSubscribeOnBounded(Scheduler scheduler) {
this.scheduler = scheduler;
this.dontLoseEvents = false;
this.bufferSize = -1;
}

/**
* Construct a SubscribeOn operator.
*
* @param scheduler
* the target scheduler
* @param bufferSize
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*/
public OperatorSubscribeOnBounded(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.dontLoseEvents = true;
this.bufferSize = bufferSize;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
// ignore
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

boolean checkNeedBuffer(Observable<?> o) {
return dontLoseEvents;
}

@Override
public void onNext(final Observable<T> o) {
if (checkNeedBuffer(o)) {
// use buffering (possibly blocking) for a possibly synchronous subscribe
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
o.subscribe(bus);
subscriber.add(scheduler.schedule(new Action1<Inner>() {
@Override
public void call(final Inner inner) {
bus.enterPassthroughMode();
}
}));
return;
} else {
// no buffering (async subscribe)
subscriber.add(scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
o.subscribe(subscriber);
}
}));
}
}

};
}
}
79 changes: 79 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorUnsubscribeOn.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import rx.Observable.Operator;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.MultipleAssignmentSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Unsubscribes on the specified Scheduler.
* <p>
*/
public class OperatorUnsubscribeOn<T> implements Operator<T, T> {

private final Scheduler scheduler;

public OperatorUnsubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
}

@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final CompositeSubscription parentSubscription = new CompositeSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
final MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
mas.set(scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
parentSubscription.unsubscribe();
mas.unsubscribe();
}
}));
}

}));

return new Subscriber<T>(parentSubscription) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}

};
}
}
Loading

0 comments on commit 9be8468

Please sign in to comment.