Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ java {

tasks.withType(JavaCompile) {
options.compilerArgs << "-parameters"
options.compilerArgs += '-Xlint:-module'
options.compilerArgs += '-Xlint:unchecked'
}

apply from: file("gradle/javadoc_cleanup.gradle")
Expand Down
4 changes: 2 additions & 2 deletions config/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<module name="Checker">
<property name="severity" value="warning"/>
<module name="SuppressionFilter">
<property name="file" value="${project_loc}/config/checkstyle/suppressions.xml"/>
<property name="file" value="${config_loc}/suppressions.xml"/>
<property name="optional" value="false"/>
</module>
<module name="BeforeExecutionExclusionFileFilter">
Expand All @@ -28,6 +28,6 @@
</module>
<module name="Header">
<property name="fileExtensions" value="java"/>
<property name="headerFile" value="${project_loc}/config/license/HEADER_JAVA"/>
<property name="headerFile" value="${config_loc}/../license/HEADER_JAVA"/>
</module>
</module>
10 changes: 5 additions & 5 deletions config/checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

<suppressions>

<suppress checks="MissingJavadocMethod,JavadocMethod" files=".*[\\/]src[\\/]main[\\/]java[\\/]io[\\/]reactivex[\\/]rxjava4[\\/]internal[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod,JavadocMethod" files=".*[\\/]src[\\/]test[\\/]java[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod,JavadocMethod" files=".*[\\/]src[\\/]jmh[\\/]java[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod,JavadocMethod" files=".*Test\.java$"/>
<suppress checks="MissingJavadocMethod,JavadocMethod" files=".*Perf\.java$"/>
<suppress checks="MissingJavadocMethod|JavadocMethod" files=".*[\\/]src[\\/]main[\\/]java[\\/]io[\\/]reactivex[\\/]rxjava4[\\/]internal[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod|JavadocMethod" files=".*[\\/]src[\\/]test[\\/]java[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod|JavadocMethod" files=".*[\\/]src[\\/]jmh[\\/]java[\\/].*\.java$"/>
<suppress checks="MissingJavadocMethod|JavadocMethod" files=".*Test\.java$"/>
<suppress checks="MissingJavadocMethod|JavadocMethod" files=".*Perf\.java$"/>

</suppressions>
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import io.reactivex.rxjava4.functions.Function;

@SuppressWarnings("exports")
@SuppressWarnings("exports")
@BenchmarkMode(Mode.Throughput)
@Warmup(iterations = 5)
@Measurement(iterations = 5, time = 5, timeUnit = TimeUnit.SECONDS)
Expand Down
53 changes: 10 additions & 43 deletions src/main/java/io/reactivex/rxjava4/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.stream.*;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.docs.FlowableDocBasic;
import io.reactivex.rxjava4.disposables.*;
import io.reactivex.rxjava4.exceptions.*;
import io.reactivex.rxjava4.flowables.*;
Expand Down Expand Up @@ -153,7 +154,9 @@
* @see ParallelFlowable
* @see io.reactivex.rxjava4.subscribers.DisposableSubscriber
*/
public abstract class Flowable<@NonNull T> implements Publisher<T> {
public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
FlowableDocBasic<T>
{
/** The default buffer size. */
static final int BUFFER_SIZE;
static {
Expand Down Expand Up @@ -10146,29 +10149,12 @@ public final Single<T> elementAtOrError(long index) {
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<>(this, index, null));
}

/**
* Filters items emitted by the current {@code Flowable} by only emitting those that satisfy a specified predicate.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/filter.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code filter} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* a function that evaluates each item emitted by the current {@code Flowable}, returning {@code true}
* if it passes the filter
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code predicate} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
*/
/** {@inheritDoc} */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final Flowable<T> filter(@NonNull Predicate<? super T> predicate) {
Objects.requireNonNull(predicate, "predicate is null");
return RxJavaPlugins.onAssembly(new FlowableFilter<>(this, predicate));
Expand Down Expand Up @@ -12038,31 +12024,12 @@ public final Single<T> lastOrError() {
return RxJavaPlugins.onAssembly(new FlowableLift<>(this, lifter));
}

/**
* Returns a {@code Flowable} that applies a specified function to each item emitted by the current {@code Flowable} and
* emits the results of these function applications.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output type
* @param mapper
* a function to apply to each item emitted by the current {@code Flowable}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
* @see #mapOptional(Function)
*/
/** {@inheritDoc} */
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final <@NonNull R> Flowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new FlowableMap<>(this, mapper));
Expand Down Expand Up @@ -20896,7 +20863,7 @@ public final Stream<T> blockingStream(int prefetch) {
ObjectHelper.verifyPositive(prefetch, "prefetch");
return RxJavaPlugins.onAssembly(new FlowableFlatMapStream<>(this, mapper, prefetch));
}

/**
* Construct a {@code Flowable} and use the given {@code generator}
* to generate items on demand while running on the given {@link ExecutorService}.
Expand Down Expand Up @@ -20948,7 +20915,7 @@ public final Stream<T> blockingStream(int prefetch) {
}

/**
* Returns a {@code Flowable} that turns an upstream item an upstream item into
* Returns a {@code Flowable} that turns an upstream item an upstream item into
* zero or more downstream values by running on the given {@link ExecutorService}.
* <p>
* <dl>
Expand Down
76 changes: 76 additions & 0 deletions src/main/java/io/reactivex/rxjava4/core/docs/FlowableDocBasic.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.rxjava4.core.docs;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.*;
import io.reactivex.rxjava4.functions.*;

/**
* Documents a set of operators so that the main Flowable source file is not cluttered.
* @param <T> the element type of the flow
*/
public sealed interface FlowableDocBasic<T> permits Flowable {
/**
* Returns a {@code Flowable} that applies a specified function to each item emitted by the current {@code Flowable} and
* emits the results of these function applications.
* <p>
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/map.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code map} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param <R> the output type
* @param mapper
* a function to apply to each item emitted by the current {@code Flowable}
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code mapper} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/map.html">ReactiveX operators documentation: Map</a>
* @see #mapOptional(Function)
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
<@NonNull R> Flowable<R> map(@NonNull Function<? super T, ? extends R> mapper);

/**
* Filters items emitted by the current {@code Flowable} by only emitting those that satisfy a specified predicate.
* <p>
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/filter.v3.png" alt="">
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator doesn't interfere with backpressure which is determined by the current {@code Flowable}'s backpressure
* behavior.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code filter} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param predicate
* a function that evaluates each item emitted by the current {@code Flowable}, returning {@code true}
* if it passes the filter
* @return the new {@code Flowable} instance
* @throws NullPointerException if {@code predicate} is {@code null}
* @see <a href="http://reactivex.io/documentation/operators/filter.html">ReactiveX operators documentation: Filter</a>
*/
@CheckReturnValue
@NonNull
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
@SchedulerSupport(SchedulerSupport.NONE)
Flowable<T> filter(@NonNull Predicate<? super T> predicate);

}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ protected void subscribeActual(Subscriber<? super T> s) {
executor.submit(parent);
}

static final class ExecutorVirtualCreateSubscription<T> extends AtomicLong
static final class ExecutorVirtualCreateSubscription<T> extends AtomicLong
implements Subscription, Callable<Void>, VirtualEmitter<T> {

private static final long serialVersionUID = -6959205135542203083L;
Expand Down
19 changes: 9 additions & 10 deletions src/main/java/io/reactivex/rxjava4/plugins/RxJavaPlugins.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Objects;
import java.util.concurrent.*;

import static java.util.concurrent.Flow.*;
import java.util.concurrent.Flow.Subscriber;

import io.reactivex.rxjava4.annotations.*;
import io.reactivex.rxjava4.core.*;
Expand Down Expand Up @@ -115,7 +114,7 @@ public final class RxJavaPlugins {

@SuppressWarnings("rawtypes")
@Nullable
static volatile BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> onParallelSubscribe;
static volatile BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> onParallelSubscribe;

@Nullable
static volatile BooleanSupplier onBeforeBlocking;
Expand Down Expand Up @@ -1008,12 +1007,12 @@ public static CompletableObserver onSubscribe(@NonNull Completable source, @NonN
* @param subscribers the array of subscribers
* @return the value returned by the hook
*/
@SuppressWarnings({ "rawtypes" })
@SuppressWarnings({ "unchecked" })
@NonNull
public static <@NonNull T> Subscriber<? super T>[] onSubscribe(@NonNull ParallelFlowable<T> source, @NonNull Subscriber<? super T>[] subscribers) {
BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> f = onParallelSubscribe;
public static <@NonNull T> Subscriber<? super T>[] onSubscribe(@NonNull ParallelFlowable<? extends T> source, @NonNull Subscriber<? super T>[] subscribers) {
var f = onParallelSubscribe;
if (f != null) {
return apply(f, source, subscribers);
return (@NonNull Subscriber<@NonNull ? super @NonNull T>[]) apply(f, source, subscribers);
}
return subscribers;
}
Expand Down Expand Up @@ -1161,7 +1160,7 @@ public static void setOnParallelAssembly(@Nullable Function<? super ParallelFlow
* @since 3.1.0
*/
@SuppressWarnings("rawtypes")
public static void setOnParallelSubscribe(@Nullable BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> handler) {
public static void setOnParallelSubscribe(@Nullable BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> handler) {
if (lockdown) {
throw new IllegalStateException("Plugins can't be changed anymore");
}
Expand All @@ -1176,7 +1175,7 @@ public static void setOnParallelSubscribe(@Nullable BiFunction<? super ParallelF
*/
@SuppressWarnings("rawtypes")
@Nullable
public static BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber[], @NonNull ? extends Subscriber[]> getOnParallelSubscribe() {
public static BiFunction<? super ParallelFlowable, @NonNull ? super Subscriber<@NonNull ?>[], @NonNull ? extends Subscriber<@NonNull ?>[]> getOnParallelSubscribe() {
return onParallelSubscribe;
}

Expand Down Expand Up @@ -1355,7 +1354,7 @@ public static Scheduler createExecutorScheduler(@NonNull Executor executor, bool
* @return the result of the function call
*/
@NonNull
static <@NonNull T, @NonNull U, @NonNull R> R apply(@NonNull BiFunction<T, U, R> f, @NonNull T t, @NonNull U u) {
static <@NonNull T, @NonNull U, @NonNull R> R apply(@NonNull BiFunction<? super T, ? super U, ? extends R> f, @NonNull T t, @NonNull U u) {
try {
return f.apply(t, u);
} catch (Throwable ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -589,14 +589,12 @@ public void onComplete() {
.assertComplete();
}

@SuppressWarnings("rawtypes")
@Test
public void parallelFlowableStart() {
try {
RxJavaPlugins.setOnParallelSubscribe(new BiFunction<ParallelFlowable, Subscriber[], Subscriber[]>() {
@Override
public Subscriber[] apply(ParallelFlowable f, final Subscriber[] t) {
return new Subscriber[] { new Subscriber() {
RxJavaPlugins.setOnParallelSubscribe((_, t) -> {
var result = new Subscriber<?>[] {
new Subscriber<Object>() {

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -606,7 +604,7 @@ public void onSubscribe(Subscription s) {
@SuppressWarnings("unchecked")
@Override
public void onNext(Object value) {
t[0].onNext((Integer)value - 9);
((Subscriber<Integer>)t[0]).onNext(((Integer)value - 9));
}

@Override
Expand All @@ -621,8 +619,9 @@ public void onComplete() {

}
};
return result;
}
});
);

Flowable.range(10, 3)
.parallel(1)
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/reactivex/rxjava4/tck/BaseTck.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ public static void before() {
@AfterClass
public static void after() {
service.shutdown();
}
}

/**
* Creates an Iterable with the specified number of elements or an infinite one if
* {@code elements >} {@link Integer#MAX_VALUE}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ public static List<RxMethod> parse(File f, String baseClassName) throws Exceptio
StringBuilder b = JavadocForAnnotations.readFile(f);

int baseIndex = b.indexOf("public abstract class " + baseClassName);
if (baseIndex < 0) {
baseIndex = b.indexOf("public abstract non-sealed class " + baseClassName);
}

if (baseIndex < 0) {
throw new AssertionError("Wrong base class file: " + baseClassName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,14 +90,17 @@ static final void scanFor(StringBuilder sourceCode, String annotation, String in
int k = sourceCode.indexOf(inDoc, j);

if (k < 0 || k > idx) {
// when printed on the console, IDEs will create a clickable link to help navigate to the offending point
e.append("java.lang.RuntimeException: missing ").append(inDoc).append(" section\r\n")
;
int lc = lineNumber(sourceCode, idx);

e.append(" at io.reactivex.rxjava4.core.").append(baseClassName)
.append(" (").append(baseClassName).append(".java:")
.append(lc).append(")").append("\r\n\r\n");
var subSource = sourceCode.substring(j, idx);
if (!subSource.startsWith("/** {@inheritDoc} */")) {
// when printed on the console, IDEs will create a clickable link to help navigate to the offending point
e.append("java.lang.RuntimeException: missing ").append(inDoc).append(" section\r\n")
;
int lc = lineNumber(sourceCode, idx);

e.append(" at io.reactivex.rxjava4.core.").append(baseClassName)
.append(" (").append(baseClassName).append(".java:")
.append(lc).append(")").append("\r\n\r\n");
}
}
}

Expand Down
Loading