Skip to content

Commit

Permalink
small improvement upgrating to Java 8
Browse files Browse the repository at this point in the history
  • Loading branch information
otaviojava committed Jul 17, 2018
1 parent 9d41560 commit e62a453
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 25 deletions.
Expand Up @@ -38,18 +38,16 @@ public class CancelStageVerification extends AbstractStageVerification {
@Test
public void cancelStageShouldCancelTheStage() {
CompletableFuture<Void> cancelled = new CompletableFuture<>();
CompletionStage<Void> result = ReactiveStreams.fromPublisher(s -> {
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}
CompletionStage<Void> result = ReactiveStreams.fromPublisher(s -> s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}

@Override
public void cancel() {
cancelled.complete(null);
}
});
}).cancel().run(getEngine());
@Override
public void cancel() {
cancelled.complete(null);
}
})).cancel().run(getEngine());
await(cancelled);
await(result);
}
Expand Down
Expand Up @@ -100,7 +100,7 @@ public void cancel() {
}

CompletionStage<List<Integer>> result = ReactiveStreams.of(1, 2, 3, 4, 5)
.flatMapPublisher(id -> new ScheduledPublisher(id))
.flatMapPublisher(ScheduledPublisher::new)
.toList()
.run(getEngine());

Expand Down
Expand Up @@ -42,9 +42,7 @@ public class PeekStageVerification extends AbstractStageVerification {
public void peekStageShouldNotModifyElements() {
AtomicInteger count = new AtomicInteger();
assertEquals(await(ReactiveStreams.of(1, 2, 3)
.peek(i -> {
count.incrementAndGet();
})
.peek(i -> count.incrementAndGet())
.toList()
.run(getEngine())), Arrays.asList(1, 2, 3));
assertEquals(count.get(), 3);
Expand Down
Expand Up @@ -51,17 +51,15 @@ public void subscriberStageShouldRedeemCompletionStageWhenFailed() {

@Test(expectedExceptions = CancellationException.class)
public void subscriberStageShouldRedeemCompletionStageWithCancellationExceptionWhenCancelled() {
CompletionStage<Void> result = ReactiveStreams.fromPublisher(subscriber -> {
subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}
CompletionStage<Void> result = ReactiveStreams.fromPublisher(subscriber -> subscriber.onSubscribe(new Subscription() {
@Override
public void request(long n) {
}

@Override
public void cancel() {
}
});
}).to(
@Override
public void cancel() {
}
})).to(
ReactiveStreams.builder().cancel().build(getEngine())
).run(getEngine());
await(result);
Expand Down

0 comments on commit e62a453

Please sign in to comment.