Skip to content

Commit

Permalink
replace call to run with run(getEngine())
Browse files Browse the repository at this point in the history
Issue found by James in #116.
The fix is trivial.

Signed-off-by: Clement Escoffier <clement.escoffier@gmail.com>
  • Loading branch information
cescoffier committed Dec 12, 2018
1 parent 328f261 commit 4c74609
Showing 1 changed file with 8 additions and 8 deletions.
Expand Up @@ -51,7 +51,7 @@ public void coupledStageShouldCancelAndCompleteUpstreamWhenDownstreamCancels() {
rs.coupled(rs.builder().onComplete(() -> subscriberCompleted.complete(null)).ignore(),
idlePublisher())
).cancel()
.run();
.run(getEngine());

await(subscriberCompleted);
await(upstreamCancelled);
Expand All @@ -68,7 +68,7 @@ public void coupledStageShouldCancelAndCompleteUpstreamWhenPublisherCompletes()
rs.coupled(rs.builder().onComplete(() -> subscriberCompleted.complete(null)).ignore(),
rs.empty())
).ignore()
.run();
.run(getEngine());

await(subscriberCompleted);
await(upstreamCancelled);
Expand All @@ -85,7 +85,7 @@ public void coupledStageShouldCancelAndCompleteUpstreamWhenPublisherFails() {
rs.coupled(rs.builder().onError(subscriberFailed::complete).ignore(),
rs.failed(new QuietRuntimeException("failed")))
).ignore()
.run();
.run(getEngine());

assertTrue(await(subscriberFailed) instanceof QuietRuntimeException);
await(upstreamCancelled);
Expand All @@ -102,7 +102,7 @@ public void coupledStageShouldCancelAndCompleteDownstreamWhenUpstreamCompletes()
idlePublisher().onTerminate(() -> publisherCancelled.complete(null)))
).onComplete(() -> downstreamCompleted.complete(null))
.ignore()
.run();
.run(getEngine());

await(publisherCancelled);
await(downstreamCompleted);
Expand All @@ -119,7 +119,7 @@ public void coupledStageShouldCancelAndFailDownstreamWhenUpstreamFails() {
idlePublisher().onTerminate(() -> publisherCancelled.complete(null)))
).onError(downstreamFailed::complete)
.ignore()
.run();
.run(getEngine());

await(publisherCancelled);
assertTrue(await(downstreamFailed) instanceof QuietRuntimeException);
Expand All @@ -136,7 +136,7 @@ public void coupledStageShouldCancelAndCompleteDownstreamWhenSubscriberCancels()
idlePublisher().onTerminate(() -> publisherCancelled.complete(null)))
).onComplete(() -> downstreamCompleted.complete(null))
.ignore()
.run();
.run(getEngine());

await(publisherCancelled);
await(downstreamCompleted);
Expand All @@ -145,8 +145,8 @@ public void coupledStageShouldCancelAndCompleteDownstreamWhenSubscriberCancels()
@Test
public void coupledStageShouldBeResuable() {
ProcessorBuilder<Object, Integer> coupled = rs.coupled(rs.builder().ignore(), rs.of(1, 2, 3));
assertEquals(await(idlePublisher().via(coupled).toList().run()), Arrays.asList(1, 2, 3));
assertEquals(await(idlePublisher().via(coupled).toList().run()), Arrays.asList(1, 2, 3));
assertEquals(await(idlePublisher().via(coupled).toList().run(getEngine())), Arrays.asList(1, 2, 3));
assertEquals(await(idlePublisher().via(coupled).toList().run(getEngine())), Arrays.asList(1, 2, 3));
}

@Override
Expand Down

0 comments on commit 4c74609

Please sign in to comment.