Skip to content

Commit

Permalink
Publisher[retry|repeat] change default exception handling mode (#2642)
Browse files Browse the repository at this point in the history
Motivation:
The current default mode of Publisher retry/repeat operators allows
exceptions to propagate from onNext. However this may result in
incorrect demand management and lead to "hangs" due to off-by-one
demand (downstream subscriber requested X, but may only see X-1 if
an earlier downstream subscriber threw). ReactiveStreams spec
specifies that Subscriber methods are generally not expected to
throw [1]. For these reasons the current default maybe unsafe and
isn't commonly expected. The alternative mode makes exception handling
explicit and won't result in hangs.

[1] https://github.com/reactive-streams/reactive-streams-jvm#2.13

Modifications:
- Change default mode of operation to not allow downstream exceptions
to propagate upstream.
  • Loading branch information
Scottmitch authored Jul 6, 2023
1 parent f061ee8 commit 7f0b795
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2213,7 +2213,7 @@ public final Publisher<T> concatPropagateCancel(Completable next) {
* @see #retry(boolean, BiIntPredicate)
*/
public final Publisher<T> retry(BiIntPredicate<Throwable> shouldRetry) {
return retry(false, shouldRetry);
return retry(true, shouldRetry);
}

/**
Expand Down Expand Up @@ -2315,7 +2315,7 @@ public final Publisher<T> retry(boolean terminateOnNextException, BiIntPredicate
* @see #retryWhen(boolean, BiIntFunction)
*/
public final Publisher<T> retryWhen(BiIntFunction<Throwable, ? extends Completable> retryWhen) {
return retryWhen(false, retryWhen);
return retryWhen(true, retryWhen);
}

/**
Expand Down Expand Up @@ -2401,46 +2401,9 @@ public final Publisher<T> retryWhen(boolean terminateOnNextException,
* @return A {@link Publisher} that emits all items from this {@link Publisher} and re-subscribes when it completes
* if the passed {@link IntPredicate} returns {@code true}.
* @see <a href="https://reactivex.io/documentation/operators/repeat.html">ReactiveX repeat operator.</a>
* @see #repeat(boolean, IntPredicate)
*/
public final Publisher<T> repeat(IntPredicate shouldRepeat) {
return repeat(false, shouldRepeat);
}

/**
* Re-subscribes to this {@link Publisher} when it completes and the passed {@link IntPredicate} returns
* {@code true}.
* <pre>
* This method may result in a {@link StackOverflowError} if too many consecutive calls are made. This can be
* avoided by trampolining the call stack onto an {@link Executor}. For example:
* {@code repeatWhen(i -> i % 10 == 0 ? executor.submit(() -> { }) : Completable.completed())}
* </pre>
* This method provides a means to repeat an operation multiple times and in sequential programming is similar to:
* <pre>{@code
* List<T> results = new ...;
* int i = 0;
* do {
* results.addAll(resultOfThisPublisher());
* } while (shouldRepeat.test(++i));
* return results;
* }</pre>
* @param terminateOnNextException
* <ul>
* <li>{@code true} means that exceptions thrown from downstream {@link Subscriber#onNext(Object)} will be
* caught, cancel the {@link Subscription}, propagate a {@link Subscriber#onError(Throwable)} downstream, and
* no retry will be attempted.</li>
* <li>{@code false} means that exceptions thrown from downstream {@link Subscriber#onNext(Object)} will NOT
* be caught and will propagate upstream. May lead to incorrect demand accounting and "hangs" if this operator
* isn't the last in the chain.</li>
* </ul>
* @param shouldRepeat {@link IntPredicate} that given the repeat count determines if the operation should be
* repeated.
* @return A {@link Publisher} that emits all items from this {@link Publisher} and re-subscribes when it completes
* if the passed {@link IntPredicate} returns {@code true}.
* @see <a href="https://reactivex.io/documentation/operators/repeat.html">ReactiveX repeat operator.</a>
*/
public final Publisher<T> repeat(boolean terminateOnNextException, IntPredicate shouldRepeat) {
return new RedoPublisher<>(this, terminateOnNextException, (repeatCount, terminalNotification) ->
return new RedoPublisher<>(this, true, (repeatCount, terminalNotification) ->
terminalNotification.cause() == null && shouldRepeat.test(repeatCount));
}

Expand Down Expand Up @@ -2474,55 +2437,9 @@ public final Publisher<T> repeat(boolean terminateOnNextException, IntPredicate
* @return A {@link Publisher} that emits all items from this {@link Publisher} and re-subscribes if an error is
* emitted and {@link Completable} returned by {@link IntFunction} completes successfully.
* @see <a href="https://reactivex.io/documentation/operators/retry.html">ReactiveX retry operator.</a>
* @see #repeatWhen(boolean, IntFunction)
*/
public final Publisher<T> repeatWhen(IntFunction<? extends Completable> repeatWhen) {
return repeatWhen(false, repeatWhen);
}

/**
* Re-subscribes to this {@link Publisher} when it completes and the {@link Completable} returned by the supplied
* {@link IntFunction} completes successfully. If the returned {@link Completable} emits an error, the returned
* {@link Publisher} is completed.
* <pre>
* This method may result in a {@link StackOverflowError} if too many consecutive calls are made. This can be
* avoided by trampolining the call stack onto an {@link Executor}. For example:
* {@code repeatWhen(i -> i % 10 == 0 ? executor.submit(() -> { }) : Completable.completed())}
* </pre>
* This method provides a means to repeat an operation multiple times when in an asynchronous fashion and in
* sequential programming is similar to:
* <pre>{@code
* List<T> results = new ...;
* int i = 0;
* while (true) {
* results.addAll(resultOfThisPublisher());
* try {
* repeatWhen.apply(++i); // Either throws or completes normally
* } catch (Throwable cause) {
* break;
* }
* }
* return results;
* }</pre>
* @param terminateOnNextException
* <ul>
* <li>{@code true} means that exceptions thrown from downstream {@link Subscriber#onNext(Object)} will be
* caught, cancel the {@link Subscription}, propagate a {@link Subscriber#onError(Throwable)} downstream, and
* no retry will be attempted.</li>
* <li>{@code false} means that exceptions thrown from downstream {@link Subscriber#onNext(Object)} will NOT
* be caught and will propagate upstream. May lead to incorrect demand accounting and "hangs" if this operator
* isn't the last in the chain.</li>
* </ul>
* @param repeatWhen {@link IntFunction} that given the repeat count returns a {@link Completable}.
* If this {@link Completable} emits an error repeat is terminated, otherwise, original {@link Publisher} is
* re-subscribed when this {@link Completable} completes.
* @return A {@link Publisher} that emits all items from this {@link Publisher} and re-subscribes if an error is
* emitted and {@link Completable} returned by {@link IntFunction} completes successfully.
* @see <a href="https://reactivex.io/documentation/operators/retry.html">ReactiveX retry operator.</a>
*/
public final Publisher<T> repeatWhen(boolean terminateOnNextException,
IntFunction<? extends Completable> repeatWhen) {
return new RedoWhenPublisher<>(this, false, terminateOnNextException, (retryCount, notification) ->
return new RedoWhenPublisher<>(this, false, true, (retryCount, notification) ->
notification.cause() != null ? completed() : repeatWhen.apply(retryCount));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void exceptionAfterRetryPreservesDemand() {
// First repeat function will catch the error from onNext and propagate downstream to the second
// retry function. After the second repeat operator completes, this operator will trigger another repeat
// so we expect to see values from signals array twice.
.repeat(true, i -> i == 1)
.repeat(i -> i == 1)
.validateOutstandingDemand()
.map(t -> {
if (onNextCount.getAndIncrement() == 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ void exceptionAfterRetryPreservesDemand() {
// First repeat function will catch the error from onNext and propagate downstream to the second
// retry function. After the second repeat operator completes, this operator will trigger another repeat
// so we expect to see values from signals array twice.
.repeatWhen(true, retryFunc)
.repeatWhen(retryFunc)
.validateOutstandingDemand()
.map(t -> {
if (onNextCount.getAndIncrement() == 0) {
Expand Down

0 comments on commit 7f0b795

Please sign in to comment.