Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Use more appropriate operators when delegating to Flowable ops #6888

Merged
merged 2 commits into from Jan 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 6 additions & 14 deletions src/main/java/io/reactivex/rxjava3/core/Maybe.java
Expand Up @@ -344,11 +344,10 @@ public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends Maybe
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, MaybeToPublisher.instance(), prefetch, ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new FlowableConcatMapMaybePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch));
}

/**
Expand Down Expand Up @@ -1141,7 +1140,7 @@ public static <T> Maybe<T> fromRunnable(@NonNull Runnable run) {
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
return merge(Flowable.fromIterable(sources));
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), false, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1218,11 +1217,10 @@ public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends MaybeS
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), false, maxConcurrency, 1));
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), false, maxConcurrency));
}

/**
Expand Down Expand Up @@ -1490,18 +1488,14 @@ public static <T> Flowable<T> mergeArray(MaybeSource<? extends T>... sources) {
* @throws NullPointerException if {@code sources} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
@NonNull
public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extends T>... sources) {
Objects.requireNonNull(sources, "sources is null");
if (sources.length == 0) {
return Flowable.empty();
}
return Flowable.fromArray(sources).flatMap((Function)MaybeToPublisher.instance(), true, sources.length);
return Flowable.fromArray(sources).flatMapMaybe(Functions.identity(), true, Math.max(1, sources.length));
}

/**
Expand Down Expand Up @@ -1533,13 +1527,12 @@ public static <T> Flowable<T> mergeArrayDelayError(@NonNull MaybeSource<? extend
* @throws NullPointerException if {@code sources} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? extends MaybeSource<? extends T>> sources) {
return Flowable.fromIterable(sources).flatMap((Function)MaybeToPublisher.instance(), true);
return Flowable.fromIterable(sources).flatMapMaybe(Functions.identity(), true, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1609,15 +1602,14 @@ public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? exte
* @see <a href="http://reactivex.io/documentation/operators/merge.html">ReactiveX operators documentation: Merge</a>
* @since 2.2
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
@BackpressureSupport(BackpressureKind.FULL)
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? extends MaybeSource<? extends T>> sources, int maxConcurrency) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, MaybeToPublisher.instance(), true, maxConcurrency, 1));
return RxJavaPlugins.onAssembly(new FlowableFlatMapMaybePublisher<>(sources, Functions.identity(), true, maxConcurrency));
}

/**
Expand Down
47 changes: 21 additions & 26 deletions src/main/java/io/reactivex/rxjava3/core/Single.java
Expand Up @@ -20,7 +20,6 @@
import org.reactivestreams.*;

import io.reactivex.rxjava3.annotations.*;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
Expand All @@ -32,7 +31,7 @@
import io.reactivex.rxjava3.internal.operators.flowable.*;
import io.reactivex.rxjava3.internal.operators.maybe.*;
import io.reactivex.rxjava3.internal.operators.mixed.*;
import io.reactivex.rxjava3.internal.operators.observable.*;
import io.reactivex.rxjava3.internal.operators.observable.ObservableSingleSingle;
import io.reactivex.rxjava3.internal.operators.single.*;
import io.reactivex.rxjava3.internal.util.ErrorMode;
import io.reactivex.rxjava3.observers.TestObserver;
Expand Down Expand Up @@ -195,7 +194,7 @@ public static <T> Single<T> ambArray(@NonNull SingleSource<? extends T>... sourc
@SchedulerSupport(SchedulerSupport.NONE)
@BackpressureSupport(BackpressureKind.FULL)
public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
return concat(Flowable.fromIterable(sources));
return Flowable.fromIterable(sources).concatMapSingleDelayError(Functions.identity(), false);
}

/**
Expand All @@ -216,10 +215,9 @@ public static <T> Flowable<T> concat(@NonNull Iterable<@NonNull ? extends Single
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Observable<T> concat(@NonNull ObservableSource<? extends SingleSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new ObservableConcatMapSingle<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, 2));
}

/**
Expand Down Expand Up @@ -272,11 +270,10 @@ public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends Singl
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> concat(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources, int prefetch) {
Objects.requireNonNull(sources, "sources is null");
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableConcatMapPublisher(sources, SingleInternalHelper.toFlowable(), prefetch, ErrorMode.IMMEDIATE));
return RxJavaPlugins.onAssembly(new FlowableConcatMapSinglePublisher<>(sources, Functions.identity(), ErrorMode.IMMEDIATE, prefetch));
}

/**
Expand Down Expand Up @@ -308,7 +305,7 @@ public static <T> Flowable<T> concat(
) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
return concat(Flowable.fromArray(source1, source2));
return Flowable.fromArray(source1, source2).concatMapSingleDelayError(Functions.identity(), false);
}

/**
Expand Down Expand Up @@ -344,7 +341,7 @@ public static <T> Flowable<T> concat(
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
return concat(Flowable.fromArray(source1, source2, source3));
return Flowable.fromArray(source1, source2, source3).concatMapSingleDelayError(Functions.identity(), false);
}

/**
Expand Down Expand Up @@ -383,7 +380,7 @@ public static <T> Flowable<T> concat(
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
Objects.requireNonNull(source4, "source4 is null");
return concat(Flowable.fromArray(source1, source2, source3, source4));
return Flowable.fromArray(source1, source2, source3, source4).concatMapSingleDelayError(Functions.identity(), false);
}

/**
Expand All @@ -409,7 +406,7 @@ public static <T> Flowable<T> concat(
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).concatMap(SingleInternalHelper.toFlowable(), 2);
return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), false);
}

/**
Expand All @@ -435,7 +432,7 @@ public static <T> Flowable<T> concatArray(@NonNull SingleSource<? extends T>...
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> concatArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).concatMapDelayError(SingleInternalHelper.toFlowable(), true, 2);
return Flowable.fromArray(sources).concatMapSingleDelayError(Functions.identity(), true);
}

/**
Expand Down Expand Up @@ -1091,7 +1088,7 @@ public static <T> Single<T> fromObservable(@NonNull ObservableSource<? extends T
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
return merge(Flowable.fromIterable(sources));
return Flowable.fromIterable(sources).flatMapSingle(Functions.identity());
}

/**
Expand Down Expand Up @@ -1129,10 +1126,9 @@ public static <T> Flowable<T> merge(@NonNull Iterable<@NonNull ? extends SingleS
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> merge(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), false, Integer.MAX_VALUE, Flowable.bufferSize()));
return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), false, Integer.MAX_VALUE));
}

/**
Expand Down Expand Up @@ -1212,7 +1208,7 @@ public static <T> Flowable<T> merge(
) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
return merge(Flowable.fromArray(source1, source2));
return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1265,7 +1261,7 @@ public static <T> Flowable<T> merge(
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
return merge(Flowable.fromArray(source1, source2, source3));
return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1321,7 +1317,7 @@ public static <T> Flowable<T> merge(
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
Objects.requireNonNull(source4, "source4 is null");
return merge(Flowable.fromArray(source1, source2, source3, source4));
return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), false, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1360,7 +1356,7 @@ public static <T> Flowable<T> merge(
@SchedulerSupport(SchedulerSupport.NONE)
@SafeVarargs
public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, sources.length);
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), false, Math.max(1, sources.length));
}

/**
Expand Down Expand Up @@ -1396,7 +1392,7 @@ public static <T> Flowable<T> mergeArray(SingleSource<? extends T>... sources) {
@SafeVarargs
@NonNull
public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? extends T>... sources) {
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, sources.length);
return Flowable.fromArray(sources).flatMapSingle(Functions.identity(), true, Math.max(1, sources.length));
}

/**
Expand All @@ -1423,7 +1419,7 @@ public static <T> Flowable<T> mergeArrayDelayError(@NonNull SingleSource<? exten
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? extends SingleSource<? extends T>> sources) {
return mergeDelayError(Flowable.fromIterable(sources));
return Flowable.fromIterable(sources).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
}

/**
Expand All @@ -1449,10 +1445,9 @@ public static <T> Flowable<T> mergeDelayError(@NonNull Iterable<@NonNull ? exten
@NonNull
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
@SuppressWarnings({ "unchecked", "rawtypes" })
public static <T> Flowable<T> mergeDelayError(@NonNull Publisher<@NonNull ? extends SingleSource<? extends T>> sources) {
Objects.requireNonNull(sources, "sources is null");
return RxJavaPlugins.onAssembly(new FlowableFlatMapPublisher(sources, SingleInternalHelper.toFlowable(), true, Integer.MAX_VALUE, Flowable.bufferSize()));
return RxJavaPlugins.onAssembly(new FlowableFlatMapSinglePublisher<>(sources, Functions.identity(), true, Integer.MAX_VALUE));
}

/**
Expand Down Expand Up @@ -1490,7 +1485,7 @@ public static <T> Flowable<T> mergeDelayError(
) {
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
return mergeDelayError(Flowable.fromArray(source1, source2));
return Flowable.fromArray(source1, source2).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1532,7 +1527,7 @@ public static <T> Flowable<T> mergeDelayError(
Objects.requireNonNull(source1, "source1 is null");
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3));
return Flowable.fromArray(source1, source2, source3).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
}

/**
Expand Down Expand Up @@ -1577,7 +1572,7 @@ public static <T> Flowable<T> mergeDelayError(
Objects.requireNonNull(source2, "source2 is null");
Objects.requireNonNull(source3, "source3 is null");
Objects.requireNonNull(source4, "source4 is null");
return mergeDelayError(Flowable.fromArray(source1, source2, source3, source4));
return Flowable.fromArray(source1, source2, source3, source4).flatMapSingle(Functions.identity(), true, Integer.MAX_VALUE);
}

/**
Expand Down
Expand Up @@ -15,31 +15,35 @@

import org.reactivestreams.*;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapMaybe.FlatMapMaybeSubscriber;

/**
* Maps upstream values into MaybeSources and merges their signals into one sequence.
* @param <T> the source value type
* @param <R> the result value type
*/
public final class FlowableFlatMapMaybePublisher<T, R> extends Flowable<R> {

public final class FlowableFlatMapPublisher<T, U> extends Flowable<U> {
final Publisher<T> source;
final Function<? super T, ? extends Publisher<? extends U>> mapper;

final Function<? super T, ? extends MaybeSource<? extends R>> mapper;

final boolean delayErrors;

final int maxConcurrency;
final int bufferSize;

public FlowableFlatMapPublisher(Publisher<T> source,
Function<? super T, ? extends Publisher<? extends U>> mapper,
boolean delayErrors, int maxConcurrency, int bufferSize) {
public FlowableFlatMapMaybePublisher(Publisher<T> source, Function<? super T, ? extends MaybeSource<? extends R>> mapper,
boolean delayError, int maxConcurrency) {
this.source = source;
this.mapper = mapper;
this.delayErrors = delayErrors;
this.delayErrors = delayError;
this.maxConcurrency = maxConcurrency;
this.bufferSize = bufferSize;
}

@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (FlowableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
return;
}
source.subscribe(FlowableFlatMap.subscribe(s, mapper, delayErrors, maxConcurrency, bufferSize));
protected void subscribeActual(Subscriber<? super R> s) {
source.subscribe(new FlatMapMaybeSubscriber<>(s, mapper, delayErrors, maxConcurrency));
}
}
@@ -0,0 +1,49 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/

package io.reactivex.rxjava3.internal.operators.flowable;

import org.reactivestreams.*;

import io.reactivex.rxjava3.core.*;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.internal.operators.flowable.FlowableFlatMapSingle.FlatMapSingleSubscriber;

/**
* Maps upstream values into SingleSources and merges their signals into one sequence.
* @param <T> the source value type
* @param <R> the result value type
*/
public final class FlowableFlatMapSinglePublisher<T, R> extends Flowable<R> {

final Publisher<T> source;

final Function<? super T, ? extends SingleSource<? extends R>> mapper;

final boolean delayErrors;

final int maxConcurrency;

public FlowableFlatMapSinglePublisher(Publisher<T> source, Function<? super T, ? extends SingleSource<? extends R>> mapper,
boolean delayError, int maxConcurrency) {
this.source = source;
this.mapper = mapper;
this.delayErrors = delayError;
this.maxConcurrency = maxConcurrency;
}

@Override
protected void subscribeActual(Subscriber<? super R> s) {
source.subscribe(new FlatMapSingleSubscriber<>(s, mapper, delayErrors, maxConcurrency));
}
}