Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trigger Single to CS conversion on first CS method call #1886

Merged
merged 1 commit into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@

package io.helidon.common.reactive;

import java.util.LinkedList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
Expand All @@ -34,14 +32,10 @@
*/
public class CompletionAwaitable<T> implements CompletionStage<T>, Awaitable<T> {

private final AtomicBoolean triggeredSubscription = new AtomicBoolean();

private Supplier<CompletionStage<T>> originalStage;
private LinkedList<Runnable> subscribeTrigger = new LinkedList<>();

CompletionAwaitable(Supplier<CompletionStage<T>> originalStage, CompletionAwaitable<?> parent) {
this.originalStage = originalStage;
this.subscribeTrigger = parent.subscribeTrigger;
}

CompletionAwaitable() {
Expand All @@ -51,10 +45,6 @@ void setOriginalStage(final Supplier<CompletionStage<T>> originalStage) {
this.originalStage = originalStage;
}

void addSubscribeTrigger(final Runnable runnable) {
this.subscribeTrigger.addLast(runnable);
}

@Override
public <U> CompletionAwaitable<U> thenApply(final Function<? super T, ? extends U> fn) {
CompletionStage<U> completionStage = originalStage.get().thenApply(fn);
Expand Down Expand Up @@ -306,14 +296,6 @@ public CompletionAwaitable<T> exceptionally(final Function<Throwable, ? extends

@Override
public CompletableFuture<T> toCompletableFuture() {
CompletableFuture<T> future = originalStage.get().toCompletableFuture();
triggerSubscription();
return future;
}

private void triggerSubscription() {
if (triggeredSubscription.compareAndSet(false, true)) {
subscribeTrigger.forEach(Runnable::run);
}
return originalStage.get().toCompletableFuture();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@
package io.helidon.common.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

import io.helidon.common.LazyValue;

/**
* Single as CompletionStage.
Expand All @@ -27,23 +28,15 @@
*/
public abstract class CompletionSingle<T> extends CompletionAwaitable<T> implements Single<T> {

private final AtomicReference<CompletableFuture<T>> stageReference = new AtomicReference<>();
private final CompletableFuture<Void> cancelFuture = new CompletableFuture<>();

protected CompletionSingle() {
setOriginalStage(this::getLazyStage);
}

private CompletableFuture<T> getLazyStage() {
if (stageReference.get() == null) {
stageReference.set(toNullableStage());
}
return stageReference.get();
LazyValue<CompletableFuture<T>> lazyStage = LazyValue.create(this::toNullableStage);
setOriginalStage(lazyStage::get);
}

protected CompletableFuture<T> toNullableStage() {
SingleToFuture<T> subscriber = new SingleToFuture<>(true);
//addSubscribeTrigger(() -> this.subscribe(subscriber));
this.subscribe(subscriber);
return subscriber;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package io.helidon.common.reactive;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;

Expand All @@ -38,11 +37,4 @@ final class SingleFromCompletionStage<T> extends CompletionSingle<T> {
public void subscribe(Flow.Subscriber<? super T> subscriber) {
MultiFromCompletionStage.subscribe(subscriber, source, nullMeansEmpty);
}

@Override
protected CompletableFuture<T> toNullableStage() {
SingleToFuture<T> subscriber = new SingleToFuture<>(true);
this.subscribe(subscriber);
return subscriber;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.testng.Assert.assertThrows;

import org.junit.jupiter.api.Test;

public class AwaitTest {

private static final long EXPECTED_SUM = 10L;
Expand All @@ -52,20 +52,17 @@ void sameInstanceCallbacks() throws ExecutionException, InterruptedException {

Single<String> future =
Single.just("1")
.peek(it -> System.out.println("peekaboo"))
.peek(peekFuture::complete);
future.thenAccept(whenCompleteFuture::complete);

assertThat("Peek needs to be invoked at first CompletionStage method!", peekFuture.isDone(), is(true));
assertThat("WhenComplete needs to be invoked at first CompletionStage method!", whenCompleteFuture.isDone(), is(true));
assertThat("Peek needs to be invoked with first call to CS method!", peekFuture.isDone(), is(not(true)));

future.thenAccept(whenCompleteFuture::complete);

future.toCompletableFuture();
future.toCompletableFuture();
future.await();
future.await(100, TimeUnit.MILLISECONDS);

// assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
//assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
}

Expand All @@ -74,86 +71,84 @@ void lazyCSConversion() throws ExecutionException, InterruptedException {
CompletableFuture<String> peekFuture = new CompletableFuture<>();
CompletableFuture<String> whenCompleteFuture = new CompletableFuture<>();

CompletionAwaitable<String> awaitable = Single.just("1")
.peek(peekFuture::complete)
.whenComplete((s, throwable) -> whenCompleteFuture.complete(s));
Single<String> single = Single.just("1")
.peek(peekFuture::complete);

assertThat("Peek needs to be invoked!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
assertThat("WhenComplete needs to be invoked!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
assertThat("Peek needs to be invoked at first CS method!", peekFuture.isDone(), is(not(true)));

awaitable.await(100, TimeUnit.MILLISECONDS);
single.whenComplete((s, throwable) -> whenCompleteFuture.complete(s));

single.await(100, TimeUnit.MILLISECONDS);

assertThat("Peek needs to be invoked at await!", peekFuture.isDone(), is(true));
assertThat(peekFuture.get(), is(equalTo("1")));
assertThat("WhenComplete needs to be invoked at await!", whenCompleteFuture.isDone(), is(true));
assertThat(whenCompleteFuture.get(), is(equalTo("1")));
}


@Test
void lazyCSConversionCallbackOrderSingle() {
void callbackOrderSingle() {
List<Integer> result = new ArrayList<>();
Consumer<Integer> registerCall = result::add;
AtomicInteger cnt = new AtomicInteger(0);

CompletionAwaitable<String> awaitable = Single.just("2")
.flatMapSingle(Single::just)
.peek(s -> registerCall.accept(1))
.peek(s -> result.add(1))
.map(s -> {
registerCall.accept(2);
result.add(2);
return s;
})
.flatMapSingle(Single::just)
.peek(s -> registerCall.accept(3))
.peek(s -> result.add(3))
.flatMapSingle(Single::just)
.map(s -> {
registerCall.accept(4);
result.add(4);
return s;
})
.flatMapSingle(Single::just)
.whenComplete((s, throwable) -> registerCall.accept(5))
.whenComplete((s, throwable) -> result.add(5))
.thenApply(s -> {
registerCall.accept(6);
result.add(6);
return s;
})
.whenComplete((s, throwable) -> registerCall.accept(7));
.whenComplete((s, throwable) -> result.add(7));

awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);

List<Integer> expected = IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList());

assertThat(result, equalTo(expected));
assertThat(result, equalTo(IntStream.rangeClosed(1, 7).boxed().collect(Collectors.toList())));
}


@Test
void lazyCSConversionCallbackOrderMulti() {
void callbackOrderMulti() {
List<Integer> result = new ArrayList<>();
AtomicInteger cnt = new AtomicInteger(0);
Runnable registerCall = () -> result.add(cnt.incrementAndGet());

CompletionAwaitable<Void> awaitable = Multi.just(1L, 2L, 3L)
CompletionAwaitable<Void> awaitable = Multi.just(1L)
.flatMap(Single::just)
.peek(s -> registerCall.run())
.peek(s -> result.add(1))
.map(s -> {
registerCall.run();
result.add(2);
return s;
})
.flatMap(Single::just)
.peek(s -> registerCall.run())
.peek(s -> result.add(3))
.flatMap(Single::just)
.map(s -> {
registerCall.run();
result.add(4);
return s;
})
.flatMap(Single::just)
.forEach(aLong -> registerCall.run())
.whenComplete((s, throwable) -> registerCall.run())
.forEach(aLong -> result.add(5))
.whenComplete((s, throwable) -> result.add(6))
.thenApply(s -> {
registerCall.run();
result.add(7);
return s;
})
.whenComplete((s, throwable) -> registerCall.run());
.whenComplete((s, throwable) -> result.add(8));

//For each operator triggers stream on its own, we are just waiting for it to finish
awaitable.await(SAFE_WAIT_MILLIS, TimeUnit.MILLISECONDS);
assertThat(result, equalTo(IntStream.rangeClosed(1, cnt.get()).boxed().collect(Collectors.toList())));
assertThat(result, equalTo(IntStream.rangeClosed(1, 8).boxed().collect(Collectors.toList())));
}

@Test
Expand Down