Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: Add blockingForEach(Consumer, int) overload #6800

Merged
merged 1 commit into from
Dec 26, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 48 additions & 5 deletions src/main/java/io/reactivex/rxjava3/core/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5663,8 +5663,8 @@ public final T blockingFirst(@NonNull T defaultItem) {
* sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator consumes the source {@code Flowable} in an unbounded manner
* (i.e., no backpressure applied to it).</dd>
* <dd>The operator requests {@link Flowable#bufferSize()} upfront, then 75% of this
* amount when 75% is received.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
Expand All @@ -5676,14 +5676,57 @@ public final T blockingFirst(@NonNull T defaultItem) {
* @param onNext
* the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
* @throws RuntimeException
* if an error occurs
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
* @see #subscribe(Consumer)
* @see #blockingForEach(Consumer, int)
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingForEach(@NonNull Consumer<? super T> onNext) {
Iterator<T> it = blockingIterable().iterator();
blockingForEach(onNext, bufferSize());
}

/**
* Consumes the upstream {@code Flowable} in a blocking fashion and invokes the given
* {@code Consumer} with each upstream item on the <em>current thread</em> until the
* upstream terminates.
* <p>
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/B.forEach.png" alt="">
* <p>
* <em>Note:</em> the method will only return if the upstream terminates or the current
* thread is interrupted.
* <p>
* This method executes the {@code Consumer} on the current thread while
* {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
* sequence.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The operator requests the given {@code prefetch} amount upfront, then 75% of this
* amount when 75% is received.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
*
* @param onNext
* the {@link Consumer} to invoke for each item emitted by the {@code Flowable}
* @param bufferSize
* the number of items to prefetch upfront, then 75% of it after 75% received
* @throws RuntimeException
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
* @see #subscribe(Consumer)
*/
@BackpressureSupport(BackpressureKind.FULL)
@SchedulerSupport(SchedulerSupport.NONE)
public final void blockingForEach(@NonNull Consumer<? super T> onNext, int bufferSize) {
Iterator<T> it = blockingIterable(bufferSize).iterator();
while (it.hasNext()) {
try {
onNext.accept(it.next());
Expand Down
41 changes: 40 additions & 1 deletion src/main/java/io/reactivex/rxjava3/core/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -5170,11 +5170,50 @@ public final T blockingFirst(@NonNull T defaultItem) {
* if an error occurs
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
* @see #subscribe(Consumer)
* @see #blockingForEach(Consumer, int)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final void blockingForEach(@NonNull Consumer<? super T> onNext) {
Iterator<T> it = blockingIterable().iterator();
blockingForEach(onNext, bufferSize());
}

/**
* Consumes the upstream {@code Observable} in a blocking fashion and invokes the given
* {@code Consumer} with each upstream item on the <em>current thread</em> until the
* upstream terminates.
* <p>
* <img width="640" height="330" src="https://github.com/ReactiveX/RxJava/wiki/images/rx-operators/blockingForEach.o.png" alt="">
* <p>
* <em>Note:</em> the method will only return if the upstream terminates or the current
* thread is interrupted.
* <p>
* This method executes the {@code Consumer} on the current thread while
* {@link #subscribe(Consumer)} executes the consumer on the original caller thread of the
* sequence.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code blockingForEach} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Error handling:</b></dt>
* <dd>If the source signals an error, the operator wraps a checked {@link Exception}
* into {@link RuntimeException} and throws that. Otherwise, {@code RuntimeException}s and
* {@link Error}s are rethrown as they are.</dd>
* </dl>
*
* @param onNext
* the {@link Consumer} to invoke for each item emitted by the {@code Observable}
* @param capacityHint
* the number of items expected to be buffered (allows reducing buffer reallocations)
* @throws RuntimeException
* if an error occurs; {@link Error}s and {@link RuntimeException}s are rethrown
* as they are, checked {@link Exception}s are wrapped into {@code RuntimeException}s
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX documentation: Subscribe</a>
* @see #subscribe(Consumer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
public final void blockingForEach(@NonNull Consumer<? super T> onNext, int capacityHint) {
Iterator<T> it = blockingIterable(capacityHint).iterator();
while (it.hasNext()) {
try {
onNext.accept(it.next());
Expand Down