Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add Single.mergeArray & mergeArrayDelayError #6882

Merged
merged 2 commits into from
Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -1415,7 +1415,7 @@ public static <T> Flowable<T> merge(
}

/**
* Merges an array sequence of {@link MaybeSource} instances into a single {@link Flowable} sequence,
* Merges an array of {@link MaybeSource} instances into a single {@link Flowable} sequence,
* running all {@code MaybeSource}s at once.
* <p>
* <img width="640" height="272" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.mergeArray.png" alt="">
Expand Down Expand Up @@ -1470,10 +1470,10 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.mergeArrayDelayError.png" alt="">
* <p>
* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code MaybeSource}s notify of an
* error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that
* error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that
* error notification until all of the merged {@code MaybeSource}s have finished emitting items.
* <p>
* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeDelayError} will only
* Even if multiple merged {@code MaybeSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only
* invoke the {@code onError} method of its subscribers once.
* <dl>
* <dt><b>Backpressure:</b></dt>
Expand Down
77 changes: 76 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Single.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.util.concurrent.*;
import java.util.stream.*;

import org.reactivestreams.Publisher;
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.disposables.Disposable;
Expand Down Expand Up @@ -1323,6 +1323,81 @@ public static <T> Flowable<T> merge(
return merge(Flowable.fromArray(source1, source2, source3, source4));
}

/**
* Merges an array of {@link SingleSource} instances into a single {@link Flowable} sequence,
* running all {@code SingleSource}s at once.
* <p>
* <img width="640" height="272" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.mergeArray.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArray} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If any of the source {@code SingleSource}s signal a {@link Throwable} via {@code onError}, the resulting
* {@code Flowable} terminates with that {@code Throwable} and all other source {@code SingleSource}s are disposed.
* If more than one {@code SingleSource} signals an error, the resulting {@code Flowable} may terminate with the
* first one's error or, depending on the concurrency of the sources, may terminate with a
* {@link CompositeException} containing two or more of the various error signals.
* {@code Throwable}s that didn't make into the composite will be sent (individually) to the global error handler via
* {@link RxJavaPlugins#onError(Throwable)} method as {@link UndeliverableException} errors. Similarly, {@code Throwable}s
* signaled by source(s) after the returned {@code Flowable} has been cancelled or terminated with a
* (composite) error will be sent to the same global error handler.
* Use {@link #mergeArrayDelayError(SingleSource...)} to merge sources and terminate only when all source {@code SingleSource}s
* have completed or failed with an error.
* </dd>
* </dl>
* @param <T> the common and resulting value type
* @param sources the array sequence of {@code SingleSource} sources
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @see #mergeArrayDelayError(SingleSource...)
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length);
}

/**
* Flattens an array of {@link SingleSource}s into one {@link Flowable}, in a way that allows a subscriber to receive all
* successfully emitted items from each of the source {@code SingleSource}s without being interrupted by an error
* notification from one of them.
* <p>
* <img width="640" height="422" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Single.mergeArrayDelayError.png" alt="">
* <p>
* This behaves like {@link #merge(Publisher)} except that if any of the merged {@code SingleSource}s notify of an
* error via {@link Subscriber#onError onError}, {@code mergeArrayDelayError} will refrain from propagating that
* error notification until all of the merged {@code SingleSource}s have finished emitting items.
* <p>
* Even if multiple merged {@code SingleSource}s send {@code onError} notifications, {@code mergeArrayDelayError} will only
* invoke the {@code onError} method of its subscribers once.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator honors backpressure from downstream.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code mergeArrayDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <T> the common element base type
* @param sources
* the array of {@code SingleSource}s
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code sources} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
@NonNull
public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length);
}

/**
* Merges an {@link Iterable} sequence of {@link SingleSource} instances into one {@link Flowable} sequence,
* running all {@code SingleSource}s at once and delaying any error(s) until all sources succeed or fail.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void dispose() {

@Test
public void cancel2() {
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() {};
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() { };

assertFalse(qs.isDisposed());

Expand All @@ -66,7 +66,7 @@ public void cancel2() {

@Test
public void dispose2() {
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() {};
AbstractEmptyQueueFuseable<Object> qs = new AbstractEmptyQueueFuseable<Object>() { };

assertFalse(qs.isDisposed());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.single;

import org.junit.Test;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.exceptions.TestException;

public class SingleMergeArrayTest extends RxJavaTest {

@Test
public void normal() {
Single.mergeArray(Single.just(1), Single.just(2), Single.just(3))
.test()
.assertResult(1, 2, 3);
}

@Test
public void error() {
Single.mergeArray(Single.just(1), Single.error(new TestException()), Single.just(3))
.test()
.assertFailure(TestException.class, 1);
}

@Test
public void normalDelayError() {
Single.mergeArrayDelayError(Single.just(1), Single.just(2), Single.just(3))
.test()
.assertResult(1, 2, 3);
}

@Test
public void errorDelayError() {
Single.mergeArrayDelayError(Single.just(1), Single.error(new TestException()), Single.just(3))
.test()
.assertFailure(TestException.class, 1, 3);
}
}