Skip to content

Commit

Permalink
Publisher.retry wrap exceptions thrown from onNext (#2649)
Browse files Browse the repository at this point in the history
Motivation:
Publisher.retry default mode of operation changed to not propagate
exceptions from onNext due to demand hangs. This means the retry
operator may not honor the retry strategy due to invalid usage.

Modifications:
- wrap exceptions from onNext to clarify the rational for not honoring
the retry strategy
  • Loading branch information
Scottmitch committed Jul 12, 2023
1 parent c30e076 commit 996e41b
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.function.BiPredicate;
import java.util.function.IntPredicate;

import static io.servicetalk.concurrent.internal.SubscriberUtils.newExceptionNormalReturn;
import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;
Expand Down Expand Up @@ -133,6 +134,7 @@ Subscription decorate(Subscription s) {
}

private void handleOnNextException(Throwable cause) {
cause = newExceptionNormalReturn(cause);
if (!terminateOnNextException) {
throwException(cause);
} else if (terminated) { // just in case on-next delivered a terminal in re-entry fashion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

Expand Down Expand Up @@ -650,13 +651,14 @@ void boundaryTerminalRetryRespectsSequential(boolean onError) throws Interrupted
void originalSourceIsRetriedIfSubscriberThrows() {
TestPublisher<Accumulator<Integer, Integer>> bPublisher = new TestPublisher<>();
DelayedSubscription bSubscription = new DelayedSubscription();
AtomicReference<TerminalNotification> terminal = new AtomicReference<>();
AtomicReference<TerminalNotification> terminalRef = new AtomicReference<>();
BlockingQueue<Integer> items = new LinkedBlockingDeque<>();
BlockingQueue<Integer> buffers = new LinkedBlockingDeque<>();
AtomicInteger counter = new AtomicInteger();
toSource(defer(() -> from(counter.incrementAndGet()))
.whenOnNext(items::add)
.retry(false, (i, t) -> i < 3 && t == DELIBERATE_EXCEPTION)
.retry(false, (i, t) -> i < 3 &&
(t instanceof IllegalStateException && t.getCause() == DELIBERATE_EXCEPTION))
.buffer(new TestBufferStrategy(bPublisher, 1)))
.subscribe(new Subscriber<Integer>() {
@Override
Expand All @@ -674,12 +676,12 @@ public void onNext(@Nullable Integer integer) {

@Override
public void onError(Throwable t) {
terminal.set(error(t));
terminalRef.set(error(t));
}

@Override
public void onComplete() {
terminal.set(complete());
terminalRef.set(complete());
}
});
bPublisher.onNext(new SumAccumulator(bPublisher)); // it will generate a new boundary on each accumulation
Expand All @@ -693,7 +695,11 @@ public void onComplete() {
assertThat(items, contains(1, 2, 3));
assertThat(buffers, hasSize(3));
assertThat(buffers, contains(1, 2, 3));
assertThat(terminal.get().cause(), is(DELIBERATE_EXCEPTION));
TerminalNotification terminal = terminalRef.get();
assertThat(terminal, notNullValue());
Throwable cause = terminal.cause();
assertThat(cause, instanceOf(IllegalStateException.class));
assertThat(cause.getCause(), is(DELIBERATE_EXCEPTION));
}

private static void verifyCancelled(TestSubscription subscription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ void exceptionAfterRetryPreservesDemand() {
final Integer[] signals = new Integer[] {1, 2, 3};
final AtomicInteger onNextCount = new AtomicInteger();
subscriber = new TestPublisherSubscriber<>();
BiIntPredicate<Throwable> retryFunc = (count, cause) -> cause == DELIBERATE_EXCEPTION;
BiIntPredicate<Throwable> retryFunc = (count, cause) ->
cause instanceof IllegalStateException && cause.getCause() == DELIBERATE_EXCEPTION;
toSource(Publisher.from(signals)
// First retry function will catch the error from onNext and propagate downstream to the second
// retry function.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,8 @@ void exceptionAfterRetryPreservesDemand() {
final Integer[] signals = new Integer[] {1, 2, 3};
final AtomicInteger onNextCount = new AtomicInteger();
subscriber = new TestPublisherSubscriber<>();
BiIntFunction<Throwable, Completable> retryFunc = (count, cause) -> cause == DELIBERATE_EXCEPTION ?
BiIntFunction<Throwable, Completable> retryFunc = (count, cause) ->
cause instanceof IllegalStateException && cause.getCause() == DELIBERATE_EXCEPTION ?
executor.timer(ofMillis(10)) : Completable.failed(cause);
toSource(Publisher.from(signals)
// First retry function will catch the error from onNext and propagate downstream to the second
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,9 @@
<Method name="close"/>
<Bug pattern="THROWS_METHOD_THROWS_CLAUSE_BASIC_EXCEPTION"/>
</Match>
<Match>
<Class name="io.servicetalk.concurrent.internal.SubscriberUtils"/>
<Method name="newExceptionNormalReturn"/>
<Bug pattern="BC_UNCONFIRMED_CAST"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,20 @@ public static IllegalArgumentException newExceptionForInvalidRequestN(long n) {
return new IllegalArgumentException("Rule 3.9 states non-positive request signals are illegal, but got: " + n);
}

/**
* Create a new exception when a subscriber throws when it doesn't return "normally" according to
* <a href="https://github.com/reactive-streams/reactive-streams-jvm#2.13">Reactive Streams, Rule 2.13</a>.
* @param cause The original cause that was thrown.
* @return The exception which clarifies the invalid behavior.
*/
public static RuntimeException newExceptionNormalReturn(Throwable cause) {
return SubscriberReturnNormalException.class.equals(cause.getClass()) ?
(SubscriberReturnNormalException) cause :
new SubscriberReturnNormalException(
"Rule 2.13 states Subscriber methods must return normally (failures via onError method only)." +
" Throwing may put operators that track demand into an undefined state.", cause);
}

/**
* Deliver a terminal complete to a {@link Subscriber} that has not yet had
* {@link PublisherSource.Subscriber#onSubscribe(PublisherSource.Subscription)} called.
Expand Down Expand Up @@ -392,4 +406,10 @@ private static void logDuplicateTerminal0(Object subscriber, @Nullable Throwable
"https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.9" +
"https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md#1.7", cause));
}

private static final class SubscriberReturnNormalException extends IllegalStateException {
SubscriberReturnNormalException(String message, Throwable cause) {
super(message, cause);
}
}
}

0 comments on commit 996e41b

Please sign in to comment.