Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Improve JavaDocs of Observable and fix similar issues elsewhere #6831

Merged
merged 3 commits into from
Jan 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ public static Completable create(@NonNull CompletableOnSubscribe source) {
* when the {@code Completable} is subscribed to.
* @return the created {@code Completable} instance
* @throws NullPointerException if {@code source} is {@code null}
* @throws IllegalArgumentException if {@code source} is a {@code Completable}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -364,7 +365,7 @@ public static Completable unsafeCreate(@NonNull CompletableSource source) {
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static Completable defer(@NonNull Supplier<? extends CompletableSource> completableSupplier) {
Objects.requireNonNull(completableSupplier, "completableSupplier");
Objects.requireNonNull(completableSupplier, "completableSupplier is null");
return RxJavaPlugins.onAssembly(new CompletableDefer(completableSupplier));
}

Expand Down
228 changes: 123 additions & 105 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -147,12 +147,14 @@ public static <T> Maybe<T> amb(@NonNull Iterable<? extends MaybeSource<? extends
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in the array.
* @return the new {@code Maybe} instance
* @throws NullPointerException if {@code sources} is {@code null}
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@SafeVarargs
public static <T> Maybe<T> ambArray(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return empty();
}
Expand Down Expand Up @@ -402,6 +404,7 @@ public static <T> Flowable<T> concatArray(@NonNull MaybeSource<? extends T>... s
@SafeVarargs
@NonNull
public static <T> Flowable<T> concatArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return Flowable.empty();
} else
Expand Down Expand Up @@ -1364,6 +1367,7 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
@SafeVarargs
@NonNull
public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return Flowable.empty();
}
Expand Down Expand Up @@ -1764,6 +1768,8 @@ public static Maybe<Long> timer(long delay, @NonNull TimeUnit unit, @NonNull Sch
* @param <T> the value type
* @param onSubscribe the function that is called with the subscribing {@code MaybeObserver}
* @return the new {@code Maybe} instance
* @throws IllegalArgumentException if {@code onSubscribe} is a {@code Maybe}
* @throws NullPointerException if {@code onSubscribe} is {@code null}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -1858,6 +1864,7 @@ public static <T, D> Maybe<T> using(@NonNull Supplier<? extends D> resourceSuppl
* @param <T> the value type
* @param source the source to wrap
* @return the {@code Maybe} wrapper or the source cast to {@code Maybe} (if possible)
* @throws NullPointerException if {@code source} is {@code null}
*/
@CheckReturnValue
@NonNull
Expand Down Expand Up @@ -3522,7 +3529,7 @@ public final Single<Boolean> isEmpty() {
* if (str.length() &lt; 2) {
* downstream.onSuccess(str);
* } else {
* // Maybe i {@code Maybe} ly expected to produce one of the onXXX events
* // Maybe is expected to produce one of the onXXX events only
* downstream.onComplete();
* }
* }
Expand Down
6,690 changes: 3,593 additions & 3,097 deletions src/main/java/io/reactivex/rxjava3/core/Observable.java

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,13 +152,15 @@ public static <T> Single<T> amb(@NonNull Iterable<? extends SingleSource<? exten
* @param sources the array of sources. A subscription to each source will
* occur in the same order as in this array.
* @return the new {@code Single} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @since 2.0
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
@NonNull
public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return error(SingleInternalHelper.emptyThrower());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,17 @@ private FlowableBlockingSubscribe() {
/**
* Subscribes to the source and calls the Subscriber methods on the current thread.
* <p>
* @param o the source publisher
* @param source the source publisher
* The cancellation and backpressure is composed through.
* @param subscriber the subscriber to forward events and calls to in the current thread
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T> subscriber) {
public static <T> void subscribe(Publisher<? extends T> source, Subscriber<? super T> subscriber) {
final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

BlockingSubscriber<T> bs = new BlockingSubscriber<>(queue);

o.subscribe(bs);
source.subscribe(bs);

try {
for (;;) {
Expand Down Expand Up @@ -77,15 +77,15 @@ public static <T> void subscribe(Publisher<? extends T> o, Subscriber<? super T>

/**
* Runs the source observable to a terminal event, ignoring any values and rethrowing any exception.
* @param o the source publisher
* @param source the source to await
* @param <T> the value type
*/
public static <T> void subscribe(Publisher<? extends T> o) {
public static <T> void subscribe(Publisher<? extends T> source) {
BlockingIgnoringReceiver callback = new BlockingIgnoringReceiver();
LambdaSubscriber<T> ls = new LambdaSubscriber<>(Functions.emptyConsumer(),
callback, callback, Functions.REQUEST_MAX);

o.subscribe(ls);
source.subscribe(ls);

BlockingHelper.awaitForComplete(callback, ls);
Throwable e = callback.error;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ private ObservableBlockingSubscribe() {
* The call to dispose() is composed through.
* @param observer the subscriber to forward events and calls to in the current thread
* @param <T> the value type
* @throws NullPointerException if {@code observer} is {@code null}
*/
public static <T> void subscribe(ObservableSource<? extends T> o, Observer<? super T> observer) {
final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
Expand Down
33 changes: 19 additions & 14 deletions src/main/java/io/reactivex/rxjava3/parallel/ParallelFlowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,11 @@ public abstract class ParallelFlowable<@NonNull T> {
*
* @param subscribers the array of Subscribers
* @return true if the number of subscribers equals to the parallelism level
* @throws NullPointerException if {@code subscribers} is {@code null}
* @throws IllegalArgumentException if {@code subscribers.length} is different from {@link #parallelism()}
*/
protected final boolean validate(@NonNull Subscriber<?>[] subscribers) {
Objects.requireNonNull(subscribers, "subscribers is null");
int p = parallelism();
if (subscribers.length != p) {
Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length);
Expand Down Expand Up @@ -156,7 +159,7 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
@BackpressureSupport(BackpressureKind.FULL)
public static <@NonNull T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source,
int parallelism, int prefetch) {
Objects.requireNonNull(source, "source");
Objects.requireNonNull(source, "source is null");
ObjectHelper.verifyPositive(parallelism, "parallelism");
ObjectHelper.verifyPositive(prefetch, "prefetch");

Expand All @@ -183,7 +186,7 @@ public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> sourc
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ParallelMap<>(this, mapper));
}

Expand Down Expand Up @@ -212,7 +215,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTry<>(this, mapper, errorHandler));
}
Expand Down Expand Up @@ -243,7 +246,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTry<>(this, mapper, errorHandler));
}
Expand All @@ -267,7 +270,7 @@ public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new ParallelFilter<>(this, predicate));
}

Expand Down Expand Up @@ -295,7 +298,7 @@ public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate)
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelFilterTry<>(this, predicate, errorHandler));
}
Expand Down Expand Up @@ -325,7 +328,7 @@ public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate,
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(predicate, "predicate");
Objects.requireNonNull(predicate, "predicate is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelFilterTry<>(this, predicate, errorHandler));
}
Expand Down Expand Up @@ -401,7 +404,7 @@ public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler) {
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
Objects.requireNonNull(scheduler, "scheduler");
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new ParallelRunOn<>(this, scheduler, prefetch));
}
Expand All @@ -426,7 +429,7 @@ public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetc
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final Flowable<T> reduce(@NonNull BiFunction<T, T, T> reducer) {
Objects.requireNonNull(reducer, "reducer");
Objects.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ParallelReduceFull<>(this, reducer));
}

