From 14ef580186d4196d2319aa407f46cbd977186eac Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 26 Dec 2019 23:37:07 +0100 Subject: [PATCH] 3.x: Add blockingForEach(Consumer, int) overload --- .../io/reactivex/rxjava3/core/Flowable.java | 53 +++++++++++++++++-- .../io/reactivex/rxjava3/core/Observable.java | 41 +++++++++++++- 2 files changed, 88 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java index 57efac10e0..df3a8a270a 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java @@ -5663,8 +5663,8 @@ public final T blockingFirst(@NonNull T defaultItem) { * sequence. *
*
Backpressure:
- *
The operator consumes the source {@code Flowable} in an unbounded manner - * (i.e., no backpressure applied to it).
+ *
The operator requests {@link Flowable#bufferSize()} upfront, then 75% of this + * amount when 75% is received.
*
Scheduler:
*
{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
*
Error handling:
@@ -5676,14 +5676,57 @@ public final T blockingFirst(@NonNull T defaultItem) { * @param onNext * the {@link Consumer} to invoke for each item emitted by the {@code Flowable} * @throws RuntimeException - * if an error occurs + * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown + * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s * @see ReactiveX documentation: Subscribe * @see #subscribe(Consumer) + * @see #blockingForEach(Consumer, int) */ - @BackpressureSupport(BackpressureKind.UNBOUNDED_IN) + @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) public final void blockingForEach(@NonNull Consumer onNext) { - Iterator it = blockingIterable().iterator(); + blockingForEach(onNext, bufferSize()); + } + + /** + * Consumes the upstream {@code Flowable} in a blocking fashion and invokes the given + * {@code Consumer} with each upstream item on the current thread until the + * upstream terminates. + *

+ * + *

+ * Note: the method will only return if the upstream terminates or the current + * thread is interrupted. + *

+ * This method executes the {@code Consumer} on the current thread while + * {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the + * sequence. + *

+ *
Backpressure:
+ *
The operator requests the given {@code prefetch} amount upfront, then 75% of this + * amount when 75% is received.
+ *
Scheduler:
+ *
{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the source signals an error, the operator wraps a checked {@link Exception} + * into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and + * {@link Error}s are rethrown as they are.
+ *
+ * + * @param onNext + * the {@link Consumer} to invoke for each item emitted by the {@code Flowable} + * @param bufferSize + * the number of items to prefetch upfront, then 75% of it after 75% received + * @throws RuntimeException + * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown + * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s + * @see ReactiveX documentation: Subscribe + * @see #subscribe(Consumer) + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final void blockingForEach(@NonNull Consumer onNext, int bufferSize) { + Iterator it = blockingIterable(bufferSize).iterator(); while (it.hasNext()) { try { onNext.accept(it.next()); diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java index eb33792f34..1933a6db0b 100644 --- a/src/main/java/io/reactivex/rxjava3/core/Observable.java +++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java @@ -5170,11 +5170,50 @@ public final T blockingFirst(@NonNull T defaultItem) { * if an error occurs * @see ReactiveX documentation: Subscribe * @see #subscribe(Consumer) + * @see #blockingForEach(Consumer, int) */ @SchedulerSupport(SchedulerSupport.NONE) @NonNull public final void blockingForEach(@NonNull Consumer onNext) { - Iterator it = blockingIterable().iterator(); + blockingForEach(onNext, bufferSize()); + } + + /** + * Consumes the upstream {@code Observable} in a blocking fashion and invokes the given + * {@code Consumer} with each upstream item on the current thread until the + * upstream terminates. + *

+ * + *

+ * Note: the method will only return if the upstream terminates or the current + * thread is interrupted. + *

+ * This method executes the {@code Consumer} on the current thread while + * {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the + * sequence. + *

+ *
Scheduler:
+ *
{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.
+ *
Error handling:
+ *
If the source signals an error, the operator wraps a checked {@link Exception} + * into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and + * {@link Error}s are rethrown as they are.
+ *
+ * + * @param onNext + * the {@link Consumer} to invoke for each item emitted by the {@code Observable} + * @param capacityHint + * the number of items expected to be buffered (allows reducing buffer reallocations) + * @throws RuntimeException + * if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown + * as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s + * @see ReactiveX documentation: Subscribe + * @see #subscribe(Consumer) + */ + @SchedulerSupport(SchedulerSupport.NONE) + @NonNull + public final void blockingForEach(@NonNull Consumer onNext, int capacityHint) { + Iterator it = blockingIterable(capacityHint).iterator(); while (it.hasNext()) { try { onNext.accept(it.next());