Skip to content

Commit

Permalink
Trigger Single to CS conversion on first CS method call (#1886)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec committed May 28, 2020
1 parent f463c53 commit 41cbf30
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package io.helidon.common.reactive;

import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -34,14 +32,10 @@
*/
public class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {

private final AtomicBoolean triggeredSubscription = new AtomicBoolean();

private Supplier<CompletionStage<T>> originalStage;
private LinkedList<Runnable> subscribeTrigger = new LinkedList<>();

CompletionAwaitable(Supplier<CompletionStage<T>> originalStage, CompletionAwaitable<?> parent) {
this.originalStage = originalStage;
this.subscribeTrigger = parent.subscribeTrigger;
}

CompletionAwaitable() {
Expand All @@ -51,10 +45,6 @@ void setOriginalStage(final Supplier<CompletionStage<T>> originalStage) {
this.originalStage = originalStage;
}

void addSubscribeTrigger(final Runnable runnable) {
this.subscribeTrigger.addLast(runnable);
}

@Override
public <U> CompletionAwaitable<U> thenApply(final Function<? super T, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().thenApply(fn);
Expand Down Expand Up @@ -306,14 +296,6 @@ public CompletionAwaitable<T> exceptionally(final Function<Throwable, ? extends

@Override
public CompletableFuture<T> toCompletableFuture() {
CompletableFuture<T> future = originalStage.get().toCompletableFuture();
triggerSubscription();
return future;
}

private void triggerSubscription() {
if (triggeredSubscription.compareAndSet(false, true)) {
subscribeTrigger.forEach(Runnable::run);
}
return originalStage.get().toCompletableFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package io.helidon.common.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import io.helidon.common.LazyValue;

/**
* Single as CompletionStage.
Expand All @@ -27,23 +28,15 @@
*/
public abstract class CompletionSingle<T> extends CompletionAwaitable<T> implements Single<T> {

private final AtomicReference<CompletableFuture<T>> stageReference = new AtomicReference<>();
private final CompletableFuture<Void> cancelFuture = new CompletableFuture<>();

protected CompletionSingle() {
setOriginalStage(this::getLazyStage);
}

private CompletableFuture<T> getLazyStage() {
if (stageReference.get() == null) {
stageReference.set(toNullableStage());
}
return stageReference.get();
LazyValue<CompletableFuture<T>> lazyStage = LazyValue.create(this::toNullableStage);
setOriginalStage(lazyStage::get);
}

protected CompletableFuture<T> toNullableStage() {
SingleToFuture<T> subscriber = new SingleToFuture<>(true);
//addSubscribeTrigger(() -> this.subscribe(subscriber));
this.subscribe(subscriber);
return subscriber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.helidon.common.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

Expand All @@ -38,11 +37,4 @@ final class SingleFromCompletionStage<T> extends CompletionSingle<T> {
public void subscribe(Flow.Subscriber<? super T> subscriber) {
MultiFromCompletionStage.subscribe(subscriber, source, nullMeansEmpty);
}

@Override
protected CompletableFuture<T> toNullableStage() {
SingleToFuture<T> subscriber = new SingleToFuture<>(true);
this.subscribe(subscriber);
return subscriber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.testng.Assert.assertThrows;

import org.junit.jupiter.api.Test;

public class AwaitTest {

private static final long EXPECTED_SUM = 10L;
Expand All @@ -52,20 +52,17 @@ void sameInstanceCallbacks() throws ExecutionException, InterruptedException {

Single<String> future =
Single.just("1")
.peek(it -> System.out.println("peekaboo"))
.peek(peekFuture::complete);
future.thenAccept(whenCompleteFuture::complete);

assertThat("Peek needs to be invoked at first CompletionStage method!", peekFuture.isDone(), is(true));
assertThat("WhenComplete needs to be invoked at first CompletionStage method!", whenCompleteFuture.isDone(), is(true));
assertThat("Peek needs to be invoked with first call to CS method!", peekFuture.isDone(), is(not(true)));

future.thenAccept(whenCompleteFuture::complete);

future.toCompletableFuture();
future.toCompletableFuture();
future.await();
future.await(100, TimeUnit.MILLISECONDS);

// assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
//assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
}

Expand All @@ -74,86 +71,84 @@ void lazyCSConversion() throws ExecutionException, InterruptedException {
CompletableFuture<String> peekFuture = new CompletableFuture<>();
CompletableFuture<String> whenCompleteFuture = new CompletableFuture<>();

CompletionAwaitable<String> awaitable = Single.just("1")
.peek(peekFuture::complete)
.whenComplete((s, throwable) -> whenCompleteFuture.complete(s));
Single<String> single = Single.just("1")
.peek(peekFuture::complete);

assertThat("Peek needs to be invoked!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
assertThat("WhenComplete needs to be invoked!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
assertThat("Peek needs to be invoked at first CS method!", peekFuture.isDone(), is(not(true)));

awaitable.await(100, TimeUnit.MILLISECONDS);
single.whenComplete((s, throwable) -> whenCompleteFuture.complete(s));

single.await(100, TimeUnit.MILLISECONDS);

assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
}


@Test
void lazyCSConversionCallbackOrderSingle() {
void callbackOrderSingle() {
List<Integer> result = new ArrayList<>();
Consumer<Integer> registerCall = result::add;
AtomicInteger cnt = new AtomicInteger(0);

CompletionAwaitable<String> awaitable = Single.just("2")
.flatMapSingle(Single::just)
.peek(s -> registerCall.accept(1))
.peek(s -> result.add(1))
.map(s -> {
registerCall.accept(2);
result.add(2);
return s;
})
.flatMapSingle(Single::just)
.peek(s -> registerCall.accept(3))
.peek(s -> result.add(3))
.flatMapSingle(Single::just)
.map(s -> {
registerCall.accept(4);
result.add(4);
return s;
})
.flatMapSingle(Single::just)
.whenComplete((s, throwable) -> registerCall.accept(5))
.whenComplete((s, throwable) -> result.add(5))
.thenApply(s -> {
registerCall.accept(6);
result.add(6);
return s;
})
.whenComplete((s, throwable) -> registerCall.accept(7));
.whenComplete((s, throwable) -> result.add(7));

awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);

List<Integer> expected = IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList());

assertThat(result, equalTo(expected));
assertThat(result, equalTo(IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList())));
}


@Test
void lazyCSConversionCallbackOrderMulti() {
void callbackOrderMulti() {
List<Integer> result = new ArrayList<>();
AtomicInteger cnt = new AtomicInteger(0);
Runnable registerCall = () -> result.add(cnt.incrementAndGet());

CompletionAwaitable<Void> awaitable = Multi.just(1L, 2L, 3L)
CompletionAwaitable<Void> awaitable = Multi.just(1L)
.flatMap(Single::just)
.peek(s -> registerCall.run())
.peek(s -> result.add(1))
.map(s -> {
registerCall.run();
result.add(2);
return s;
})
.flatMap(Single::just)
.peek(s -> registerCall.run())
.peek(s -> result.add(3))
.flatMap(Single::just)
.map(s -> {
registerCall.run();
result.add(4);
return s;
})
.flatMap(Single::just)
.forEach(aLong -> registerCall.run())
.whenComplete((s, throwable) -> registerCall.run())
.forEach(aLong -> result.add(5))
.whenComplete((s, throwable) -> result.add(6))
.thenApply(s -> {
registerCall.run();
result.add(7);
return s;
})
.whenComplete((s, throwable) -> registerCall.run());
.whenComplete((s, throwable) -> result.add(8));

//For each operator triggers stream on its own, we are just waiting for it to finish
awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
assertThat(result, equalTo(IntStream.rangeClosed(1, cnt.get()).boxed().collect(Collectors.toList())));
assertThat(result, equalTo(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList())));
}

@Test
Expand Down

0 comments on commit 41cbf30

Please sign in to comment.