Skip to content
Permalink
Browse files
more mp config handling + metrics support + enhancing of future support
  • Loading branch information
rmannibucau committed Nov 27, 2018
1 parent 4754f4c commit 25df118e2b5862a6f6f54ed3f749e7690c2d5c35
Showing 18 changed files with 619 additions and 187 deletions.
@@ -36,13 +36,8 @@ Apache Safeguard is currently in development; however a 1.0 release was created
```xml
<dependencies>
<dependency>
<artifactId>safeguard-api</artifactId>
<groupId>org.apache.geronimo.safeguard</groupId>
<version>1.0</version>
</dependency>
<dependency>
<artifactId>safeguard-impl</artifactId>
<groupId>org.apache.geronimo.safeguard</groupId>
**<artifactId>safeguard-impl</artifactId>**
<version>1.0</version>
</dependency>
</dependencies>
@@ -52,27 +47,15 @@ Apache Safeguard implements the [MicroProfile Fault Tolerance v1.0 specification

### Integration

The core of Safeguard is wrapped around an `ExecutionManager` which takes care of coordinating and storing the execution state of various methods. It allows some configurability, but if you want to change it your best solution is to create an alternative of `ExecutionManager` with your customizations. For instance, in an EE environment you may want to use a `ManagedScheduledExecutorService` which could be done:
For `@Asynchronous` executor customization you can use:

```java
@ApplicationScoped
@Specializes
@Priority(100)
public class MyExecutionManagerProvider extends FailsafeExecutionManagerProvider{
public class MyExecutionManagerProvider {
@Resource
private ManagedScheduledExecutorService executorService;
@Produces
@ApplicationScoped
public ExecutionManager createExecutionManager() {
FailsafeCircuitBreakerManager circuitBreakerManager = new FailsafeCircuitBreakerManager();
FailsafeRetryManager retryManager = new FailsafeRetryManager();
BulkheadManagerImpl bulkheadManager = new BulkheadManagerImpl();
DefaultExecutorServiceProvider executorServiceProvider = new DefaultExecutorServiceProvider(executorService);
ExecutionPlanFactory executionPlanFactory = new ExecutionPlanFactory(circuitBreakerManager, retryManager, bulkheadManager, mapper,
executorServiceProvider);
return FailsafeExecutionManager(MicroprofileAnnotationMapper.getInstance(), bulkheadManager, circuitBreakerManager,
retryManager, executionPlanFactory, executorServiceProvider);
}
@Safeguard
private ManagedScheduledExecutorService executor;
}
@@ -75,7 +75,7 @@
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<microprofile-fault-tolerance.version>1.1.3</microprofile-fault-tolerance.version>
<owb.version>2.0.8</owb.version>
<owb.version>2.0.9-SNAPSHOT</owb.version>
<arquillian.version>1.1.14.Final</arquillian.version>
<arquillian-weld-embedded.version>2.0.0.Final</arquillian-weld-embedded.version>
<cdi2-api.version>2.0</cdi2-api.version>
@@ -34,7 +34,11 @@ public class AnnotationFinder {
private BeanManager manager;

public <T extends Annotation> T findAnnotation(final Class<T> type, final InvocationContext context) {
Class<?> target = context.getTarget().getClass();
// normally we should use target but validations require the fallback
final Class<?> targetClass = ofNullable(context.getTarget())
.map(Object::getClass)
.orElseGet(() -> Class.class.cast(type));
Class<?> target = targetClass;
while (target.getName().contains("$$")) {
target = target.getSuperclass();
}
@@ -44,6 +48,6 @@ public <T extends Annotation> T findAnnotation(final Class<T> type, final Invoca
.findFirst()
.map(m -> m.getAnnotation(type))
.orElseGet(() -> ofNullable(context.getMethod().getAnnotation(type))
.orElseGet(() -> context.getTarget().getClass().getAnnotation(type)));
.orElseGet(() -> targetClass.getAnnotation(type)));
}
}
@@ -18,32 +18,75 @@
*/
package org.apache.safeguard.impl.asynchronous;

import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

import javax.annotation.Priority;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.interceptor.AroundInvoke;
import javax.interceptor.Interceptor;
import javax.interceptor.InvocationContext;

import org.apache.safeguard.impl.config.ConfigurationMapper;
import org.apache.safeguard.impl.customizable.Safeguard;
import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
import org.eclipse.microprofile.faulttolerance.Asynchronous;

@Interceptor
@Asynchronous
@Priority(Interceptor.Priority.PLATFORM_AFTER + 1)
@Priority(Interceptor.Priority.PLATFORM_AFTER + 6)
public class AsynchronousInterceptor extends BaseAsynchronousInterceptor {
@Inject
@Safeguard
private Executor executor;
private Cache cache;

@Override
protected Executor getExecutor(final InvocationContext context) {
return executor;
return cache.getExecutor();
}

@AroundInvoke
public Object async(final InvocationContext context) throws Exception {
final Map<Method, Boolean> models = cache.getEnabled();
Boolean enabled = models.get(context.getMethod());
if (enabled == null) {
enabled = cache.getMapper().isEnabled(context.getMethod(), Asynchronous.class);
models.putIfAbsent(context.getMethod(), enabled);
}
if (!enabled) {
return context.proceed();
}
final String key = Asynchronous.class.getName() + ".skip_" +
context.getContextData().get(IdGeneratorInterceptor.class.getName());
if (context.getContextData().putIfAbsent(key, Boolean.TRUE) != null) { // bulkhead or so handling threading
return context.proceed();
}
return around(context);
}

@ApplicationScoped
public static class Cache {
private final Map<Method, Boolean> enabled = new ConcurrentHashMap<>();

@Inject
@Safeguard
private Executor executor;

@Inject
private ConfigurationMapper mapper;

public ConfigurationMapper getMapper() {
return mapper;
}

public Executor getExecutor() {
return executor;
}

public Map<Method, Boolean> getEnabled() {
return enabled;
}
}
}
@@ -18,9 +18,11 @@
*/
package org.apache.safeguard.impl.asynchronous;

import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import java.io.Serializable;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
@@ -35,21 +37,12 @@
import javax.interceptor.InvocationContext;

import org.apache.safeguard.impl.interceptor.IdGeneratorInterceptor;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.exceptions.FaultToleranceDefinitionException;

public abstract class BaseAsynchronousInterceptor implements Serializable {
protected abstract Executor getExecutor(InvocationContext context);

protected Object around(final InvocationContext context) throws Exception {
final String key = Asynchronous.class.getName() +
context.getContextData().get(IdGeneratorInterceptor.class.getName());
if (context.getContextData().get(key) != null) { // bulkhead or so handling threading
return context.proceed();
}

context.getContextData().put(key, "true");

protected Object around(final InvocationContext context) {
final Class<?> returnType = context.getMethod().getReturnType();
if (CompletionStage.class.isAssignableFrom(returnType)) {
final ExtendedCompletableFuture<Object> future = newCompletableFuture(context);
@@ -61,7 +54,20 @@ protected Object around(final InvocationContext context) throws Exception {
stage.handle((r, e) -> {
future.after();
if (e != null) {
future.completeExceptionally(e);
ofNullable(getErrorHandler(context.getContextData()))
.map(eh -> {
if (Exception.class.isInstance(e)) {
try {
eh.apply(Exception.class.cast(e));
} catch (final Exception e1) {
future.completeExceptionally(e1);
}
} else {
future.completeExceptionally(e);
}
return true;
})
.orElseGet(() -> future.completeExceptionally(e));
} else {
future.complete(r);
}
@@ -74,7 +80,7 @@ protected Object around(final InvocationContext context) throws Exception {
return future;
}
if (Future.class.isAssignableFrom(returnType)) {
final FutureWrapper<Object> facade = newFuture(context);
final FutureWrapper<Object> facade = newFuture(context, context.getContextData());
getExecutor(context).execute(() -> {
final Object proceed;
try {
@@ -94,38 +100,55 @@ protected Object around(final InvocationContext context) throws Exception {
"Should be Future or CompletionStage.");
}

protected FutureWrapper<Object> newFuture(final InvocationContext context) {
return new FutureWrapper<>();
private static ErrorHandler<Exception, Future<?>> getErrorHandler(final Map<String, Object> contextData) {
return ErrorHandler.class.cast(
contextData.get(BaseAsynchronousInterceptor.BaseFuture.class.getName() + ".errorHandler_" +
contextData.get(IdGeneratorInterceptor.class.getName())));
}

protected FutureWrapper<Object> newFuture(final InvocationContext context,
final Map<String, Object> data) {
return new FutureWrapper<>(data);
}

protected ExtendedCompletableFuture<Object> newCompletableFuture(final InvocationContext context) {
return new ExtendedCompletableFuture<>();
}

public static class ExtendedCompletableFuture<T> extends CompletableFuture<T> {
public void before() {
// no-op
@FunctionalInterface
public interface ErrorHandler<A, B> {
B apply(A a) throws Exception;
}

public interface BaseFuture {
default void before() {

}

public void after() {
// no-op
default void after() {

}
}

public static class FutureWrapper<T> implements Future<T> {
public static class ExtendedCompletableFuture<T> extends CompletableFuture<T> implements BaseFuture {
}

public static class FutureWrapper<T> implements Future<T>, BaseFuture {
private final AtomicReference<Future<T>> delegate = new AtomicReference<>();
private final AtomicReference<Consumer<Future<T>>> cancelled = new AtomicReference<>();
private final CountDownLatch latch = new CountDownLatch(1);
private final Map<String, Object> data;

public void before() {
// no-op
public FutureWrapper(final Map<String, Object> data) {
this.data = data;
}

public void setDelegate(final Future<T> delegate) {
final Consumer<Future<T>> cancelledTask = cancelled.get();
if (cancelledTask != null) {
cancelledTask.accept(delegate);
}
after();
this.delegate.set(delegate);
this.latch.countDown();
}
@@ -150,7 +173,14 @@ public boolean isDone() {
@Override
public T get() throws InterruptedException, ExecutionException {
latch.await();
return delegate.get().get();
final Future<T> future = delegate.get();
try {
return future.get();
} catch (final ExecutionException ee) {
final Future<T> newFuture = onException(ee);
delegate.set(newFuture);
return newFuture.get();
}
}

@Override
@@ -161,7 +191,50 @@ public T get(final long timeout, final TimeUnit unit) throws InterruptedExceptio
if (!latchWait) {
throw new TimeoutException();
}
return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
try {
return delegate.get().get(unit.toNanos(timeout) - latchWaitDuration, NANOSECONDS);
} catch (final ExecutionException ee) {
delegate.set(onException(ee));
final long duration = unit.toNanos(timeout) - (System.nanoTime() - latchWaitDuration);
if (duration < 0) {
throw new TimeoutException();
}
return delegate.get().get(duration, NANOSECONDS);
}
}

protected Future<T> onException(final Throwable cause) throws ExecutionException {
if (!Exception.class.isInstance(cause)) {
if (Error.class.isInstance(cause)) {
throw Error.class.cast(cause);
}
throw new IllegalStateException(cause);
}
final Exception ex = Exception.class.cast(cause);
final ErrorHandler<Exception, Future<?>> handler = getErrorHandler(data);
if (handler != null) {
try {
return (Future<T>) handler.apply(ex);
} catch (final Exception e) {
if (ExecutionException.class.isInstance(e)) {
throw ExecutionException.class.cast(e);
}
if (RuntimeException.class.isInstance(e)) {
throw RuntimeException.class.cast(e);
}
if (Error.class.isInstance(e)) {
throw Error.class.cast(e);
}
throw new IllegalStateException(e);
}
}
if (ExecutionException.class.isInstance(cause)) {
throw ExecutionException.class.cast(cause);
}
if (RuntimeException.class.isInstance(cause)) {
throw RuntimeException.class.cast(cause);
}
throw new IllegalStateException(cause); // unreachable - just for compiler
}
}
}

0 comments on commit 25df118

Please sign in to comment.