Skip to content

Commit

Permalink
Publisher[retry|repeat] operators demand management if onNext throws (#…
Browse files Browse the repository at this point in the history
…2639)

Motivation:
Publisher `retry*` and `repeat*` operators need to persist outstanding
demand between "re-do" operations. Otherwise the downstream Subscriber
will have requested signals, but they won't be requested from the new
async source after switching. If a downstream operator (or Subscriber)
throws from onNext the retry/repeat operator has to decrement demand
to avoid violating the Reactive Streams specification (no more
than request(n) amount of onNext(..)), but downstream Subscribers that
didn't see the onNext (because an earlier operator threw) will be hung
waiting for request(n) that will never be delivered.

Modifications:
- Publisher[retry|repeat] introduce a flag to determine if downstream
onNext throwing should be terminal. This mode should be used unless
it is known the onNext signal always makes it to the last Subscriber
and its request(n) accounting can tolerate the exception it throws.
  • Loading branch information
Scottmitch committed Jul 5, 2023
1 parent 79c00bc commit 3843dd4
Show file tree
Hide file tree
Showing 10 changed files with 448 additions and 88 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.servicetalk.concurrent.api;

import io.servicetalk.concurrent.internal.ConcurrentSubscription;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.context.api.ContextMap;

Expand All @@ -23,6 +24,7 @@

import static io.servicetalk.concurrent.internal.TerminalNotification.complete;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static io.servicetalk.utils.internal.ThrowableUtils.throwException;

