Skip to content

Commit

Permalink
Publisher.timeoutDemand(Duration) not timing out demand (#2652)
Browse files Browse the repository at this point in the history
Motivation:
The Publisher.timeoutDemand(Duration) method calls timeout operator
overload instead of the timeoutDemand overload and is therefor applying
the wrong timeout.
  • Loading branch information
Scottmitch committed Jul 14, 2023
1 parent 704da42 commit d4f0d3d
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2151,7 +2151,7 @@ public final Publisher<T> timeoutDemand(long duration, TimeUnit unit,
* @see #timeoutDemand(Duration, io.servicetalk.concurrent.Executor)
*/
public final Publisher<T> timeoutDemand(Duration duration) {
return timeout(duration, global());
return timeoutDemand(duration, global());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
Expand Down Expand Up @@ -96,9 +97,13 @@ boolean restartAtOnNext() {
@RegisterExtension
static final ExecutorExtension<TestExecutor> executorExtension = ExecutorExtension.withTestExecutor();

private final TestPublisher<Integer> publisher = new TestPublisher<>();
private final TestPublisherSubscriber<Integer> subscriber = new TestPublisherSubscriber<>();
private final TestSubscription subscription = new TestSubscription();
private final TestPublisher<Integer> publisher = new TestPublisher.Builder<Integer>().disableAutoOnSubscribe()
.build(sub -> {
sub.onSubscribe(subscription);
return sub;
});
private TestExecutor testExecutor;

@BeforeEach
Expand Down Expand Up @@ -148,7 +153,6 @@ public Cancellable schedule(final Runnable task, final long delay, final TimeUni
}
})
).subscribe(subscriber);
publisher.onSubscribe(subscription);

assertThat(subscriber.awaitOnError(), sameInstance(DELIBERATE_EXCEPTION));
assertTrue(subscription.isCancelled());
Expand Down Expand Up @@ -393,6 +397,24 @@ void timeoutDemandTimerCancellation() throws ExecutionException, InterruptedExce
}
}

@Test
void timeoutDemandDefaultExecutor() throws InterruptedException {
final int millis = 300;
toSource(publisher.timeoutDemand(ofMillis(millis)))
.subscribe(subscriber);

subscriber.awaitSubscription().request(1);
subscription.awaitRequestN(1);

// Wait until the expiration time expires to verify demand is being timed out, not onNext.
Thread.sleep(millis * 2);
publisher.onNext(1);

assertThat(subscriber.takeOnNext(), equalTo(1));
assertThat(subscriber.awaitOnError(), instanceOf(TimeoutException.class));
subscription.awaitCancelled();
}

private void init(TimerBehaviorParam params) {
init(params, ofNanos(1));
}
Expand Down

0 comments on commit d4f0d3d

Please sign in to comment.