Skip to content

Commit

Permalink
NettyChannelPublisher cancel active subscriber should terminate
Browse files Browse the repository at this point in the history
Motivation:
NettyChannelPublisher allows for resubscribes, and delivers queued data to the new Subscriber. If the previous Subscriber consumed some data and cancelled this may lead to the new Subscriber receiveing partial data from the last request.

Modifications:
- NettyChannelPublisher should discard any pending data, and deliver an error to new Subscribers if a previously active Subscriber cancels.

Result:
Resubscribes to NettyChannelPublisher won't get partial data for previous requests, like in the case of client pipelinining.
  • Loading branch information
Scottmitch committed Mar 23, 2020
1 parent 666a16b commit 3cf0a1c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
Expand Up @@ -234,6 +234,12 @@ private void cancel(SubscriptionImpl forSubscription) {
return;
}
resetSubscription();

// If a cancel occurs with a valid subscription we need to clear any pending data and set a fatalError so that
// any future Subscribers don't get partial data delivered from the queue.
pending = null;
fatalError = StacklessClosedChannelException.newInstance(NettyChannelPublisher.class, "cancel");

// If an incomplete subscriber is cancelled then close channel. A subscriber can cancel after getting complete,
// which should not close the channel.
closeChannelInbound();
Expand Down Expand Up @@ -270,7 +276,7 @@ private void subscribe0(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(EMPTY_SUBSCRIPTION);
subscriber.onError(new DuplicateSubscribeException(subscription.associatedSub, subscriber));
} else {
requestCount = 0; // Don't pollute requested count between subscribers
assert requestCount == 0;
subscription = new SubscriptionImpl(subscriber);
this.subscription = subscription;
subscriber.onSubscribe(subscription);
Expand Down
Expand Up @@ -20,6 +20,7 @@
import io.servicetalk.concurrent.PublisherSource.Subscription;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.TestCollectingPublisherSubscriber;
import io.servicetalk.concurrent.api.TestPublisherSubscriber;
import io.servicetalk.concurrent.internal.DuplicateSubscribeException;
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
Expand Down Expand Up @@ -56,6 +57,7 @@
import static java.util.Objects.requireNonNull;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
Expand Down Expand Up @@ -111,6 +113,27 @@ public void tearDown() throws Exception {
}
}

@Test
public void testCancelThenResubscribeDeliversErrorAndNotQueuedData() throws InterruptedException {
TestCollectingPublisherSubscriber<Integer> subscriber1 = new TestCollectingPublisherSubscriber<>();
TestCollectingPublisherSubscriber<Integer> subscriber2 = new TestCollectingPublisherSubscriber<>();
toSource(publisher).subscribe(subscriber1);
Subscription subscription1 = subscriber1.awaitSubscription();
subscription1.request(1);

assertFalse(channel.writeInbound(1));
Integer next = subscriber1.takeOnNext();
assertThat(next, is(1));
assertFalse(channel.writeInbound(2)); // this write should be queued, because there isn't any requestN demand.

subscription1.cancel(); // cancel of active subscription should clear the queue and fail future Subscribers.

toSource(publisher).subscribe(subscriber2);
subscriber2.awaitSubscription().request(Long.MAX_VALUE);
assertThat(subscriber2.pollAllOnNext(), is(empty()));
assertThat(subscriber2.awaitOnError(), is(instanceOf(ClosedChannelException.class)));
}

@Test
public void testSupplyEqualsDemand() {
toSource(publisher).subscribe(subscriber);
Expand Down

0 comments on commit 3cf0a1c

Please sign in to comment.