Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -272,9 +272,9 @@ public static <T> Flowable<T> concat(Publisher<? extends MaybeSource<? extends T

/**
* Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources in the array.
* <dl>
* <p>
* <img width="640" height="526" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.concatArray.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The returned {@code Flowable} honors the backpressure of the downstream consumer.</dd>
* <dt><b>Scheduler:</b></dt>
Expand Down
1 change: 0 additions & 1 deletion src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -13175,7 +13175,6 @@ public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler
* will be emitted by the resulting ObservableSource.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/throttleWithTimeout.png" alt="">
* <p>
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code throttleWithTimeout} operates by default on the {@code computation} {@link Scheduler}.</dd>
Expand Down
109 changes: 95 additions & 14 deletions src/main/java/io/reactivex/processors/AsyncProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,90 @@
* <p>
* <img width="640" height="239" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/AsyncProcessor.png" alt="">
* <p>
* The implementation of onXXX methods are technically thread-safe but non-serialized calls
* to them may lead to undefined state in the currently subscribed Subscribers.
* This processor does not have a public constructor by design; a new empty instance of this
* {@code AsyncProcessor} can be created via the {@link #create()} method.
* <p>
* Since an {@code AsyncProcessor} is a Reactive Streams {@code Processor} type,
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>)
* as parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
* {@link NullPointerException} being thrown and the processor's state is not changed.
* <p>
* {@code AsyncProcessor} is a {@link io.reactivex.Flowable} as well as a {@link FlowableProcessor} and supports backpressure from the downstream but
* its {@link Subscriber}-side consumes items in an unbounded manner.
* <p>
* When this {@code AsyncProcessor} is terminated via {@link #onError(Throwable)}, the
* last observed item (if any) is cleared and late {@link Subscriber}s only receive
* the {@code onError} event.
* <p>
* The {@code AsyncProcessor} caches the latest item internally and it emits this item only when {@code onComplete} is called.
* Therefore, it is not recommended to use this {@code Processor} with infinite or never-completing sources.
* <p>
* Even though {@code AsyncProcessor} implements the {@link Subscriber} interface, calling
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
* after the {@code AsyncProcessor} reached its terminal state will result in the
* given {@link Subscription} being canceled immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
* The implementation of {@code onXXX} methods are technically thread-safe but non-serialized calls
* to them may lead to undefined state in the currently subscribed {@code Subscriber}s.
* <p>
* This {@code AsyncProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the very last observed value -
* after this {@code AsyncProcessor} has been completed - in a non-blocking and thread-safe
* manner via {@link #hasValue()}, {@link #getValue()}, {@link #getValues()} or {@link #getValues(Object[])}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code AsyncProcessor} honors the backpressure of the downstream {@code Subscriber}s and won't emit
* its single value to a particular {@code Subscriber} until that {@code Subscriber} has requested an item.
* When the {@code AsyncProcessor} is subscribed to a {@link io.reactivex.Flowable}, the processor consumes this
* {@code Flowable} in an unbounded manner (requesting `Long.MAX_VALUE`) as only the very last upstream item is
* retained by it.
* </dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code AsyncProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
* the {@code Subscriber}s get notified on the thread where the terminating {@code onError} or {@code onComplete}
* methods were invoked.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>When the {@link #onError(Throwable)} is called, the {@code AsyncProcessor} enters into a terminal state
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
* if one or more {@code Subscriber}s dispose their respective {@code Subscription}s, the
* {@code Throwable} is delivered to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
* cancel at once).
* If there were no {@code Subscriber}s subscribed to this {@code AsyncProcessor} when the {@code onError()}
* was called, the global error handler is not invoked.
* </dd>
* </dl>
* <p>
* Example usage:
* <pre><code>
* AsyncProcessor&lt;Object&gt; processor = AsyncProcessor.create();
*
* TestSubscriber&lt;Object&gt; ts1 = processor.test();
*
* ts1.assertEmpty();
*
* processor.onNext(1);
*
* // AsyncProcessor only emits when onComplete was called.
* ts1.assertEmpty();
*
* processor.onNext(2);
* processor.onComplete();
*
* // onComplete triggers the emission of the last cached item and the onComplete event.
* ts1.assertResult(2);
*
* TestSubscriber&lt;Object&gt; ts2 = processor.test();
*
* // late Subscribers receive the last cached item too
* ts2.assertResult(2);
* </code></pre>
* @param <T> the value type
*/
public final class AsyncProcessor<T> extends FlowableProcessor<T> {
Expand Down Expand Up @@ -75,7 +156,7 @@ public void onSubscribe(Subscription s) {
s.cancel();
return;
}
// PublishSubject doesn't bother with request coordination.
// AsyncProcessor doesn't bother with request coordination.
s.request(Long.MAX_VALUE);
}

Expand Down Expand Up @@ -168,9 +249,9 @@ protected void subscribeActual(Subscriber<? super T> s) {

/**
* Tries to add the given subscriber to the subscribers array atomically
* or returns false if the subject has terminated.
* or returns false if the processor has terminated.
* @param ps the subscriber to add
* @return true if successful, false if the subject has terminated
* @return true if successful, false if the processor has terminated
*/
boolean add(AsyncSubscription<T> ps) {
for (;;) {
Expand All @@ -192,8 +273,8 @@ boolean add(AsyncSubscription<T> ps) {
}

/**
* Atomically removes the given subscriber if it is subscribed to the subject.
* @param ps the subject to remove
* Atomically removes the given subscriber if it is subscribed to this processor.
* @param ps the subscriber's subscription wrapper to remove
*/
@SuppressWarnings("unchecked")
void remove(AsyncSubscription<T> ps) {
Expand Down Expand Up @@ -232,28 +313,28 @@ void remove(AsyncSubscription<T> ps) {
}

/**
* Returns true if the subject has any value.
* Returns true if this processor has any value.
* <p>The method is thread-safe.
* @return true if the subject has any value
* @return true if this processor has any value
*/
public boolean hasValue() {
return subscribers.get() == TERMINATED && value != null;
}

/**
* Returns a single value the Subject currently has or null if no such value exists.
* Returns a single value this processor currently has or null if no such value exists.
* <p>The method is thread-safe.
* @return a single value the Subject currently has or null if no such value exists
* @return a single value this processor currently has or null if no such value exists
*/
@Nullable
public T getValue() {
return subscribers.get() == TERMINATED ? value : null;
}

/**
* Returns an Object array containing snapshot all values of the Subject.
* Returns an Object array containing snapshot all values of this processor.
* <p>The method is thread-safe.
* @return the array containing the snapshot of all values of the Subject
* @return the array containing the snapshot of all values of this processor
* @deprecated in 2.1.14; put the result of {@link #getValue()} into an array manually, will be removed in 3.x
*/
@Deprecated
Expand All @@ -263,7 +344,7 @@ public Object[] getValues() {
}

/**
* Returns a typed array containing a snapshot of all values of the Subject.
* Returns a typed array containing a snapshot of all values of this processor.
* <p>The method follows the conventions of Collection.toArray by setting the array element
* after the last value to null (if the capacity permits).
* <p>The method is thread-safe.
Expand Down
20 changes: 11 additions & 9 deletions src/main/java/io/reactivex/processors/BehaviorProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,11 +85,13 @@
* after the {@code BehaviorProcessor} reached its terminal state will result in the
* given {@code Subscription} being cancelled immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@code FlowableProcessor}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
* Note that serializing over {@link #offer(Object)} is not supported through {@code toSerialized()} because it is a method
* available on the {@code PublishProcessor} and {@code BehaviorProcessor} classes only.
* <p>
* This {@code BehaviorProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()} as well as means to read the latest observed value
Expand Down Expand Up @@ -127,34 +129,34 @@
* Example usage:
* <pre> {@code

// observer will receive all events.
// subscriber will receive all events.
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.subscribe(observer);
processor.subscribe(subscriber);
processor.onNext("one");
processor.onNext("two");
processor.onNext("three");

// observer will receive the "one", "two" and "three" events, but not "zero"
// subscriber will receive the "one", "two" and "three" events, but not "zero"
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.subscribe(observer);
processor.subscribe(subscriber);
processor.onNext("two");
processor.onNext("three");

// observer will receive only onComplete
// subscriber will receive only onComplete
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.onComplete();
processor.subscribe(observer);
processor.subscribe(subscriber);

// observer will receive only onError
// subscriber will receive only onError
BehaviorProcessor<Object> processor = BehaviorProcessor.create("default");
processor.onNext("zero");
processor.onNext("one");
processor.onError(new RuntimeException("error"));
processor.subscribe(observer);
processor.subscribe(subscriber);
} </pre>
*
* @param <T>
Expand Down
85 changes: 68 additions & 17 deletions src/main/java/io/reactivex/processors/PublishProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,67 @@
*
* <p>
* <img width="640" height="278" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/PublishProcessor.png" alt="">
*
* <p>The processor does not coordinate backpressure for its subscribers and implements a weaker onSubscribe which
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the PublishProcessor
* to multiple sources (note on serialization though) unlike the standard Subscriber contract. Child subscribers, however, are not overflown but receive an
* IllegalStateException in case their requested amount is zero.
*
* <p>The implementation of onXXX methods are technically thread-safe but non-serialized calls
* to them may lead to undefined state in the currently subscribed Subscribers.
*
* <p>Due to the nature Flowables are constructed, the PublishProcessor can't be instantiated through
* {@code new} but must be created via the {@link #create()} method.
* <p>
* This processor does not have a public constructor by design; a new empty instance of this
* {@code PublishProcessor} can be created via the {@link #create()} method.
* <p>
* Since a {@code PublishProcessor} is a Reactive Streams {@code Processor} type,
* {@code null}s are not allowed (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Rule 2.13</a>) as
* parameters to {@link #onNext(Object)} and {@link #onError(Throwable)}. Such calls will result in a
* {@link NullPointerException} being thrown and the processor's state is not changed.
* <p>
* {@code PublishProcessor} is a {@link io.reactivex.Flowable} as well as a {@link FlowableProcessor},
* however, it does not coordinate backpressure between different subscribers and between an
* upstream source and a subscriber. If an upstream item is received via {@link #onNext(Object)}, if
* a subscriber is not ready to receive an item, that subscriber is terminated via a {@link MissingBackpressureException}.
* To avoid this case, use {@link #offer(Object)} and retry sometime later if it returned false.
* The {@code PublishProcessor}'s {@link Subscriber}-side consumes items in an unbounded manner.
* <p>
* For a multicasting processor type that also coordinates between the downstream {@code Subscriber}s and the upstream
* source as well, consider using {@link MulticastProcessor}.
* <p>
* When this {@code PublishProcessor} is terminated via {@link #onError(Throwable)} or {@link #onComplete()},
* late {@link Subscriber}s only receive the respective terminal event.
* <p>
* Unlike a {@link BehaviorProcessor}, a {@code PublishProcessor} doesn't retain/cache items, therefore, a new
* {@code Subscriber} won't receive any past items.
* <p>
* Even though {@code PublishProcessor} implements the {@link Subscriber} interface, calling
* {@code onSubscribe} is not required (<a href="https://github.com/reactive-streams/reactive-streams-jvm#2.12">Rule 2.12</a>)
* if the processor is used as a standalone source. However, calling {@code onSubscribe}
* after the {@code PublishProcessor} reached its terminal state will result in the
* given {@link Subscription} being canceled immediately.
* <p>
* Calling {@link #onNext(Object)}, {@link #offer(Object)}, {@link #onError(Throwable)} and {@link #onComplete()}
* is required to be serialized (called from the same thread or called non-overlappingly from different threads
* through external means of serialization). The {@link #toSerialized()} method available to all {@link FlowableProcessor}s
* provides such serialization and also protects against reentrance (i.e., when a downstream {@code Subscriber}
* consuming this processor also wants to call {@link #onNext(Object)} on this processor recursively).
* Note that serializing over {@link #offer(Object)} is not supported through {@code toSerialized()} because it is a method
* available on the {@code PublishProcessor} and {@code BehaviorProcessor} classes only.
* <p>
* This {@code PublishProcessor} supports the standard state-peeking methods {@link #hasComplete()}, {@link #hasThrowable()},
* {@link #getThrowable()} and {@link #hasSubscribers()}.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The processor does not coordinate backpressure for its subscribers and implements a weaker {@code onSubscribe} which
* calls requests Long.MAX_VALUE from the incoming Subscriptions. This makes it possible to subscribe the {@code PublishProcessor}
* to multiple sources (note on serialization though) unlike the standard {@code Subscriber} contract. Child subscribers, however, are not overflown but receive an
* {@link IllegalStateException} in case their requested amount is zero.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code PublishProcessor} does not operate by default on a particular {@link io.reactivex.Scheduler} and
* the {@code Subscriber}s get notified on the thread the respective {@code onXXX} methods were invoked.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>When the {@link #onError(Throwable)} is called, the {@code PublishProcessor} enters into a terminal state
* and emits the same {@code Throwable} instance to the last set of {@code Subscriber}s. During this emission,
* if one or more {@code Subscriber}s cancel their respective {@code Subscription}s, the
* {@code Throwable} is delivered to the global error handler via
* {@link io.reactivex.plugins.RxJavaPlugins#onError(Throwable)} (multiple times if multiple {@code Subscriber}s
* cancel at once).
* If there were no {@code Subscriber}s subscribed to this {@code PublishProcessor} when the {@code onError()}
* was called, the global error handler is not invoked.
* </dd>
* </dl>
*
* Example usage:
* <pre> {@code
Expand All @@ -55,6 +105,7 @@

} </pre>
* @param <T> the value type multicasted to Subscribers.
* @see MulticastProcessor
*/
public final class PublishProcessor<T> extends FlowableProcessor<T> {
/** The terminated indicator for the subscribers array. */
Expand Down Expand Up @@ -113,9 +164,9 @@ protected void subscribeActual(Subscriber<? super T> t) {

/**
* Tries to add the given subscriber to the subscribers array atomically
* or returns false if the subject has terminated.
* or returns false if this processor has terminated.
* @param ps the subscriber to add
* @return true if successful, false if the subject has terminated
* @return true if successful, false if this processor has terminated
*/
boolean add(PublishSubscription<T> ps) {
for (;;) {
Expand All @@ -137,8 +188,8 @@ boolean add(PublishSubscription<T> ps) {
}

/**
* Atomically removes the given subscriber if it is subscribed to the subject.
* @param ps the subject to remove
* Atomically removes the given subscriber if it is subscribed to this processor.
* @param ps the subscription wrapping a subscriber to remove
*/
@SuppressWarnings("unchecked")
void remove(PublishSubscription<T> ps) {
Expand Down Expand Up @@ -182,7 +233,7 @@ public void onSubscribe(Subscription s) {
s.cancel();
return;
}
// PublishSubject doesn't bother with request coordination.
// PublishProcessor doesn't bother with request coordination.
s.request(Long.MAX_VALUE);
}

Expand Down Expand Up @@ -288,7 +339,7 @@ static final class PublishSubscription<T> extends AtomicLong implements Subscrip
private static final long serialVersionUID = 3562861878281475070L;
/** The actual subscriber. */
final Subscriber<? super T> actual;
/** The subject state. */
/** The parent processor servicing this subscriber. */
final PublishProcessor<T> parent;

/**
Expand Down
Loading