Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UnsafeSubscribe #1010

Merged
merged 3 commits into from
Apr 3, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
553 changes: 211 additions & 342 deletions rxjava-core/src/main/java/rx/Observable.java

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/joins/JoinObserver1.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void addActivePlan(ActivePlan0 activePlan) {
public void subscribe(Object gate) {
if (subscribed.compareAndSet(false, true)) {
this.gate = gate;
source.materialize().subscribe(this);
source.materialize().unsafeSubscribe(this);
} else {
throw new IllegalStateException("Can only be subscribed to once.");
}
Expand Down
41 changes: 13 additions & 28 deletions rxjava-core/src/main/java/rx/observables/BlockingObservable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func1;
import rx.observers.SafeSubscriber;
import rx.operators.OperationLatest;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.BlockingOperatorLatest;
import rx.operators.BlockingOperatorMostRecent;
import rx.operators.BlockingOperatorNext;
import rx.operators.BlockingOperatorToFuture;
import rx.operators.BlockingOperatorToIterator;

/**
* An extension of {@link Observable} that provides blocking operators.
Expand Down Expand Up @@ -64,17 +62,6 @@ public static <T> BlockingObservable<T> from(final Observable<? extends T> o) {
return new BlockingObservable<T>(o);
}

/**
* Used for protecting against errors being thrown from {@link Subscriber} implementations and ensuring onNext/onError/onCompleted contract
* compliance.
* <p>
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
* "Guideline 6.4: Protect calls to user code from within an operator"
*/
private Subscription protectivelyWrapAndSubscribe(Subscriber<? super T> observer) {
return o.subscribe(new SafeSubscriber<T>(observer));
}

/**
* Invoke a method on each item emitted by the {@link Observable}; block
* until the Observable completes.
Expand All @@ -97,12 +84,10 @@ public void forEach(final Action1<? super T> onNext) {
final AtomicReference<Throwable> exceptionFromOnError = new AtomicReference<Throwable>();

/**
* Wrapping since raw functions provided by the user are being invoked.
*
* See https://github.com/Netflix/RxJava/issues/216 for discussion on
* "Guideline 6.4: Protect calls to user code from within an operator"
* Use 'subscribe' instead of 'unsafeSubscribe' for Rx contract behavior
* as this is the final subscribe in the chain.
*/
protectivelyWrapAndSubscribe(new Subscriber<T>() {
o.subscribe(new Subscriber<T>() {
@Override
public void onCompleted() {
latch.countDown();
Expand Down Expand Up @@ -158,7 +143,7 @@ public void onNext(T args) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: getIterator()</a>
*/
public Iterator<T> getIterator() {
return OperationToIterator.toIterator(o);
return BlockingOperatorToIterator.toIterator(o);
}

/**
Expand Down Expand Up @@ -311,7 +296,7 @@ public T lastOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229751.aspx">MSDN: Observable.MostRecent</a>
*/
public Iterable<T> mostRecent(T initialValue) {
return OperationMostRecent.mostRecent(o, initialValue);
return BlockingOperatorMostRecent.mostRecent(o, initialValue);
}

/**
Expand All @@ -324,7 +309,7 @@ public Iterable<T> mostRecent(T initialValue) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211897.aspx">MSDN: Observable.Next</a>
*/
public Iterable<T> next() {
return OperationNext.next(o);
return BlockingOperatorNext.next(o);
}

/**
Expand All @@ -344,7 +329,7 @@ public Iterable<T> next() {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212115.aspx">MSDN: Observable.Latest</a>
*/
public Iterable<T> latest() {
return OperationLatest.latest(o);
return BlockingOperatorLatest.latest(o);
}

/**
Expand Down Expand Up @@ -441,7 +426,7 @@ public T singleOrDefault(T defaultValue, Func1<? super T, Boolean> predicate) {
* @see <a href="https://github.com/Netflix/RxJava/wiki/Blocking-Observable-Operators#transformations-tofuture-toiterable-and-toiteratorgetiterator">RxJava Wiki: toFuture()</a>
*/
public Future<T> toFuture() {
return OperationToFuture.toFuture(o);
return BlockingOperatorToFuture.toFuture(o);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public static <K, T> GroupedObservable<K, T> from(K key, final Observable<T> o)

@Override
public void call(Subscriber<? super T> s) {
o.subscribe(s);
o.unsafeSubscribe(s);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
* Wait for and iterate over the latest values of the source observable.
* If the source works faster than the iterator, values may be skipped, but
* not the onError or onCompleted events.
*/
public final class OperationLatest {
public final class BlockingOperatorLatest {
/** Utility class. */
private OperationLatest() {
private BlockingOperatorLatest() {
throw new IllegalStateException("No instances!");
}

Expand All @@ -48,7 +48,7 @@ public Iterator<T> iterator() {
}

/** Observer of source, iterator for output. */
static final class LatestObserverIterator<T> implements Observer<Notification<? extends T>>, Iterator<T> {
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
final Semaphore notify = new Semaphore(0);
// observer's notification
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
Expand All @@ -29,7 +29,7 @@
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.mostRecent.png">
*/
public final class OperationMostRecent {
public final class BlockingOperatorMostRecent {

public static <T> Iterable<T> mostRecent(final Observable<? extends T> source, final T initialValue) {

Expand All @@ -39,6 +39,10 @@ public Iterator<T> iterator() {
MostRecentObserver<T> mostRecentObserver = new MostRecentObserver<T>(initialValue);
final MostRecentIterator<T> nextIterator = new MostRecentIterator<T>(mostRecentObserver);

/**
* Subscribe instead of unsafeSubscribe since this is the final subscribe in the chain
* since it is for BlockingObservable.
*/
source.subscribe(mostRecentObserver);

return nextIterator;
Expand Down Expand Up @@ -74,7 +78,7 @@ public void remove() {
}
}

private static class MostRecentObserver<T> implements Observer<T> {
private static class MostRecentObserver<T> extends Subscriber<T> {
private final AtomicBoolean completed = new AtomicBoolean(false);
private final AtomicReference<T> value;
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
* Returns an Iterable that blocks until the Observable emits another item, then returns that item.
* <p>
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/B.next.png">
*/
public final class OperationNext {
public final class BlockingOperatorNext {

public static <T> Iterable<T> next(final Observable<? extends T> items) {
return new Iterable<T>() {
Expand Down Expand Up @@ -133,7 +133,7 @@ public void remove() {
}
}

private static class NextObserver<T> implements Observer<Notification<? extends T>> {
private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
private final AtomicBoolean waiting = new AtomicBoolean(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.Subscription;

/**
Expand All @@ -35,7 +35,7 @@
* The toFuture operation throws an exception if the Observable emits more than one item. If the
* Observable may emit more than item, use <code>toList().toFuture()</code>.
*/
public class OperationToFuture {
public class BlockingOperatorToFuture {

/**
* Returns a Future that expects a single item from the observable.
Expand All @@ -52,7 +52,7 @@ public static <T> Future<T> toFuture(Observable<? extends T> that) {
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

final Subscription s = that.subscribe(new Observer<T>() {
final Subscription s = that.subscribe(new Subscriber<T>() {

@Override
public void onCompleted() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscriber;
import rx.exceptions.Exceptions;

/**
Expand All @@ -33,7 +33,7 @@
*
* @see <a href="https://github.com/Netflix/RxJava/issues/50">Issue #50</a>
*/
public class OperationToIterator {
public class BlockingOperatorToIterator {

/**
* Returns an iterator that iterates all values of the observable.
Expand All @@ -45,7 +45,8 @@ public class OperationToIterator {
public static <T> Iterator<T> toIterator(Observable<? extends T> source) {
final BlockingQueue<Notification<? extends T>> notifications = new LinkedBlockingQueue<Notification<? extends T>>();

source.materialize().subscribe(new Observer<Notification<? extends T>>() {
// using subscribe instead of unsafeSubscribe since this is a BlockingObservable "final subscribe"
source.materialize().subscribe(new Subscriber<Notification<? extends T>>() {
@Override
public void onCompleted() {
// ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void onNext(T t) {
* It will then immediately swap itself out for the actual (after a single notification), but since this is now
* being done on the same producer thread no further buffering will occur.
*/
private static class PassThruObserver<T> implements Observer<T> {
private static class PassThruObserver<T> extends Subscriber<T> {

private final Observer<? super T> actual;
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
Expand Down Expand Up @@ -133,7 +133,7 @@ private void drainIfNeededAndSwitchToActual() {

}

private static class BufferedObserver<T> implements Observer<T> {
private static class BufferedObserver<T> extends Subscriber<T> {
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();

@Override
Expand Down
62 changes: 50 additions & 12 deletions rxjava-core/src/main/java/rx/operators/ChunkedOperation.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@
import rx.Observer;
import rx.Scheduler;
import rx.Scheduler.Inner;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.functions.Func0;
import rx.functions.Func1;
import rx.subscriptions.CompositeSubscription;

/**
* The base class for operations that break observables into "chunks". Currently buffers and windows.
Expand Down Expand Up @@ -408,7 +410,7 @@ public void pushValue(T value) {
* The type of object all internal {@link rx.operators.ChunkedOperation.Chunk} objects record.
* <C> The type of object being tracked by the {@link Chunk}
*/
protected static class ChunkObserver<T, C> implements Observer<T> {
protected static class ChunkObserver<T, C> extends Subscriber<T> {

private final Chunks<T, C> chunks;
private final Observer<? super C> observer;
Expand Down Expand Up @@ -492,12 +494,24 @@ public ObservableBasedSingleChunkCreator(NonOverlappingChunks<T, C> chunks, Func

private void listenForChunkEnd() {
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call();
closingObservable.subscribe(new Action1<TClosing>() {
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {

@Override
public void onCompleted() {

}

@Override
public void onError(Throwable e) {

}

@Override
public void call(TClosing closing) {
public void onNext(TClosing t) {
chunks.emitAndReplaceChunk();
listenForChunkEnd();
listenForChunkEnd();
}

});
}

Expand All @@ -524,23 +538,47 @@ public void stop() {
*/
protected static class ObservableBasedMultiChunkCreator<T, C, TOpening, TClosing> implements ChunkCreator {

private final SafeObservableSubscription subscription = new SafeObservableSubscription();
private final CompositeSubscription subscription = new CompositeSubscription();

public ObservableBasedMultiChunkCreator(final OverlappingChunks<T, C> chunks, Observable<? extends TOpening> openings, final Func1<? super TOpening, ? extends Observable<? extends TClosing>> chunkClosingSelector) {
subscription.wrap(openings.subscribe(new Action1<TOpening>() {
openings.unsafeSubscribe(new Subscriber<TOpening>(subscription) {

@Override
public void onCompleted() {

}

@Override
public void call(TOpening opening) {
public void onError(Throwable e) {

}

@Override
public void onNext(TOpening opening) {
final Chunk<T, C> chunk = chunks.createChunk();
Observable<? extends TClosing> closingObservable = chunkClosingSelector.call(opening);

closingObservable.subscribe(new Action1<TClosing>() {
closingObservable.unsafeSubscribe(new Subscriber<TClosing>() {

@Override
public void call(TClosing closing) {
chunks.emitChunk(chunk);
public void onCompleted() {

}
});

@Override
public void onError(Throwable e) {

}

@Override
public void onNext(TClosing t) {
chunks.emitChunk(chunk);
}

});
}
}));

});
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package rx.operators;

import java.io.ObjectStreamException;
import java.io.Serializable;

import rx.Notification;
Expand Down
Loading