Skip to content

Commit

Permalink
3.x: Sync up with 2.2.10 snapshot (#6507)
Browse files Browse the repository at this point in the history
* 3.x: Sync up with 2.2.10 snapshot

* Add the Flowable javadoc changes too.
  • Loading branch information
akarnokd committed Jun 17, 2019
1 parent b95e3dc commit 2bed8c1
Show file tree
Hide file tree
Showing 13 changed files with 1,515 additions and 29 deletions.
1,329 changes: 1,329 additions & 0 deletions CHANGES.md

Large diffs are not rendered by default.

33 changes: 33 additions & 0 deletions docs/What's-different-in-3.0.md
@@ -0,0 +1,33 @@
Table of contents

# Introduction
TBD.

### API signature changes

TBD.

- as() merged into to()
- some operators returning a more appropriate Single or Maybe
- functional interfaces throws widening to Throwable
- standard methods removed
- standard methods signature changes

### Standardized operators

(former experimental and beta operators from 2.x)

TBD.

### Operator behavior changes

TBD.

- connectable sources lifecycle-fixes


### Test support changes

TBD.

- methods removed from the test consumers
2 changes: 2 additions & 0 deletions gradle/javadoc_cleanup.gradle
Expand Up @@ -12,6 +12,8 @@ task javadocCleanup(dependsOn: "javadoc") doLast {
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/subjects/ReplaySubject.html'));
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/processors/ReplayProcessor.html'));
fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/plugins/RxJavaPlugins.html'));

fixJavadocFile(rootProject.file('build/docs/javadoc/io/reactivex/parallel/ParallelFlowable.html'));
}

def fixJavadocFile(file) {
Expand Down
108 changes: 100 additions & 8 deletions src/main/java/io/reactivex/Flowable.java
Expand Up @@ -36,12 +36,12 @@
import io.reactivex.subscribers.*;

/**
* The Flowable class that implements the Reactive-Streams Pattern and offers factory methods,
* intermediate operators and the ability to consume reactive dataflows.
* The Flowable class that implements the <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
* Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
* <p>
* Reactive-Streams operates with {@code Publisher}s which {@code Flowable} extends. Many operators
* Reactive Streams operates with {@link Publisher}s which {@code Flowable} extends. Many operators
* therefore accept general {@code Publisher}s directly and allow direct interoperation with other
* Reactive-Streams implementations.
* Reactive Streams implementations.
* <p>
* The Flowable hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
Expand All @@ -51,11 +51,103 @@
* <p>
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
* <p>
* The {@code Flowable} follows the protocol
* <pre><code>
* onSubscribe onNext* (onError | onComplete)?
* </code></pre>
* where the stream can be disposed through the {@link Subscription} instance provided to consumers through
* {@link Subscriber#onSubscribe(Subscription)}.
* Unlike the {@code Observable.subscribe()} of version 1.x, {@link #subscribe(Subscriber)} does not allow external cancellation
* of a subscription and the {@link Subscriber} instance is expected to expose such capability if needed.
* <p>
* Flowables support backpressure and require {@link Subscriber}s to signal demand via {@link Subscription#request(long)}.
* <p>
* Example:
* <pre><code>
* Disposable d = Flowable.just("Hello world!")
* .delay(1, TimeUnit.SECONDS)
* .subscribeWith(new DisposableSubscriber&lt;String&gt;() {
* &#64;Override public void onStart() {
* System.out.println("Start!");
* request(1);
* }
* &#64;Override public void onNext(String t) {
* System.out.println(t);
* request(1);
* }
* &#64;Override public void onError(Throwable t) {
* t.printStackTrace();
* }
* &#64;Override public void onComplete() {
* System.out.println("Done!");
* }
* });
*
* Thread.sleep(500);
* // the sequence can now be cancelled via dispose()
* d.dispose();
* </code></pre>
* <p>
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
* request amounts via {@link Subscription#request(long)}.
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
* All RxJava operators are implemented with these relaxed rules in mind.
* If the subscribing {@code Subscriber} does not implement this interface, for example, due to it being from another Reactive Streams compliant
* library, the Flowable will automatically apply a compliance wrapper around it.
* <p>
* {@code Flowable} is an abstract class, but it is not advised to implement sources and custom operators by extending the class directly due
* to the large amounts of <a href="https://github.com/reactive-streams/reactive-streams-jvm#specification">Reactive Streams</a>
* rules to be followed to the letter. See <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">the wiki</a> for
* some guidance if such custom implementations are necessary.
* <p>
* The recommended way of creating custom {@code Flowable}s is by using the {@link #create(FlowableOnSubscribe, BackpressureStrategy)} factory method:
* <pre><code>
* Flowable&lt;String&gt; source = Flowable.create(new FlowableOnSubscribe&lt;String&gt;() {
* &#64;Override
* public void subscribe(FlowableEmitter&lt;String&gt; emitter) throws Exception {
*
* // signal an item
* emitter.onNext("Hello");
*
* // could be some blocking operation
* Thread.sleep(1000);
*
* // the consumer might have cancelled the flow
* if (emitter.isCancelled() {
* return;
* }
*
* emitter.onNext("World");
*
* Thread.sleep(1000);
*
* // the end-of-sequence has to be signaled, otherwise the
* // consumers may never finish
* emitter.onComplete();
* }
* }, BackpressureStrategy.BUFFER);
*
* System.out.println("Subscribe!");
*
* source.subscribe(System.out::println);
*
* System.out.println("Done!");
* </code></pre>
* <p>
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
* <p>
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
* documentation</a>.
*
* @param <T>
* the type of the items emitted by the Flowable
* @see Observable
* @see ParallelFlowable
* @see io.reactivex.subscribers.DisposableSubscriber
*/
public abstract class Flowable<T> implements Publisher<T> {
/** The default buffer size. */
Expand Down Expand Up @@ -2199,7 +2291,7 @@ public static <T> Flowable<T> fromIterable(Iterable<? extends T> source) {
}

/**
* Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a
* Converts an arbitrary Reactive Streams Publisher into a Flowable if not already a
* Flowable.
* <p>
* The {@link Publisher} must follow the
Expand Down Expand Up @@ -4385,7 +4477,7 @@ public static Flowable<Long> timer(long delay, TimeUnit unit, Scheduler schedule

/**
* Create a Flowable by wrapping a Publisher <em>which has to be implemented according
* to the Reactive-Streams specification by handling backpressure and
* to the Reactive Streams specification by handling backpressure and
* cancellation correctly; no safeguards are provided by the Flowable itself</em>.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down Expand Up @@ -13569,7 +13661,7 @@ public final Flowable<T> retryWhen(
* Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber
* (if not already a SafeSubscriber) that
* deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the
* Reactive-Streams specification).
* Reactive Streams specification).
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.</dd>
Expand Down Expand Up @@ -14792,7 +14884,7 @@ public final void subscribe(Subscriber<? super T> s) {
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
* the error via {@link FlowableSubscriber#onError(Throwable)}.
* <p>
* This subscribe method relaxes the following Reactive-Streams rules:
* This subscribe method relaxes the following Reactive Streams rules:
* <ul>
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/reactivex/Observable.java
Expand Up @@ -42,7 +42,7 @@
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
* for such non-backpressured flows, which {@code Observable} itself implements as well.
* <p>
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
* overloads that allow setting their internal buffer size explicitly.
* <p>
Expand Down
Expand Up @@ -501,13 +501,14 @@ public void onComplete() {
buffer = null;
}

queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
if (b != null) {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainMaxLoop(queue, downstream, false, this, this);
}
w.dispose();
}

w.dispose();
}

@Override
Expand Down
Expand Up @@ -504,10 +504,12 @@ public void onComplete() {
buffer = null;
}

queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
if (b != null) {
queue.offer(b);
done = true;
if (enter()) {
QueueDrainHelper.drainLoop(queue, downstream, false, this, this);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/reactivex/processors/package-info.java
Expand Up @@ -16,7 +16,7 @@

/**
* Classes representing so-called hot backpressure-aware sources, aka <strong>processors</strong>,
* that implement the {@link FlowableProcessor} class,
* that implement the {@link io.reactivex.processors.FlowableProcessor FlowableProcessor} class,
* the Reactive Streams {@link org.reactivestreams.Processor Processor} interface
* to allow forms of multicasting events to one or more subscribers as well as consuming another
* Reactive Streams {@link org.reactivestreams.Publisher Publisher}.
Expand All @@ -33,7 +33,7 @@
* </ul>
* <p>
* The non-backpressured variants of the {@code FlowableProcessor} class are called
* {@link io.reactivex.Subject}s and reside in the {@code io.reactivex.subjects} package.
* {@link io.reactivex.subjects.Subject}s and reside in the {@code io.reactivex.subjects} package.
* @see io.reactivex.subjects
*/
package io.reactivex.processors;
10 changes: 5 additions & 5 deletions src/main/java/io/reactivex/schedulers/Schedulers.java
Expand Up @@ -299,7 +299,7 @@ public static Scheduler single() {
* a time delay or periodically will use the {@link #single()} scheduler for the timed waiting
* before posting the actual task to the given executor.
* <p>
* Tasks submitted to the {@link Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
* Tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} of this {@code Scheduler} are also not interruptible. Use the
* {@link #from(Executor, boolean)} overload to enable task interruption via this wrapper.
* <p>
* If the provided executor supports the standard Java {@link ExecutorService} API,
Expand Down Expand Up @@ -332,7 +332,7 @@ public static Scheduler single() {
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
Expand All @@ -350,7 +350,7 @@ public static Scheduler from(@NonNull Executor executor) {
* Wraps an {@link Executor} into a new Scheduler instance and delegates {@code schedule()}
* calls to it.
* <p>
* The tasks scheduled by the returned {@link Scheduler} and its {@link Scheduler.Worker}
* The tasks scheduled by the returned {@link Scheduler} and its {@link io.reactivex.Scheduler.Worker Scheduler.Worker}
* can be optionally interrupted.
* <p>
* If the provided executor doesn't support any of the more specific standard Java executor
Expand Down Expand Up @@ -388,14 +388,14 @@ public static Scheduler from(@NonNull Executor executor) {
* }
* </code></pre>
* <p>
* This type of scheduler is less sensitive to leaking {@link Scheduler.Worker} instances, although
* This type of scheduler is less sensitive to leaking {@link io.reactivex.Scheduler.Worker Scheduler.Worker} instances, although
* not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or
* execute those tasks "unexpectedly".
* <p>
* Note that this method returns a new {@link Scheduler} instance, even for the same {@link Executor} instance.
* @param executor
* the executor to wrap
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link Scheduler.Worker} will
* @param interruptibleWorker if {@code true} the tasks submitted to the {@link io.reactivex.Scheduler.Worker Scheduler.Worker} will
* be interrupted when the task is disposed.
* @return the new Scheduler wrapping the Executor
* @since 2.2.6 - experimental
Expand Down
Expand Up @@ -2769,4 +2769,19 @@ public void timedSizeBufferAlreadyCleared() {

sub.run();
}

@Test
public void bufferExactFailingSupplier() {
Flowable.empty()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
@Override
public List<Object> call() throws Exception {
throw new TestException();
}
}, false)
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertFailure(TestException.class)
;
}
}
Expand Up @@ -19,8 +19,6 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import io.reactivex.plugins.RxJavaPlugins;
import io.reactivex.subjects.PublishSubject;
import org.junit.Test;

import java.util.List;
Expand Down
Expand Up @@ -2136,4 +2136,19 @@ public ObservableSource<List<Object>> apply(Observable<Object> o)
}
});
}

@Test
public void bufferExactFailingSupplier() {
Observable.empty()
.buffer(1, TimeUnit.SECONDS, Schedulers.computation(), 10, new Callable<List<Object>>() {
@Override
public List<Object> call() throws Exception {
throw new TestException();
}
}, false)
.test()
.awaitDone(1, TimeUnit.SECONDS)
.assertFailure(TestException.class)
;
}
}
Expand Up @@ -19,7 +19,6 @@
import io.reactivex.exceptions.TestException;
import io.reactivex.functions.Action;
import io.reactivex.observers.TestObserver;
import org.junit.Assert;
import org.junit.Test;

import java.util.List;
Expand Down

0 comments on commit 2bed8c1

Please sign in to comment.