The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
- * behavior.
- *
Scheduler:
- *
{@code filter} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param predicate
- * a function that evaluates each item emitted by the current {@code Flowable}, returning {@code true}
- * if it passes the filter
- * @return the new {@code Flowable} instance
- * @throws NullPointerException if {@code predicate} is {@code null}
- * @see ReactiveX operators documentation: Filter
- */
+ /** {@inheritDoc} */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
+ @Override
public final Flowable filter(@NonNull Predicate super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new FlowableFilter<>(this, predicate));
@@ -12038,31 +12024,12 @@ public final Single lastOrError() {
return RxJavaPlugins.onAssembly(new FlowableLift<>(this, lifter));
}
- /**
- * Returns a {@code Flowable} that applies a specified function to each item emitted by the current {@code Flowable} and
- * emits the results of these function applications.
- *
- *
- *
- *
Backpressure:
- *
The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
- * behavior.
- *
Scheduler:
- *
{@code map} does not operate by default on a particular {@link Scheduler}.
- *
- *
- * @param the output type
- * @param mapper
- * a function to apply to each item emitted by the current {@code Flowable}
- * @return the new {@code Flowable} instance
- * @throws NullPointerException if {@code mapper} is {@code null}
- * @see ReactiveX operators documentation: Map
- * @see #mapOptional(Function)
- */
+ /** {@inheritDoc} */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
+ @Override
public final <@NonNull R> Flowable map(@NonNull Function super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<>(this, mapper));
@@ -20896,7 +20863,7 @@ public final Stream blockingStream(int prefetch) {
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableFlatMapStream<>(this, mapper, prefetch));
}
-
+
/**
* Construct a {@code Flowable} and use the given {@code generator}
* to generate items on demand while running on the given {@link ExecutorService}.
@@ -20948,7 +20915,7 @@ public final Stream blockingStream(int prefetch) {
}
/**
- * Returns a {@code Flowable} that turns an upstream item an upstream item into
+ * Returns a {@code Flowable} that turns an upstream item an upstream item into
* zero or more downstream values by running on the given {@link ExecutorService}.
*
*
diff --git a/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java b/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java
new file mode 100644
index 0000000000..357307cddd
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package io.reactivex.rxjava4.core.docs;
+
+import io.reactivex.rxjava4.annotations.*;
+import io.reactivex.rxjava4.core.*;
+import io.reactivex.rxjava4.functions.*;
+
+/**
+ * Documents a set of operators so that the main Flowable source file is not cluttered.
+ * @param the element type of the flow
+ */
+public sealed interface FlowableDocBasic permits Flowable {
+ /**
+ * Returns a {@code Flowable} that applies a specified function to each item emitted by the current {@code Flowable} and
+ * emits the results of these function applications.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
+ * behavior.
+ *
Scheduler:
+ *
{@code map} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param the output type
+ * @param mapper
+ * a function to apply to each item emitted by the current {@code Flowable}
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code mapper} is {@code null}
+ * @see ReactiveX operators documentation: Map
+ * @see #mapOptional(Function)
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.PASS_THROUGH)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ <@NonNull R> Flowable map(@NonNull Function super T, ? extends R> mapper);
+
+ /**
+ * Filters items emitted by the current {@code Flowable} by only emitting those that satisfy a specified predicate.
+ *
+ *
+ *
+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
+ * behavior.
+ *
Scheduler:
+ *
{@code filter} does not operate by default on a particular {@link Scheduler}.
+ *
+ *
+ * @param predicate
+ * a function that evaluates each item emitted by the current {@code Flowable}, returning {@code true}
+ * if it passes the filter
+ * @return the new {@code Flowable} instance
+ * @throws NullPointerException if {@code predicate} is {@code null}
+ * @see ReactiveX operators documentation: Filter
+ */
+ @CheckReturnValue
+ @NonNull
+ @BackpressureSupport(BackpressureKind.PASS_THROUGH)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ Flowable filter(@NonNull Predicate super T> predicate);
+
+}
diff --git a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java
index 4c598957bc..f15b0ba9ad 100644
--- a/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java
+++ b/src/main/java/io/reactivex/rxjava4/internal/virtual/FlowableVirtualCreateExecutor.java
@@ -48,7 +48,7 @@ protected void subscribeActual(Subscriber super T> s) {
executor.submit(parent);
}
- static final class ExecutorVirtualCreateSubscription extends AtomicLong
+ static final class ExecutorVirtualCreateSubscription extends AtomicLong
implements Subscription, Callable, VirtualEmitter {
private static final long serialVersionUID = -6959205135542203083L;
diff --git a/src/main/java/io/reactivex/rxjava4/plugins/RxJavaPlugins.java b/src/main/java/io/reactivex/rxjava4/plugins/RxJavaPlugins.java
index 26bf4ce19d..d20c6fc03f 100644
--- a/src/main/java/io/reactivex/rxjava4/plugins/RxJavaPlugins.java
+++ b/src/main/java/io/reactivex/rxjava4/plugins/RxJavaPlugins.java
@@ -16,8 +16,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.*;
-
-import static java.util.concurrent.Flow.*;
+import java.util.concurrent.Flow.Subscriber;
import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.*;
@@ -115,7 +114,7 @@ public final class RxJavaPlugins {
@SuppressWarnings("rawtypes")
@Nullable
- static volatile BiFunction super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> onParallelSubscribe;
+ static volatile BiFunction super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> onParallelSubscribe;
@Nullable
static volatile BooleanSupplier onBeforeBlocking;
@@ -1008,12 +1007,12 @@ public static CompletableObserver onSubscribe(@NonNull Completable source, @NonN
* @param subscribers the array of subscribers
* @return the value returned by the hook
*/
- @SuppressWarnings({ "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
@NonNull
- public static <@NonNull T> Subscriber super T>[] onSubscribe(@NonNull ParallelFlowable source, @NonNull Subscriber super T>[] subscribers) {
- BiFunction super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> f = onParallelSubscribe;
+ public static <@NonNull T> Subscriber super T>[] onSubscribe(@NonNull ParallelFlowable extends T> source, @NonNull Subscriber super T>[] subscribers) {
+ var f = onParallelSubscribe;
if (f != null) {
- return apply(f, source, subscribers);
+ return (@NonNull Subscriber<@NonNull ? super @NonNull T>[]) apply(f, source, subscribers);
}
return subscribers;
}
@@ -1161,7 +1160,7 @@ public static void setOnParallelAssembly(@Nullable Function super ParallelFlow
* @since 3.1.0
*/
@SuppressWarnings("rawtypes")
- public static void setOnParallelSubscribe(@Nullable BiFunction super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> handler) {
+ public static void setOnParallelSubscribe(@Nullable BiFunction super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
@@ -1176,7 +1175,7 @@ public static void setOnParallelSubscribe(@Nullable BiFunction super ParallelF
*/
@SuppressWarnings("rawtypes")
@Nullable
- public static BiFunction super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> getOnParallelSubscribe() {
+ public static BiFunction super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> getOnParallelSubscribe() {
return onParallelSubscribe;
}
@@ -1355,7 +1354,7 @@ public static Scheduler createExecutorScheduler(@NonNull Executor executor, bool
* @return the result of the function call
*/
@NonNull
- static <@NonNull T, @NonNull U, @NonNull R> R apply(@NonNull BiFunction f, @NonNull T t, @NonNull U u) {
+ static <@NonNull T, @NonNull U, @NonNull R> R apply(@NonNull BiFunction super T, ? super U, ? extends R> f, @NonNull T t, @NonNull U u) {
try {
return f.apply(t, u);
} catch (Throwable ex) {
diff --git a/src/test/java/io/reactivex/rxjava4/plugins/RxJavaPluginsTest.java b/src/test/java/io/reactivex/rxjava4/plugins/RxJavaPluginsTest.java
index d36fc9703f..a0464b8817 100644
--- a/src/test/java/io/reactivex/rxjava4/plugins/RxJavaPluginsTest.java
+++ b/src/test/java/io/reactivex/rxjava4/plugins/RxJavaPluginsTest.java
@@ -589,14 +589,12 @@ public void onComplete() {
.assertComplete();
}
- @SuppressWarnings("rawtypes")
@Test
public void parallelFlowableStart() {
try {
- RxJavaPlugins.setOnParallelSubscribe(new BiFunction() {
- @Override
- public Subscriber[] apply(ParallelFlowable f, final Subscriber[] t) {
- return new Subscriber[] { new Subscriber() {
+ RxJavaPlugins.setOnParallelSubscribe((_, t) -> {
+ var result = new Subscriber>[] {
+ new Subscriber