Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of MP FT 2.1.1 using FT SE #2348

Merged
merged 67 commits into from
Sep 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
2a32348
Initial prototype with basic support for fallback, retry and asynchro…
spericas Jul 8, 2020
6b6fd9b
Allow conversion to CompletionStage with completeWithoutValue flag.
spericas Jul 9, 2020
66c4615
Create Single's with nullMeansEmpty flag to handle void methods. Modi…
spericas Jul 9, 2020
dcba7dc
Several improvements for circuit breakers. Handler creation for all F…
spericas Jul 9, 2020
41214c8
Trying new implementation of timeout when running on same thread.
spericas Jul 10, 2020
11280c9
Updates to CommandRunner and ThrowableMapper.
spericas Jul 10, 2020
8792c04
Improvements to better support sync and async timeouts.
spericas Jul 16, 2020
8621fa3
Improved handling of exceptions in async calls.
spericas Jul 16, 2020
afcb9d4
Threshold compared using greater-than instead of greater-or-equal.
spericas Jul 16, 2020
c7d67ae
Changes in this commit:
spericas Jul 20, 2020
6528030
Changes in this commit:
spericas Jul 22, 2020
0ba871d
Changes in this commit:
spericas Jul 23, 2020
1b9a62f
Bulkhead metrics improved.
spericas Jul 24, 2020
d414452
Removed all Hystrix and Failsafe code and dependencies.
spericas Jul 24, 2020
c31b95f
Upgraded to FT 2.1 API. Fixed checkstyle and copyright problems.
spericas Aug 4, 2020
fe91cf3
Fixed used of duration units in Retry.
spericas Aug 4, 2020
54ce27b
Support for request scope propagation for CDI and Jersey. In the FT T…
spericas Aug 5, 2020
f6f1777
Changes to ErrorChecker to implement MP semantics. First checks skipO…
spericas Aug 5, 2020
aeaec54
Handle somewhat unusual case of synchronous method returning a Comple…
spericas Aug 5, 2020
b0929b6
Changes in this commit:
spericas Aug 10, 2020
e73962a
Handle TestNG before and after methods.
spericas Aug 11, 2020
d1af56c
Added toString() method to help debugging.
spericas Aug 11, 2020
ef3f585
Improved implementations of MethodInvoker and TimeoutImpl.
spericas Aug 11, 2020
e00cbce
Async methods cannot throw exceptions. Fixed return type.
spericas Aug 13, 2020
b5abd22
Use FT 2.1.1.
spericas Aug 13, 2020
fbab434
New implementation for async that also simplifies the sync case.
spericas Aug 13, 2020
2d55ff2
Renamed async to currentThread.
spericas Aug 13, 2020
dba5c07
Fixed returned types of async methods in tests.
spericas Aug 14, 2020
56d0b00
Enhanced support for asynchronous methods.
spericas Aug 14, 2020
90e0b6f
Fixed regression in request scope support.
spericas Aug 14, 2020
a59886b
Support for cancellation of Singles created from CompletableFutures a…
spericas Aug 20, 2020
0f41dee
Support for cancellation of async tasks. Update ResultWindow opening …
spericas Aug 20, 2020
a13e077
Use CompletableFuture instead of Future whenever possible. CircuitBre…
spericas Aug 20, 2020
c37a25a
Config property to disable caching of MethodState's. New logic to sup…
spericas Aug 20, 2020
6b5de79
Handle multiple applications with different CCL as required by the TCKs.
spericas Aug 26, 2020
164cc77
Map exception types in applyOn and failOn from MP to SE. This is nece…
spericas Aug 31, 2020
c2dd907
Re-organized FT handlers to run the timeout handler first.
spericas Sep 1, 2020
073e677
Updated semantics for circuit breakers. Thresholds are checked after …
spericas Sep 1, 2020
49abf12
Restored validation of FT annotations.
spericas Sep 1, 2020
b22da53
Fixed problems with fallback and breaker metrics.
spericas Sep 1, 2020
5449921
Bulkhead histograms should only be registered for async methods. Entr…
spericas Sep 2, 2020
d1644aa
New class to use as key into the method states map. Key must include …
spericas Sep 2, 2020
02f1191
Use executor from FT SE after adjusting core sizes. Set queue to 0 fo…
spericas Sep 3, 2020
d857591
Switched composition of handlers for bulkhead and circuit breakers.
spericas Sep 3, 2020
182575c
Ensure proper access to context class loader to access application's …
spericas Sep 3, 2020
cc51f55
Invert timeout and bulkhead handlers. Timeouts also apply to tasks wa…
spericas Sep 8, 2020
b9830c1
On timeout also cancel source single.
spericas Sep 8, 2020
c943a24
New instance of SingleNever for every invocation. It is otherwise pos…
spericas Sep 8, 2020
b51b46c
Clean up tck-suite file.
spericas Sep 8, 2020
8f502f8
Fixed copyright and checkstyle issues.
spericas Sep 8, 2020
be53ab9
Merge branch 'master' into ft-2.
spericas Sep 8, 2020
c79c1f9
Revert "On timeout also cancel source single."
spericas Sep 9, 2020
b72ecc0
Make sure source is cancelled when multi is cancelled.
spericas Sep 9, 2020
0a0e151
Remove old files.
spericas Sep 9, 2020
cfd6fe7
Async flag is no longer needed.
spericas Sep 9, 2020
7de6fbc
Updated comment.
spericas Sep 9, 2020
668d114
Removed unused listener support.
spericas Sep 9, 2020
f0c6cef
More cleanup of unused flags.
spericas Sep 9, 2020
8b7467f
Fixed checkstyle errors.
spericas Sep 9, 2020
5ac8ccc
Fixed Javadoc.
spericas Sep 9, 2020
960897d
Copyright year.
spericas Sep 9, 2020
37cef94
Adjusted breaker window and error ratio based on latest changes.
spericas Sep 9, 2020
06f3669
Use Single returned from onCancel. Minor optimization.
spericas Sep 10, 2020
b6d2e91
Make sure that tests are copied due to mutability.
spericas Sep 10, 2020
24ba047
Merge branch 'master' into ft-2
spericas Sep 10, 2020
06b6c1a
Use latest version of SingleNever after merge with master.
spericas Sep 10, 2020
a01426f
Removed unused import.
spericas Sep 10, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public void subscribe(Flow.Subscriber<? super T> subscriber) {

static <T> void subscribe(Flow.Subscriber<? super T> subscriber, CompletionStage<T> source, boolean nullMeansEmpty) {
AtomicBiConsumer<T> watcher = new AtomicBiConsumer<>();
CompletionStageSubscription<T> css = new CompletionStageSubscription<>(subscriber, nullMeansEmpty, watcher);
CompletionStageSubscription<T> css = new CompletionStageSubscription<>(subscriber, nullMeansEmpty, watcher, source);
watcher.lazySet(css);

subscriber.onSubscribe(css);
Expand All @@ -55,10 +55,13 @@ static final class CompletionStageSubscription<T> extends DeferredScalarSubscrip

private final AtomicBiConsumer<T> watcher;

CompletionStageSubscription(Flow.Subscriber<? super T> downstream, boolean nullMeansEmpty, AtomicBiConsumer<T> watcher) {
private CompletionStage<T> source;
CompletionStageSubscription(Flow.Subscriber<? super T> downstream, boolean nullMeansEmpty,
AtomicBiConsumer<T> watcher, CompletionStage<T> source) {
super(downstream);
this.nullMeansEmpty = nullMeansEmpty;
this.watcher = watcher;
this.source = source;
}

@Override
Expand All @@ -77,6 +80,7 @@ public void accept(T t, Throwable throwable) {
@Override
public void cancel() {
super.cancel();
source.toCompletableFuture().cancel(true);
watcher.getAndSet(null);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -648,8 +648,21 @@ default Single<Optional<T>> toOptionalSingle() {
* @return CompletionStage
*/
default CompletionStage<T> toStage() {
return toStage(false);
}

/**
* Exposes this {@link Single} instance as a {@link CompletionStage}.
* Note that if this {@link Single} completes without a value and {@code completeWithoutValue}
* is set to {@code false}, the resulting {@link CompletionStage} will be completed
* exceptionally with an {@link IllegalStateException}
*
* @param completeWithoutValue Allow completion without a value.
* @return CompletionStage
*/
default CompletionStage<T> toStage(boolean completeWithoutValue) {
try {
SingleToFuture<T> subscriber = new SingleToFuture<>(this, false);
SingleToFuture<T> subscriber = new SingleToFuture<>(this, completeWithoutValue);
this.subscribe(subscriber);
return subscriber;
} catch (Throwable ex) {
Expand Down
2 changes: 1 addition & 1 deletion dependencies/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
<version.lib.microprofile-jwt>1.1.1</version.lib.microprofile-jwt>
<version.lib.microprofile-metrics-api>2.3.2</version.lib.microprofile-metrics-api>
<version.lib.microprofile-openapi-api>1.1.2</version.lib.microprofile-openapi-api>
<version.lib.microprofile-fault-tolerance-api>2.0.2</version.lib.microprofile-fault-tolerance-api>
<version.lib.microprofile-fault-tolerance-api>2.1.1</version.lib.microprofile-fault-tolerance-api>
<version.lib.microprofile-tracing>1.3.3</version.lib.microprofile-tracing>
<version.lib.microprofile-rest-client>1.3.3</version.lib.microprofile-rest-client>
<version.lib.microprofile-reactive-messaging-api>1.0</version.lib.microprofile-reactive-messaging-api>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ public class FtService implements Service {
.name("helidon-example-bulkhead")
.build();
this.breaker = CircuitBreaker.builder()
.volume(10)
.errorRatio(20)
.volume(4)
.errorRatio(40)
.successThreshold(1)
.delay(Duration.ofSeconds(5))
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Supplier;

import io.helidon.common.LazyValue;
Expand All @@ -35,14 +36,16 @@ public <T> Single<T> invoke(Supplier<T> supplier) {
CompletableFuture<T> future = new CompletableFuture<>();
AsyncTask<T> task = new AsyncTask<>(supplier, future);

Future<?> taskFuture;
try {
executor.get().submit(task);
taskFuture = executor.get().submit(task);
} catch (Throwable e) {
// rejected execution and other executor related issues
return Single.error(e);
}

return Single.create(future);
Single<T> single = Single.create(future, true);
return single.onCancel(() -> taskFuture.cancel(false)); // cancel task
}

private static class AsyncTask<T> implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,14 @@ final class AtomicCycle {
this.maxIndex = maxIndex + 1;
}

int get() {
return atomicInteger.get();
}

void set(int n) {
atomicInteger.set(n);
}

int incrementAndGet() {
return atomicInteger.accumulateAndGet(maxIndex, (current, max) -> (current + 1) % max);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,41 @@ String name() {
}
}

interface Stats {

/**
* Number of concurrent executions at this time.
*
* @return concurrent executions.
*/
long concurrentExecutions();

/**
* Number of calls accepted on the bulkhead.
*
* @return calls accepted.
*/
long callsAccepted();

/**
* Number of calls rejected on the bulkhead.
*
* @return calls rejected.
*/
long callsRejected();

/**
* Size of waiting queue at this time.
*
* @return size of waiting queue.
*/
long waitingQueueSize();
}

/**
* Provides access to internal stats for this bulkhead.
*
* @return internal stats.
*/
Stats stats();
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Flow;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.logging.Logger;

Expand All @@ -38,6 +39,10 @@ class BulkheadImpl implements Bulkhead {
private final Semaphore inProgress;
private final String name;

private final AtomicLong concurrentExecutions = new AtomicLong(0L);
private final AtomicLong callsAccepted = new AtomicLong(0L);
private final AtomicLong callsRejected = new AtomicLong(0L);

BulkheadImpl(Bulkhead.Builder builder) {
this.executor = builder.executor();
this.inProgress = new Semaphore(builder.limit(), true);
Expand All @@ -60,29 +65,65 @@ public <T> Multi<T> invokeMulti(Supplier<? extends Flow.Publisher<T>> supplier)
return invokeTask(DelayedTask.createMulti(supplier));
}

@Override
public Stats stats() {
return new Stats() {
@Override
public long concurrentExecutions() {
return concurrentExecutions.get();
}

@Override
public long callsAccepted() {
return callsAccepted.get();
}

@Override
public long callsRejected() {
return callsRejected.get();
}

@Override
public long waitingQueueSize() {
return queue.size();
}
};
}

// this method must be called while NOT holding a permit
private <R> R invokeTask(DelayedTask<R> task) {
if (inProgress.tryAcquire()) {
LOGGER.finest(() -> name + " invoke immediate: " + task);

// free permit, we can invoke
execute(task);
return task.result();
} else {
// no free permit, let's try to enqueue
if (queue.offer(task)) {
LOGGER.finest(() -> name + " enqueue: " + task);
return task.result();
R result = task.result();
if (result instanceof Single<?>) {
Single<Object> single = (Single<Object>) result;
return (R) single.onCancel(() -> queue.remove(task));
}
return result;
} else {
LOGGER.finest(() -> name + " reject: " + task);
callsRejected.incrementAndGet();
return task.error(new BulkheadException("Bulkhead queue \"" + name + "\" is full"));
}
}
}

// this method must be called while holding a permit
private void execute(DelayedTask<?> task) {
callsAccepted.incrementAndGet();
concurrentExecutions.incrementAndGet();

task.execute()
.handle((it, throwable) -> {
concurrentExecutions.decrementAndGet();
// we do not care about execution, but let's record it in debug
LOGGER.finest(() -> name + " finished execution: " + task
+ " (" + (throwable == null ? "success" : "failure") + ")");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,18 @@ private <U> U invokeTask(DelayedTask<U> task) {
if (state.get() == State.CLOSED) {
// run it!
CompletionStage<Void> completion = task.execute();

completion.handle((it, throwable) -> {
Throwable exception = FaultTolerance.cause(throwable);
if (exception == null || errorChecker.shouldSkip(exception)) {
// success
results.update(SUCCESS);
} else {
results.update(FAILURE);
if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
}

if (results.shouldOpen() && state.compareAndSet(State.CLOSED, State.OPEN)) {
results.reset();
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}
return it;
});
return task.result();
Expand All @@ -111,18 +108,15 @@ private <U> U invokeTask(DelayedTask<U> task) {
// transition to closed
successCounter.set(0);
state.compareAndSet(State.HALF_OPEN, State.CLOSED);
halfOpenInProgress.set(false);
}
halfOpenInProgress.set(false);
} else {
// failure
successCounter.set(0);
state.set(State.OPEN);
halfOpenInProgress.set(false);
// if we successfully switch to open, we need to schedule switch to half-open
scheduleHalf();
}

halfOpenInProgress.set(false);
return it;
});
return task.result();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public CompletionStage<Void> execute() {

@Override
public Single<T> result() {
return Single.create(resultFuture.get());
return Single.create(resultFuture.get(), true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,24 @@
interface ErrorChecker {
boolean shouldSkip(Throwable throwable);

/**
* Returns ErrorChecker that skips if throwable is in skipOnSet or if applyOnSet
* is not empty and throwable is not in it. Note that if applyOnSet is empty, then
* it is equivalent to it containing {@code Throwable.class}. Sets are copied
* because they are mutable.
*
* @param skipOnSet set of throwables to skip logic on.
* @param applyOnSet set of throwables to apply logic on.
* @return An error checker.
*/
static ErrorChecker create(Set<Class<? extends Throwable>> skipOnSet, Set<Class<? extends Throwable>> applyOnSet) {
Set<Class<? extends Throwable>> skipOn = Set.copyOf(skipOnSet);
Set<Class<? extends Throwable>> applyOn = Set.copyOf(applyOnSet);
return throwable -> containsThrowable(skipOn, throwable)
|| !applyOn.isEmpty() && !containsThrowable(applyOn, throwable);
}

if (skipOn.isEmpty()) {
if (applyOn.isEmpty()) {
return throwable -> false;
} else {
return throwable -> !applyOn.contains(throwable.getClass());
}
} else {
if (applyOn.isEmpty()) {
return throwable -> skipOn.contains(throwable.getClass());
} else {
throw new IllegalArgumentException("You have defined both skip and apply set of exception classes. "
+ "This cannot be correctly handled; skipOn: " + skipOn
+ " applyOn: " + applyOn);
}

}
private static boolean containsThrowable(Set<Class<? extends Throwable>> set, Throwable throwable) {
return set.stream().anyMatch(t -> t.isAssignableFrom(throwable.getClass()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,6 @@ public Single<T> invoke(Supplier<? extends CompletionStage<T>> supplier) {
return null;
});

return Single.create(future);
return Single.create(future, true);
}
}