Skip to content

Commit

Permalink
Merge pull request #869 from benjchristensen/subscribeOn+groupBy
Browse files Browse the repository at this point in the history
subscribeOn + groupBy
  • Loading branch information
benjchristensen committed Feb 14, 2014
2 parents 1e6224d + 6863f57 commit b9fe278
Show file tree
Hide file tree
Showing 8 changed files with 581 additions and 84 deletions.
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -7102,11 +7102,12 @@ public final Subscription subscribe(Subscriber<? super T> observer, Scheduler sc
* @return the source Observable modified so that its subscriptions and unsubscriptions happen on the
* specified {@link Scheduler}
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#wiki-subscribeon">RxJava Wiki: subscribeOn()</a>
* @see #subscribeOn(rx.Scheduler, int)
*/
public final Observable<T> subscribeOn(Scheduler scheduler) {
return nest().lift(new OperatorSubscribeOn<T>(scheduler));
}

/**
* Returns an Observable that extracts a Double from each of the items emitted by the source Observable via
* a function you specify, and then emits the sum of these Doubles.
Expand Down
8 changes: 8 additions & 0 deletions rxjava-core/src/main/java/rx/observers/TestSubscriber.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ public void awaitTerminalEvent(long timeout, TimeUnit unit) {
}
}

public void awaitTerminalEventAndUnsubscribeOnTimeout(long timeout, TimeUnit unit) {
try {
awaitTerminalEvent(timeout, unit);
} catch (RuntimeException e) {
unsubscribe();
}
}

public Thread getLastSeenThread() {
return lastSeenThread;
}
Expand Down
193 changes: 193 additions & 0 deletions rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/**
* Copyright 2014 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.operators;

import java.util.LinkedList;
import java.util.Queue;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;

/**
* Buffers the incoming events until notified, then replays the
* buffered events and continues as a simple pass-through subscriber.
* @param <T> the streamed value type
*/
public class BufferUntilSubscriber<T> extends Subscriber<T> {
/** The actual subscriber. */
private final Subscriber<? super T> actual;
/** Indicate the pass-through mode. */
private volatile boolean passthroughMode;
/** Protect mode transition. */
private final Object gate = new Object();
/** The buffered items. */
private final Queue<Object> queue = new LinkedList<Object>();
/** The queue capacity. */
private final int capacity;
/** Null sentinel (in case queue type is changed). */
private static final Object NULL_SENTINEL = new Object();
/** Complete sentinel. */
private static final Object COMPLETE_SENTINEL = new Object();
/**
* Container for an onError event.
*/
private static final class ErrorSentinel {
final Throwable t;

public ErrorSentinel(Throwable t) {
this.t = t;
}

}
/**
* Constructor that wraps the actual subscriber and shares its subscription.
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
* @param actual
*/
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual) {
super(actual);
this.actual = actual;
this.capacity = capacity;
}
/**
* Constructor that wraps the actual subscriber and uses the given composite
* subscription.
* @param capacity the queue capacity to accept before blocking, negative value indicates an unbounded queue
* @param actual
* @param cs
*/
public BufferUntilSubscriber(int capacity, Subscriber<? super T> actual, CompositeSubscription cs) {
super(cs);
this.actual = actual;
this.capacity = capacity;
}

/**
* Call this method to replay the buffered events and continue as a pass-through subscriber.
* If already in pass-through mode, this method is a no-op.
*/
public void enterPassthroughMode() {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
while (!queue.isEmpty()) {
Object o = queue.poll();
if (!actual.isUnsubscribed()) {
if (o == NULL_SENTINEL) {
actual.onNext(null);
} else
if (o == COMPLETE_SENTINEL) {
actual.onCompleted();
} else
if (o instanceof ErrorSentinel) {
actual.onError(((ErrorSentinel)o).t);
} else
if (o != null) {
@SuppressWarnings("unchecked")
T v = (T)o;
actual.onNext(v);
} else {
throw new NullPointerException();
}
}
}
passthroughMode = true;
gate.notifyAll();
}
}
}
}
@Override
public void onNext(T t) {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(t != null ? t : NULL_SENTINEL);
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
if (actual.isUnsubscribed()) {
return;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
actual.onNext(t);
}

@Override
public void onError(Throwable e) {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(new ErrorSentinel(e));
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
if (actual.isUnsubscribed()) {
return;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
actual.onError(e);
}

@Override
public void onCompleted() {
if (!passthroughMode) {
synchronized (gate) {
if (!passthroughMode) {
if (capacity < 0 || queue.size() < capacity) {
queue.offer(COMPLETE_SENTINEL);
return;
}
try {
while (!passthroughMode) {
gate.wait();
}
if (actual.isUnsubscribed()) {
return;
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
actual.onError(ex);
return;
}
}
}
}
actual.onCompleted();
}

}
3 changes: 3 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperatorParallel.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public Integer call(T t) {

@Override
public Observable<R> call(GroupedObservable<Integer, T> g) {
// Must use observeOn not subscribeOn because we have a single source behind groupBy.
// The origin is already subscribed to, we are moving each group on to a new thread
// but the origin itself can only be on a single thread.
return f.call(g.observeOn(scheduler));
}
});
Expand Down
102 changes: 57 additions & 45 deletions rxjava-core/src/main/java/rx/operators/OperatorSubscribeOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,54 @@
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.subscriptions.CompositeSubscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Action1;

/**
* Asynchronously subscribes and unsubscribes Observers on the specified Scheduler.
* Subscribes and unsubscribes Observers on the specified Scheduler.
* <p>
* Will occur asynchronously except when subscribing to `GroupedObservable`, `PublishSubject` and possibly other "hot" Observables
* in which case it will subscribe synchronously and buffer/block onNext calls until the subscribe has occurred.
* <p>
* See https://github.com/Netflix/RxJava/issues/844 for more information on the "time gap" issue that the synchronous
* subscribe is solving.
*
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/subscribeOn.png">
*/
public class OperatorSubscribeOn<T> implements Operator<T, Observable<T>> {

private final Scheduler scheduler;
/**
* Indicate that events fired between the original subscription time and
* the actual subscription time should not get lost.
*/
private final boolean dontLoseEvents;
/** The buffer size to avoid flooding. Negative value indicates an unbounded buffer. */
private final int bufferSize;

public OperatorSubscribeOn(Scheduler scheduler) {
this.scheduler = scheduler;
this.dontLoseEvents = false;
this.bufferSize = -1;
}

/**
* Construct a SubscribeOn operator.
*
* @param scheduler
* the target scheduler
* @param bufferSize
* if dontLoseEvents == true, this indicates the buffer size. Filling the buffer will
* block the source. -1 indicates an unbounded buffer
*/
public OperatorSubscribeOn(Scheduler scheduler, int bufferSize) {
this.scheduler = scheduler;
this.dontLoseEvents = true;
this.bufferSize = bufferSize;
}

@Override
public Subscriber<? super Observable<T>> call(final Subscriber<? super T> subscriber) {
return new Subscriber<Observable<T>>() {
return new Subscriber<Observable<T>>(subscriber) {

@Override
public void onCompleted() {
Expand All @@ -52,48 +79,33 @@ public void onError(Throwable e) {
subscriber.onError(e);
}

boolean checkNeedBuffer(Observable<?> o) {
return dontLoseEvents;
}

@Override
public void onNext(final Observable<T> o) {
scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
final CompositeSubscription cs = new CompositeSubscription();
subscriber.add(Subscriptions.create(new Action0() {

@Override
public void call() {
inner.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
cs.unsubscribe();
}

});
}

}));
cs.add(subscriber);
o.subscribe(new Subscriber<T>(cs) {

@Override
public void onCompleted() {
subscriber.onCompleted();
}

@Override
public void onError(Throwable e) {
subscriber.onError(e);
}

@Override
public void onNext(T t) {
subscriber.onNext(t);
}
});
}
});
if (checkNeedBuffer(o)) {
// use buffering (possibly blocking) for a possibly synchronous subscribe
final BufferUntilSubscriber<T> bus = new BufferUntilSubscriber<T>(bufferSize, subscriber);
o.subscribe(bus);
subscriber.add(scheduler.schedule(new Action1<Inner>() {
@Override
public void call(final Inner inner) {
bus.enterPassthroughMode();
}
}));
return;
} else {
// no buffering (async subscribe)
subscriber.add(scheduler.schedule(new Action1<Inner>() {

@Override
public void call(final Inner inner) {
o.subscribe(subscriber);
}
}));
}
}

};
Expand Down
Loading

0 comments on commit b9fe278

Please sign in to comment.