diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java index ce02849d56..5e676681ff 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java +++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java @@ -1415,7 +1415,7 @@ public static Flowable merge( } /** - * Merges an array sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence, + * Merges an array of {@link MaybeSource} instances into a single {@link Flowable} sequence, * running all {@code MaybeSource}s at once. *

* @@ -1470,10 +1470,10 @@ public static Flowable mergeArray(MaybeSource... sources) { * *

* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an - * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that + * error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that * error notification until all of the merged {@code MaybeSource}s have finished emitting items. *

- * Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only + * Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only * invoke the {@code onError} method of its subscribers once. *

*
Backpressure:
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java index 9f5c3446c5..5e9b084ad2 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Single.java +++ b/src/main/java/io/reactivex/rxjava3/core/Single.java @@ -17,7 +17,7 @@ import java.util.concurrent.*; import java.util.stream.*; -import org.reactivestreams.Publisher; +import org.reactivestreams.*; import io.reactivex.rxjava3.annotations.*; import io.reactivex.rxjava3.disposables.Disposable; @@ -1323,6 +1323,81 @@ public static Flowable merge( return merge(Flowable.fromArray(source1, source2, source3, source4)); } + /** + * Merges an array of {@link SingleSource} instances into a single {@link Flowable} sequence, + * running all {@code SingleSource}s at once. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code mergeArray} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If any of the source {@code SingleSource}s signal a {@link Throwable} via {@code onError}, the resulting + * {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed. + * If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the + * first one's error or, depending on the concurrency of the sources, may terminate with a + * {@link CompositeException} containing two or more of the various error signals. + * {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via + * {@link RxJavaPlugins#onError(Throwable)} method as {@link UndeliverableException} errors. Similarly, {@code Throwable}s + * signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a + * (composite) error will be sent to the same global error handler. + * Use {@link #mergeArrayDelayError(SingleSource...)} to merge sources and terminate only when all source {@code SingleSource}s + * have completed or failed with an error. + *
+ *
+ * @param the common and resulting value type + * @param sources the array sequence of {@code SingleSource} sources + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @see #mergeArrayDelayError(SingleSource...) + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @NonNull + @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs + public static Flowable mergeArray(SingleSource... sources) { + return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length); + } + + /** + * Flattens an array of {@link SingleSource}s into one {@link Flowable}, in a way that allows a subscriber to receive all + * successfully emitted items from each of the source {@code SingleSource}s without being interrupted by an error + * notification from one of them. + *

+ * + *

+ * This behaves like {@link #merge(Publisher)} except that if any of the merged {@code SingleSource}s notify of an + * error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that + * error notification until all of the merged {@code SingleSource}s have finished emitting items. + *

+ * Even if multiple merged {@code SingleSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only + * invoke the {@code onError} method of its subscribers once. + *

+ *
Backpressure:
+ *
The operator honors backpressure from downstream.
+ *
Scheduler:
+ *
{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the common element base type + * @param sources + * the array of {@code SingleSource}s + * @return the new {@code Flowable} instance + * @throws NullPointerException if {@code sources} is {@code null} + * @see ReactiveX operators documentation: Merge + */ + @BackpressureSupport(BackpressureKind.FULL) + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @SafeVarargs + @NonNull + public static Flowable mergeArrayDelayError(@NonNull SingleSource... sources) { + return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length); + } + /** * Merges an {@link Iterable} sequence of {@link SingleSource} instances into one {@link Flowable} sequence, * running all {@code SingleSource}s at once and delaying any error(s) until all sources succeed or fail. diff --git a/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java index ed4718fd1c..e87bfd315a 100644 --- a/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java +++ b/src/test/java/io/reactivex/rxjava3/internal/fuseable/CancellableQueueFuseableTest.java @@ -57,7 +57,7 @@ public void dispose() { @Test public void cancel2() { - AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() {}; + AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() { }; assertFalse(qs.isDisposed()); @@ -66,7 +66,7 @@ public void cancel2() { @Test public void dispose2() { - AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() {}; + AbstractEmptyQueueFuseable qs = new AbstractEmptyQueueFuseable() { }; assertFalse(qs.isDisposed()); diff --git a/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleMergeArrayTest.java b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleMergeArrayTest.java new file mode 100644 index 0000000000..9434a4fcc1 --- /dev/null +++ b/src/test/java/io/reactivex/rxjava3/internal/operators/single/SingleMergeArrayTest.java @@ -0,0 +1,50 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.rxjava3.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.rxjava3.core.*; +import io.reactivex.rxjava3.exceptions.TestException; + +public class SingleMergeArrayTest extends RxJavaTest { + + @Test + public void normal() { + Single.mergeArray(Single.just(1), Single.just(2), Single.just(3)) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void error() { + Single.mergeArray(Single.just(1), Single.error(new TestException()), Single.just(3)) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void normalDelayError() { + Single.mergeArrayDelayError(Single.just(1), Single.just(2), Single.just(3)) + .test() + .assertResult(1, 2, 3); + } + + @Test + public void errorDelayError() { + Single.mergeArrayDelayError(Single.just(1), Single.error(new TestException()), Single.just(3)) + .test() + .assertFailure(TestException.class, 1, 3); + } +}