/**
* {@link Publisher} to do {@link Publisher#repeat(IntPredicate)} and {@link Publisher#retry(BiIntPredicate)}
Expand All @@ -31,12 +33,14 @@
* @param <T> Type of items emitted from this {@link Publisher}.
*/
final class RedoPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {

private final boolean terminateOnNextException;
private final Publisher<T> original;
private final BiPredicate<Integer, TerminalNotification> shouldRedo;

RedoPublisher(Publisher<T> original, BiPredicate<Integer, TerminalNotification> shouldRedo) {
RedoPublisher(Publisher<T> original, boolean terminateOnNextException,
BiPredicate<Integer, TerminalNotification> shouldRedo) {
this.original = original;
this.terminateOnNextException = terminateOnNextException;
this.shouldRedo = shouldRedo;
}

Expand All @@ -45,16 +49,27 @@ void handleSubscribe(Subscriber<? super T> subscriber, ContextMap contextMap,
AsyncContextProvider contextProvider) {
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
original.delegateSubscribe(new RedoSubscriber<>(new SequentialSubscription(), 0, subscriber, contextMap.copy(),
contextProvider, this), contextMap, contextProvider);
original.delegateSubscribe(new RedoSubscriber<>(terminateOnNextException, new SequentialSubscription(), 0,
subscriber, contextMap.copy(), contextProvider, this), contextMap, contextProvider);
}

abstract static class AbstractRedoSubscriber<T> implements Subscriber<T> {
final SequentialSubscription subscription;
/**
* Unless you are sure all downstream operators consume the {@link Subscriber#onNext(Object)} this option
* SHOULD be {@code true}. Otherwise, the outstanding demand counting in this operator will be incorrect and may
* lead to a "hang" (e.g. this operator thinks demand has been consumed downstream so won't request it upstream
* after the retry, but if not all downstream operators see the signal because one before threw, they may wait
* for a signal they requested but will never be delivered).
*/
private final boolean terminateOnNextException;
private final SequentialSubscription subscription;
private boolean terminated;
final Subscriber<? super T> subscriber;
int redoCount;

AbstractRedoSubscriber(SequentialSubscription subscription, int redoCount, Subscriber<? super T> subscriber) {
AbstractRedoSubscriber(boolean terminateOnNextException, SequentialSubscription subscription, int redoCount,
Subscriber<? super T> subscriber) {
this.terminateOnNextException = terminateOnNextException;
this.subscription = subscription;
this.redoCount = redoCount;
this.subscriber = subscriber;
Expand All @@ -63,6 +78,11 @@ abstract static class AbstractRedoSubscriber<T> implements Subscriber<T> {
@Override
public final void onSubscribe(Subscription s) {
s = decorate(s);
if (terminateOnNextException) {
// ConcurrentSubscription because if exception is thrown from downstream onNext we invoke cancel which
// may introduce concurrency on the subscription.
s = ConcurrentSubscription.wrap(s);
}
// Downstream Subscriber only gets one Subscription but every time we re-subscribe we switch the current
// Subscription in SequentialSubscription to the new Subscription. This will make sure that we always
// request from the "current" Subscription.
Expand All @@ -75,38 +95,79 @@ public final void onSubscribe(Subscription s) {
}
}

@Override
public final void onNext(T t) {
if (terminated) {
return;
}
subscription.itemReceived();
try {
subscriber.onNext(t);
} catch (Throwable cause) {
handleOnNextException(cause);
}
}

@Override
public final void onError(Throwable cause) {
if (terminated) {
return;
}
onError0(cause);
}

@Override
public final void onComplete() {
if (terminated) {
return;
}
onComplete0();
}

abstract void onComplete0();

abstract void onError0(Throwable cause);

Subscription decorate(Subscription s) {
return s;
}

private void handleOnNextException(Throwable cause) {
if (!terminateOnNextException) {
throwException(cause);
} else if (terminated) { // just in case on-next delivered a terminal in re-entry fashion
return;
}
terminated = true;
try {
subscription.cancel();
} finally {
subscriber.onError(cause);
}
}
}

private static final class RedoSubscriber<T> extends AbstractRedoSubscriber<T> {
private final RedoPublisher<T> redoPublisher;
private final ContextMap contextMap;
private final AsyncContextProvider contextProvider;

RedoSubscriber(SequentialSubscription subscription, int redoCount, Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider,
RedoSubscriber(boolean terminateOnNextException, SequentialSubscription subscription, int redoCount,
Subscriber<? super T> subscriber, ContextMap contextMap, AsyncContextProvider contextProvider,
RedoPublisher<T> redoPublisher) {
super(subscription, redoCount, subscriber);
super(terminateOnNextException, subscription, redoCount, subscriber);
this.redoPublisher = redoPublisher;
this.contextMap = contextMap;
this.contextProvider = contextProvider;
}

@Override
public void onNext(T t) {
subscription.itemReceived();
subscriber.onNext(t);
}

@Override
public void onError(Throwable t) {
void onError0(Throwable t) {
tryRedo(TerminalNotification.error(t));
}

@Override
public void onComplete() {
void onComplete0() {
tryRedo(complete());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.servicetalk.concurrent.Cancellable;
import io.servicetalk.concurrent.CompletableSource;
import io.servicetalk.concurrent.api.RedoPublisher.AbstractRedoSubscriber;
import io.servicetalk.concurrent.internal.SequentialCancellable;
import io.servicetalk.concurrent.internal.TerminalNotification;
import io.servicetalk.context.api.ContextMap;
Expand All @@ -35,7 +36,7 @@
* @param <T> Type of items emitted from this {@link Publisher}.
*/
final class RedoWhenPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {

private final boolean terminateOnNextException;
private final Publisher<T> original;
private final BiFunction<Integer, TerminalNotification, Completable> shouldRedo;
private final boolean forRetry;
Expand All @@ -44,16 +45,20 @@ final class RedoWhenPublisher<T> extends AbstractNoHandleSubscribePublisher<T> {
* New instance.
*
* @param original {@link Publisher} on which this operator is applied.
* @param shouldRedo {@link BiFunction} to create a {@link Completable} that determines whether to redo the
* operation.
* @param forRetry If redo has to be done for error i.e. it is used for retry. If {@code true} completion for
* original source will complete the subscriber. Otherwise, error will send the error to the subscriber.
* @param terminateOnNextException {@code true} exceptions from {@link Subscriber#onNext(Object)} will be caught and
* terminated inside this operator (and the {@link Subscription} will be cancelled). {@code false} means exceptions
* from {@link Subscriber#onNext(Object)} will not be caught.
* @param shouldRedo {@link BiFunction} to create a {@link Completable} that determines whether to redo the
* operation.
*/
RedoWhenPublisher(Publisher<T> original, BiFunction<Integer, TerminalNotification, Completable> shouldRedo,
boolean forRetry) {
RedoWhenPublisher(Publisher<T> original, boolean forRetry, boolean terminateOnNextException,
BiFunction<Integer, TerminalNotification, Completable> shouldRedo) {
this.original = original;
this.shouldRedo = shouldRedo;
this.forRetry = forRetry;
this.terminateOnNextException = terminateOnNextException;
this.shouldRedo = shouldRedo;
}

@Override
Expand All @@ -62,12 +67,12 @@ void handleSubscribe(Subscriber<? super T> subscriber,
// For the current subscribe operation we want to use contextMap directly, but in the event a re-subscribe
// operation occurs we want to restore the original state of the AsyncContext map, so we save a copy upfront.
original.delegateSubscribe(
new RedoSubscriber<>(new SequentialSubscription(), 0, subscriber,
new RedoSubscriber<>(terminateOnNextException, new SequentialSubscription(), 0, subscriber,
contextMap.copy(), contextProvider, this),
contextMap, contextProvider);
}

private static final class RedoSubscriber<T> extends RedoPublisher.AbstractRedoSubscriber<T> {
private static final class RedoSubscriber<T> extends AbstractRedoSubscriber<T> {
private final SequentialCancellable cancellable;
private final RedoWhenPublisher<T> redoPublisher;
private final ContextMap contextMap;
Expand All @@ -87,6 +92,8 @@ public void onComplete() {

@Override
public void onError(Throwable t) {
// No need to check if terminated already because this Completable is only subscribed
// if a terminal is received from upstream but not yet delivered downstream.
if (!redoPublisher.forRetry) {
// repeat operator terminates repeat with error.
subscriber.onComplete();
Expand All @@ -96,24 +103,18 @@ public void onError(Throwable t) {
}
};

RedoSubscriber(SequentialSubscription subscription, int redoCount, Subscriber<? super T> subscriber,
ContextMap contextMap, AsyncContextProvider contextProvider,
RedoSubscriber(boolean terminateOnNextException, SequentialSubscription subscription, int redoCount,
Subscriber<? super T> subscriber, ContextMap contextMap, AsyncContextProvider contextProvider,
RedoWhenPublisher<T> redoPublisher) {
super(subscription, redoCount, subscriber);
super(terminateOnNextException, subscription, redoCount, subscriber);
this.redoPublisher = redoPublisher;
this.contextMap = contextMap;
this.contextProvider = contextProvider;
cancellable = new SequentialCancellable();
}

@Override
public void onNext(T t) {
subscription.itemReceived();
subscriber.onNext(t);
}

@Override
public void onError(Throwable t) {
void onError0(Throwable t) {
if (!redoPublisher.forRetry) {
subscriber.onError(t);
return;
Expand All @@ -124,7 +125,7 @@ public void onError(Throwable t) {
}

@Override
public void onComplete() {
void onComplete0() {
if (redoPublisher.forRetry) {
subscriber.onComplete();
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,7 +656,7 @@ void originalSourceIsRetriedIfSubscriberThrows() {
AtomicInteger counter = new AtomicInteger();
toSource(defer(() -> from(counter.incrementAndGet()))
.whenOnNext(items::add)
.retry((i, t) -> i < 3 && t == DELIBERATE_EXCEPTION)
.retry(false, (i, t) -> i < 3 && t == DELIBERATE_EXCEPTION)
.buffer(new TestBufferStrategy(bPublisher, 1)))
.subscribe(new Subscriber<Integer>() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.servicetalk.concurrent.api.publisher;

import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TestPublisher;
import io.servicetalk.concurrent.api.TestSubscription;
import io.servicetalk.concurrent.test.internal.TestPublisherSubscriber;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntPredicate;

import static io.servicetalk.concurrent.api.SourceAdapters.toSource;
Expand All @@ -41,8 +43,7 @@
import static org.mockito.Mockito.when;

class RepeatTest {

private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
private TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
private final TestPublisher<Integer> source = new TestPublisher<>();
private final IntPredicate shouldRepeat = mock(IntPredicate.class);
private boolean shouldRepeatValue;
Expand Down Expand Up @@ -131,4 +132,33 @@ void testCancel() {
source.onComplete();
assertTrue(subscription.isCancelled());
}

@Test
void exceptionAfterRetryPreservesDemand() {
final Integer[] signals = new Integer[] {1, 2, 3};
final AtomicInteger onNextCount = new AtomicInteger();
subscriber = new TestPublisherSubscriber<>();
toSource(Publisher.from(signals)
// First repeat function will catch the error from onNext and propagate downstream to the second
// retry function. After the second repeat operator completes, this operator will trigger another repeat
// so we expect to see values from signals array twice.
.repeat(true, i -> i == 1)
.validateOutstandingDemand()
.map(t -> {
if (onNextCount.getAndIncrement() == 0) {
throw DELIBERATE_EXCEPTION;
}
return t;
})
.onErrorComplete()
// Second retry function will kick in and resubscribe generating new state.
.repeat(i -> i == 1)
.validateOutstandingDemand()
).subscribe(subscriber);

subscriber.awaitSubscription().request(signals.length * 2);
assertThat(subscriber.takeOnNext(signals.length), contains(signals));
assertThat(subscriber.takeOnNext(signals.length), contains(signals));
subscriber.awaitOnComplete();
}
}

0 comments on commit 3843dd4

Please sign in to comment.