Skip to content

Commit

Permalink
Implementation of MP FT 2.1.1 using FT SE (#2348)
Browse files Browse the repository at this point in the history
* Replacing FailSafe and Hystrix by our own implementation of FT primitives. Some minor changes to our first version of these primitive operations was necessary to be fully compatible with MP and pass all the TCKs.

Signed-off-by: Santiago Pericasgeertsen <santiago.pericasgeertsen@oracle.com>
  • Loading branch information
spericas committed Sep 14, 2020
1 parent ec0a126 commit 74956be
Show file tree
Hide file tree
Showing 63 changed files with 1,699 additions and 2,814 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {

static <T> void subscribe(Flow.Subscriber<? super T> subscriber, CompletionStage<T> source, boolean nullMeansEmpty) {
AtomicBiConsumer<T> watcher = new AtomicBiConsumer<>();
CompletionStageSubscription<T> css = new CompletionStageSubscription<>(subscriber, nullMeansEmpty, watcher);
CompletionStageSubscription<T> css = new CompletionStageSubscription<>(subscriber, nullMeansEmpty, watcher, source);
watcher.lazySet(css);

subscriber.onSubscribe(css);
Expand All @@ -55,10 +55,13 @@ static final class CompletionStageSubscription<T> extends DeferredScalarSubscrip

private final AtomicBiConsumer<T> watcher;

CompletionStageSubscription(Flow.Subscriber<? super T> downstream, boolean nullMeansEmpty, AtomicBiConsumer<T> watcher) {
private CompletionStage<T> source;
CompletionStageSubscription(Flow.Subscriber<? super T> downstream, boolean nullMeansEmpty,
AtomicBiConsumer<T> watcher, CompletionStage<T> source) {
super(downstream);
this.nullMeansEmpty = nullMeansEmpty;
this.watcher = watcher;
this.source = source;
}

@Override
Expand All @@ -77,6 +80,7 @@ public void accept(T t, Throwable throwable) {
@Override
public void cancel() {
super.cancel();
source.toCompletableFuture().cancel(true);
watcher.getAndSet(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,21 @@ default Single<Optional<T>> toOptionalSingle() {
* @return CompletionStage
*/
default CompletionStage<T> toStage() {
return toStage(false);
}

/**
* Exposes this {@link Single} instance as a {@link CompletionStage}.
* Note that if this {@link Single} completes without a value and {@code completeWithoutValue}
* is set to {@code false}, the resulting {@link CompletionStage} will be completed
* exceptionally with an {@link IllegalStateException}
*
* @param completeWithoutValue Allow completion without a value.
* @return CompletionStage
*/
default CompletionStage<T> toStage(boolean completeWithoutValue) {
try {
SingleToFuture<T> subscriber = new SingleToFuture<>(this, false);
SingleToFuture<T> subscriber = new SingleToFuture<>(this, completeWithoutValue);
this.subscribe(subscriber);
return subscriber;
} catch (Throwable ex) {
Expand Down
2 changes: 1 addition & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<version.lib.microprofile-jwt>1.1.1</version.lib.microprofile-jwt>
<version.lib.microprofile-metrics-api>2.3.2</version.lib.microprofile-metrics-api>
<version.lib.microprofile-openapi-api>1.1.2</version.lib.microprofile-openapi-api>
<version.lib.microprofile-fault-tolerance-api>2.0.2</version.lib.microprofile-fault-tolerance-api>
<version.lib.microprofile-fault-tolerance-api>2.1.1</version.lib.microprofile-fault-tolerance-api>
<version.lib.microprofile-tracing>1.3.3</version.lib.microprofile-tracing>
<version.lib.microprofile-rest-client>1.3.3</version.lib.microprofile-rest-client>
<version.lib.microprofile-reactive-messaging-api>1.0</version.lib.microprofile-reactive-messaging-api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class FtService implements Service {
.name("helidon-example-bulkhead")
.build();
this.breaker = CircuitBreaker.builder()
.volume(10)
.errorRatio(20)
.volume(4)
.errorRatio(40)
.successThreshold(1)
.delay(Duration.ofSeconds(5))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import io.helidon.common.LazyValue;
Expand All @@ -35,14 +36,16 @@ public <T> Single<T> invoke(Supplier<T> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
AsyncTask<T> task = new AsyncTask<>(supplier, future);

Future<?> taskFuture;
try {
executor.get().submit(task);
taskFuture = executor.get().submit(task);
} catch (Throwable e) {
// rejected execution and other executor related issues
return Single.error(e);
}

return Single.create(future);
Single<T> single = Single.create(future, true);
return single.onCancel(() -> taskFuture.cancel(false)); // cancel task
}

private static class AsyncTask<T> implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ final class AtomicCycle {
this.maxIndex = maxIndex + 1;
}

int get() {
return atomicInteger.get();
}

void set(int n) {
atomicInteger.set(n);
}

int incrementAndGet() {
return atomicInteger.accumulateAndGet(maxIndex, (current, max) -> (current + 1) % max);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,41 @@ String name() {
}
}

interface Stats {

/**
* Number of concurrent executions at this time.
*
* @return concurrent executions.
*/
long concurrentExecutions();

/**
* Number of calls accepted on the bulkhead.
*
* @return calls accepted.
*/
long callsAccepted();

/**
* Number of calls rejected on the bulkhead.
*
* @return calls rejected.
*/
long callsRejected();

/**
* Size of waiting queue at this time.
*
* @return size of waiting queue.
*/
long waitingQueueSize();
}

/**
* Provides access to internal stats for this bulkhead.
*
* @return internal stats.
*/
Stats stats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Logger;

Expand All @@ -38,6 +39,10 @@ class BulkheadImpl implements Bulkhead {
private final Semaphore inProgress;
private final String name;

private final AtomicLong concurrentExecutions = new AtomicLong(0L);
private final AtomicLong callsAccepted = new AtomicLong(0L);
private final AtomicLong callsRejected = new AtomicLong(0L);

BulkheadImpl(Bulkhead.Builder builder) {
this.executor = builder.executor();
this.inProgress = new Semaphore(builder.limit(), true);
Expand All @@ -60,29 +65,65 @@ public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier)
return invokeTask(DelayedTask.createMulti(supplier));
}

@Override
public Stats stats() {
return new Stats() {
@Override
public long concurrentExecutions() {
return concurrentExecutions.get();
}

@Override
public long callsAccepted() {
return callsAccepted.get();
}

@Override
public long callsRejected() {
return callsRejected.get();
}

@Override
public long waitingQueueSize() {
return queue.size();
}
};
}

// this method must be called while NOT holding a permit
private <R> R invokeTask(DelayedTask<R> task) {
if (inProgress.tryAcquire()) {
LOGGER.finest(() -> name + " invoke immediate: " + task);

// free permit, we can invoke
execute(task);
return task.result();
} else {
// no free permit, let's try to enqueue
if (queue.offer(task)) {
LOGGER.finest(() -> name + " enqueue: " + task);
return task.result();
R result = task.result();
if (result instanceof Single<?>) {
Single<Object> single = (Single<Object>) result;
return (R) single.onCancel(() -> queue.remove(task));
}
return result;
} else {
LOGGER.finest(() -> name + " reject: " + task);
callsRejected.incrementAndGet();
return task.error(new BulkheadException("Bulkhead queue \"" + name + "\" is full"));
}
}
}

// this method must be called while holding a permit
private void execute(DelayedTask<?> task) {
callsAccepted.incrementAndGet();
concurrentExecutions.incrementAndGet();

task.execute()
.handle((it, throwable) -> {
concurrentExecutions.decrementAndGet();
// we do not care about execution, but let's record it in debug
LOGGER.finest(() -> name + " finished execution: " + task
+ " (" + (throwable == null ? "success" : "failure") + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,18 @@ private <U> U invokeTask(DelayedTask<U> task) {
if (state.get() == State.CLOSED) {
// run it!
CompletionStage<Void> completion = task.execute();

completion.handle((it, throwable) -> {
Throwable exception = FaultTolerance.cause(throwable);
if (exception == null || errorChecker.shouldSkip(exception)) {
// success
results.update(SUCCESS);
} else {
results.update(FAILURE);
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
}

if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
return it;
});
return task.result();
Expand All @@ -111,18 +108,15 @@ private <U> U invokeTask(DelayedTask<U> task) {
// transition to closed
successCounter.set(0);
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
halfOpenInProgress.set(false);
}
halfOpenInProgress.set(false);
} else {
// failure
successCounter.set(0);
state.set(State.OPEN);
halfOpenInProgress.set(false);
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}

halfOpenInProgress.set(false);
return it;
});
return task.result();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public CompletionStage<Void> execute() {

@Override
public Single<T> result() {
return Single.create(resultFuture.get());
return Single.create(resultFuture.get(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@
interface ErrorChecker {
boolean shouldSkip(Throwable throwable);

/**
* Returns ErrorChecker that skips if throwable is in skipOnSet or if applyOnSet
* is not empty and throwable is not in it. Note that if applyOnSet is empty, then
* it is equivalent to it containing {@code Throwable.class}. Sets are copied
* because they are mutable.
*
* @param skipOnSet set of throwables to skip logic on.
* @param applyOnSet set of throwables to apply logic on.
* @return An error checker.
*/
static ErrorChecker create(Set<Class<? extends Throwable>> skipOnSet, Set<Class<? extends Throwable>> applyOnSet) {
Set<Class<? extends Throwable>> skipOn = Set.copyOf(skipOnSet);
Set<Class<? extends Throwable>> applyOn = Set.copyOf(applyOnSet);
return throwable -> containsThrowable(skipOn, throwable)
|| !applyOn.isEmpty() && !containsThrowable(applyOn, throwable);
}

if (skipOn.isEmpty()) {
if (applyOn.isEmpty()) {
return throwable -> false;
} else {
return throwable -> !applyOn.contains(throwable.getClass());
}
} else {
if (applyOn.isEmpty()) {
return throwable -> skipOn.contains(throwable.getClass());
} else {
throw new IllegalArgumentException("You have defined both skip and apply set of exception classes. "
+ "This cannot be correctly handled; skipOn: " + skipOn
+ " applyOn: " + applyOn);
}

}
private static boolean containsThrowable(Set<Class<? extends Throwable>> set, Throwable throwable) {
return set.stream().anyMatch(t -> t.isAssignableFrom(throwable.getClass()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return null;
});

return Single.create(future);
return Single.create(future, true);
}
}
Loading

0 comments on commit 74956be

Please sign in to comment.