Skip to content

Commit

Permalink
Publisher.multicast late subscriber cancel demand bug (#2683)
Browse files Browse the repository at this point in the history
Motivation:
Publisher.multicast tracks the active subscriber with the
least demand in order to control upstream demand. When cancel
happens if the subscriber with minimum demand is removed the
delta with the new minimum should be propagated upstream. However
there is a bug where the initial demand offset isn't considered
for late subscribers that get cancelled with may lead to not
propagating demand upstream and "hanging".
  • Loading branch information
Scottmitch committed Aug 30, 2023
1 parent 47fb42e commit a121e7f
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ long processCancelEvent(MulticastFixedSubscriber<T> subscriber) {
// processSubscribeEvent sets the initial value of new subscribers to the value of the current minimum.
// We need to subtract the initial value to request the delta between the old min.
return min == null ? 0 : min.priorityQueueValue - min.initPriorityQueueValue -
subscriber.priorityQueueValue;
(subscriber.priorityQueueValue - subscriber.initPriorityQueueValue);
}
return 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,43 @@ void cancelMinSubscriberRespectsQueueLimit() throws InterruptedException {
assertThat(subscription.requested(), is((long) queueLimit));
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
void threeSubscribersOneCancelRequestsUpstream(boolean cancel2First) throws InterruptedException {
Publisher<Integer> publisher = source.multicast(1, true);
toSource(publisher).subscribe(subscriber1);
Subscription localSubscription1 = subscriber1.awaitSubscription();
localSubscription1.request(1);
subscription.awaitRequestN(1);
toSource(publisher).subscribe(subscriber2);
Subscription localSubscription2 = subscriber2.awaitSubscription();
source.onNext(1);
assertThat(subscriber1.takeOnNext(), is(1));

toSource(publisher).subscribe(subscriber3);
Subscription localSubscription3 = subscriber3.awaitSubscription();

localSubscription1.request(1);
assertThat(subscription.requested(), is(1L));
if (cancel2First) {
localSubscription2.cancel();
localSubscription3.request(1);
} else {
localSubscription3.request(1);
assertThat(subscription.requested(), is(1L));
localSubscription2.cancel();
}
subscription.awaitRequestN(2);
source.onNext(2);
assertThat(subscriber1.takeOnNext(), is(2));
assertThat(subscriber3.takeOnNext(), is(2));
source.onComplete();
subscriber1.awaitOnComplete();
subscriber3.awaitOnComplete();
assertThat(subscriber2.pollOnNext(10, MILLISECONDS), is(nullValue()));
assertThat(subscriber2.pollTerminal(10, MILLISECONDS), is(nullValue()));
}

@ParameterizedTest
@MethodSource("trueFalseStream")
void threeSubscribersOneLateAfterCancel(boolean cancelMax, boolean cancelUpstream) throws InterruptedException {
Expand Down

0 comments on commit a121e7f

Please sign in to comment.