Expand All @@ -453,8 +456,8 @@ public final Flowable<T> reduce(@NonNull BiFunction<T, T, T> reducer) {
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> ParallelFlowable<R> reduce(@NonNull Supplier<R> initialSupplier, @NonNull BiFunction<R, ? super T, R> reducer) {
Objects.requireNonNull(initialSupplier, "initialSupplier");
Objects.requireNonNull(reducer, "reducer");
Objects.requireNonNull(initialSupplier, "initialSupplier is null");
Objects.requireNonNull(reducer, "reducer is null");
return RxJavaPlugins.onAssembly(new ParallelReduce<>(this, initialSupplier, reducer));
}

Expand Down Expand Up @@ -1024,13 +1027,15 @@ public final <C> ParallelFlowable<C> collect(@NonNull Supplier<? extends C> coll
* @param <T> the value type
* @param publishers the array of publishers
* @return the new ParallelFlowable instance
* @throws IllegalArgumentException if {@code publishers} is an empty array
*/
@CheckReturnValue
@NonNull
@SafeVarargs
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
public static <@NonNull T> ParallelFlowable<T> fromArray(@NonNull Publisher<T>... publishers) {
Objects.requireNonNull(publishers, "publishers is null");
if (publishers.length == 0) {
throw new IllegalArgumentException("Zero publishers not supported");
}
Expand Down Expand Up @@ -1427,7 +1432,7 @@ public final <U> ParallelFlowable<U> flatMapIterable(@NonNull Function<? super T
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ParallelMapOptional<>(this, mapper));
}

Expand Down Expand Up @@ -1456,7 +1461,7 @@ public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Op
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper, @NonNull ParallelFailureHandling errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler));
}
Expand Down Expand Up @@ -1487,7 +1492,7 @@ public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Op
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
public final <R> ParallelFlowable<R> mapOptional(@NonNull Function<? super T, Optional<? extends R>> mapper, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
Objects.requireNonNull(mapper, "mapper");
Objects.requireNonNull(mapper, "mapper is null");
Objects.requireNonNull(errorHandler, "errorHandler is null");
return RxJavaPlugins.onAssembly(new ParallelMapTryOptional<>(this, mapper, errorHandler));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public void skipLastWithBackpressure() {

}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void skipLastWithNegativeCount() {
Flowable.just("one").skipLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public void takeLastWithZeroCount() {
verify(subscriber, times(1)).onComplete();
}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastWithNegativeCount() {
Flowable.just("one").takeLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@

public class FlowableTakeLastTimedTest extends RxJavaTest {

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastTimedWithNegativeCount() {
Flowable.just("one").takeLast(-1, 1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void badCapacityHint() throws Exception {
try {
Observable.concatEager(Arrays.asList(source, source, source), 1, -99);
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
}

}
Expand All @@ -547,7 +547,7 @@ public void mappingBadCapacityHint() throws Exception {
try {
Observable.just(source, source, source).concatMapEager((Function)Functions.identity(), 10, -99);
} catch (IllegalArgumentException ex) {
assertEquals("prefetch > 0 required but it was -99", ex.getMessage());
assertEquals("bufferSize > 0 required but it was -99", ex.getMessage());
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void skipLastWithBackpressure() {

}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void skipLastWithNegativeCount() {
Observable.just("one").skipLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public void takeLastWithZeroCount() {
verify(observer, times(1)).onComplete();
}

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastWithNegativeCount() {
Observable.just("one").takeLast(-1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@

public class ObservableTakeLastTimedTest extends RxJavaTest {

@Test(expected = IndexOutOfBoundsException.class)
@Test(expected = IllegalArgumentException.class)
public void takeLastTimedWithNegativeCount() {
Observable.just("one").takeLast(-1, 1, TimeUnit.SECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ public void checkMaybe() throws Exception {
checkSource("Maybe", "io.reactivex.rxjava3.core");
}

@Test
public void checkObservable() throws Exception {
checkSource("Observable", "io.reactivex.rxjava3.core");
}

static void checkSource(String baseClassName, String packageName) throws Exception {
File f = TestHelper.findSource(baseClassName);
File f = TestHelper.findSource(baseClassName, packageName);
if (f == null) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -990,5 +990,5 @@ static void backpressureMentionedWithoutAnnotation(StringBuilder e, RxMethod m,
}
}

static final String[] AT_RETURN_WORDS = { "@return a ", "@return the new ", "@return a new " };
static final String[] AT_RETURN_WORDS = { "@return a ", "@return an ", "@return the new ", "@return a new " };
}
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,12 @@ void checkClass(Class<?> clazz) {
error = ex;
}

if (!success && error.getCause() instanceof NullPointerException) {
if (!error.getCause().toString().contains("is null")) {
fail++;
b.append("\r\nNPEs should indicate which argument failed: " + m + " # " + i + " = " + p + ", tag = " + tag + ", params = " + Arrays.toString(callParams2));
}
}
if (success != shouldSucceed) {
fail++;
if (shouldSucceed) {
Expand Down
Loading