Skip to content
Permalink
Browse files
GERONIMO-6591 - Adding fallback support.
  • Loading branch information
johnament committed Oct 5, 2017
1 parent adee0a9 commit 0f811c373a782d40f87da7fc736289b80b7f5f12
Showing 12 changed files with 178 additions and 33 deletions.
@@ -44,20 +44,20 @@ public FailsafeExecutionManager() {

public Object execute(InvocationContext invocationContext) {
Method method = invocationContext.getMethod();
return executionPlanFactory.locateExecutionPlan(method).execute(invocationContext::proceed);
return executionPlanFactory.locateExecutionPlan(method).execute(invocationContext::proceed, invocationContext);
}

@Override
public <T> T execute(String name, Callable<T> callable) {
return executionPlanFactory.locateExecutionPlan(name, null, false).execute(callable);
return executionPlanFactory.locateExecutionPlan(name, null, false).execute(callable, null);
}

public <T> T executeAsync(String name, Callable<T> callable) {
return executionPlanFactory.locateExecutionPlan(name, null, true).execute(callable);
return executionPlanFactory.locateExecutionPlan(name, null, true).execute(callable, null);
}

public <T> T executeAsync(String name, Callable<T> callable, Duration timeout) {
return executionPlanFactory.locateExecutionPlan(name, timeout, true).execute(callable);
return executionPlanFactory.locateExecutionPlan(name, timeout, true).execute(callable, null);
}

public ExecutionPlanFactory getExecutionPlanFactory() {
@@ -22,31 +22,32 @@
import net.jodah.failsafe.AsyncFailsafe;
import net.jodah.failsafe.CircuitBreakerOpenException;
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreaker;
import org.apache.safeguard.impl.fallback.FallbackRunner;
import org.apache.safeguard.impl.retry.FailsafeRetryDefinition;

import javax.interceptor.InvocationContext;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AsyncFailsafeExecutionPlan extends SyncFailsafeExecutionPlan {
private final ScheduledExecutorService executorService;
private final Duration timeout;

public AsyncFailsafeExecutionPlan(FailsafeRetryDefinition retryDefinition,
FailsafeCircuitBreaker failsafeCircuitBreaker,
FallbackRunner fallback,
ScheduledExecutorService executorService,
Duration timeout) {
super(retryDefinition, failsafeCircuitBreaker);
super(retryDefinition, failsafeCircuitBreaker, fallback);
this.executorService = executorService;
this.timeout = timeout;
}

@Override
public <T> T execute(Callable<T> callable) {
AsyncFailsafe<?> asyncFailsafe = getSyncFailsafe().with(executorService);
public <T> T execute(Callable<T> callable, InvocationContext invocationContext) {
AsyncFailsafe<?> asyncFailsafe = getSyncFailsafe(invocationContext).with(executorService);
try {
if (this.timeout == null) {
return asyncFailsafe.get(callable).get();
@@ -21,6 +21,7 @@

import org.apache.safeguard.exception.AsyncException;

import javax.interceptor.InvocationContext;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -34,7 +35,7 @@ class AsyncOnlyExecutionPlan implements ExecutionPlan {
}

@Override
public <T> T execute(Callable<T> callable) {
public <T> T execute(Callable<T> callable, InvocationContext invocationContext) {
Future<T> submitted = executorService.submit(callable);
try {
return submitted.get();
@@ -19,6 +19,7 @@

package org.apache.safeguard.impl.executionPlans;

import javax.interceptor.InvocationContext;
import java.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -37,7 +38,7 @@ class AsyncTimeoutExecutionPlan implements ExecutionPlan {
}

@Override
public <T> T execute(Callable<T> callable) {
public <T> T execute(Callable<T> callable, InvocationContext invocationContext) {
Future<T> future = executorService.submit(callable);
try {
return future.get(timeout.toMillis(), TimeUnit.MILLISECONDS);
@@ -19,8 +19,9 @@

package org.apache.safeguard.impl.executionPlans;

import javax.interceptor.InvocationContext;
import java.util.concurrent.Callable;

public interface ExecutionPlan {
<T> T execute(Callable<T> callable);
<T> T execute(Callable<T> function, InvocationContext invocationContext);
}
@@ -22,20 +22,23 @@
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreaker;
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreakerBuilder;
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreakerManager;
import org.apache.safeguard.impl.fallback.FallbackRunner;
import org.apache.safeguard.impl.retry.FailsafeRetryBuilder;
import org.apache.safeguard.impl.retry.FailsafeRetryDefinition;
import org.apache.safeguard.impl.retry.FailsafeRetryManager;
import org.apache.safeguard.impl.util.AnnotationUtil;
import org.apache.safeguard.impl.util.NamingUtil;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
import org.eclipse.microprofile.faulttolerance.CircuitBreaker;
import org.eclipse.microprofile.faulttolerance.Fallback;
import org.eclipse.microprofile.faulttolerance.Retry;
import org.eclipse.microprofile.faulttolerance.Timeout;

import java.lang.reflect.Method;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import static org.apache.safeguard.impl.executionPlans.MicroprofileAnnotationMapper.mapCircuitBreaker;
@@ -58,7 +61,7 @@ public ExecutionPlan locateExecutionPlan(String name, Duration timeout, boolean
if (circuitBreaker == null && retryDefinition == null) {
return null;
} else {
return new SyncFailsafeExecutionPlan(retryDefinition, circuitBreaker);
return new SyncFailsafeExecutionPlan(retryDefinition, circuitBreaker, null);
}
});
}
@@ -76,6 +79,7 @@ public ExecutionPlan locateExecutionPlan(Method method) {
}
boolean isAsync = isAsync(method);
Duration timeout = readTimeout(method);
FallbackRunner fallbackRunner = this.createFallback(method);
if(circuitBreaker == null && retryDefinition == null && isAsync) {
if(timeout == null) {
return new AsyncOnlyExecutionPlan(null);
@@ -90,9 +94,9 @@ else if(circuitBreaker == null && retryDefinition == null && timeout != null) {
}
else {
if (isAsync || timeout != null) {
return new AsyncFailsafeExecutionPlan(retryDefinition, circuitBreaker, Executors.newScheduledThreadPool(5), timeout);
return new AsyncFailsafeExecutionPlan(retryDefinition, circuitBreaker, fallbackRunner, Executors.newScheduledThreadPool(5), timeout);
} else {
return new SyncFailsafeExecutionPlan(retryDefinition, circuitBreaker);
return new SyncFailsafeExecutionPlan(retryDefinition, circuitBreaker, fallbackRunner);
}
}
});
@@ -116,6 +120,15 @@ private FailsafeCircuitBreaker createCBDefinition(String name, Method method) {
return new FailsafeCircuitBreaker(mapCircuitBreaker(circuitBreaker, circuitBreakerBuilder));
}

private FallbackRunner createFallback(Method method) {
Fallback fallback = AnnotationUtil.getAnnotation(method, Fallback.class);
if(fallback == null) {
return null;
}
String methodName = "".equals(fallback.fallbackMethod()) ? null : fallback.fallbackMethod();
return new FallbackRunner(fallback.value(), methodName);
}

private boolean isAsync(Method method) {
return AnnotationUtil.getAnnotation(method, Asynchronous.class) != null;
}
@@ -23,17 +23,22 @@
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.SyncFailsafe;
import org.apache.safeguard.impl.circuitbreaker.FailsafeCircuitBreaker;
import org.apache.safeguard.impl.fallback.FallbackRunner;
import org.apache.safeguard.impl.retry.FailsafeRetryDefinition;

import javax.interceptor.InvocationContext;
import java.util.concurrent.Callable;
import java.util.function.Function;

public class SyncFailsafeExecutionPlan implements ExecutionPlan {
private final FailsafeRetryDefinition retryDefinition;
private final FailsafeCircuitBreaker failsafeCircuitBreaker;
private final FallbackRunner fallback;

SyncFailsafeExecutionPlan(FailsafeRetryDefinition retryDefinition, FailsafeCircuitBreaker failsafeCircuitBreaker) {
SyncFailsafeExecutionPlan(FailsafeRetryDefinition retryDefinition, FailsafeCircuitBreaker failsafeCircuitBreaker, FallbackRunner fallback) {
this.retryDefinition = retryDefinition;
this.failsafeCircuitBreaker = failsafeCircuitBreaker;
this.fallback = fallback;
validateConfig();
}

@@ -44,17 +49,18 @@ private void validateConfig() {
}

@Override
public <T> T execute(Callable<T> callable) {
SyncFailsafe<?> syncFailsafe = getSyncFailsafe();
public <T> T execute(Callable<T> callable, InvocationContext invocationContext) {
SyncFailsafe<?> syncFailsafe = getSyncFailsafe(invocationContext);
try {
return syncFailsafe.get(callable);
} catch (CircuitBreakerOpenException e) {
throw new org.eclipse.microprofile.faulttolerance.exceptions.CircuitBreakerOpenException(e);
}
}

SyncFailsafe<?> getSyncFailsafe() {
SyncFailsafe<?> getSyncFailsafe(InvocationContext invocationContext) {
SyncFailsafe<?> syncFailsafe;
Callable callable = () -> fallback.executeFallback(invocationContext);
if(retryDefinition == null) {
syncFailsafe = Failsafe.with(failsafeCircuitBreaker.getDefinition().getCircuitBreaker());
}
@@ -67,6 +73,9 @@ SyncFailsafe<?> getSyncFailsafe() {
.with(failsafeCircuitBreaker.getDefinition().getCircuitBreaker());
}
}
if(this.fallback != null) {
syncFailsafe = syncFailsafe.withFallback(callable);
}
return syncFailsafe;
}
}
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.safeguard.impl.fallback;

import org.eclipse.microprofile.faulttolerance.ExecutionContext;
import org.eclipse.microprofile.faulttolerance.FallbackHandler;

import javax.enterprise.inject.spi.CDI;
import javax.interceptor.InvocationContext;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;

public class FallbackRunner {
private final Class<? extends FallbackHandler<?>> handlerClass;
private final String method;

public FallbackRunner(Class<? extends FallbackHandler<?>> handlerClass, String method) {
this.handlerClass = handlerClass;
this.method = method;
}

public Object executeFallback(InvocationContext invocationContext) {
if(method != null) {
try {
Method method = getMethod(invocationContext.getTarget().getClass());
Parameter[] parameters = method.getParameters();
if(parameters.length == 0) {
return method.invoke(invocationContext.getTarget());
}
else {
return method.invoke(invocationContext.getTarget(), invocationContext.getParameters());
}
} catch (Exception e) {
throw new IllegalArgumentException(e);
}
}
else {
SafeguardExecutionContext executionContext = new SafeguardExecutionContext(invocationContext.getMethod(),
invocationContext.getParameters());
CDI<Object> cdi = CDI.current();
FallbackHandler fallbackHandler = null;
try {
fallbackHandler = cdi.select(handlerClass).get();
}
catch (Exception e) {
try {
fallbackHandler = handlerClass.newInstance();
} catch (InstantiationException | IllegalAccessException e1) {
throw new IllegalArgumentException(e);
}
}
return fallbackHandler.handle(executionContext);
}
}

private Method getMethod(Class<?> aClass) {
for(Method method : aClass.getMethods()) {
if(method.getName().equals(this.method)) {
return method;
}
}
return null;
}

private static class SafeguardExecutionContext implements ExecutionContext {

private final Method method;
private final Object[] parameters;

private SafeguardExecutionContext(Method method, Object[] parameters) {
this.method = method;
this.parameters = parameters;
}

@Override
public Method getMethod() {
return method;
}

@Override
public Object[] getParameters() {
return parameters;
}
}
}
@@ -32,7 +32,7 @@ public class AsyncOnlyExecutionPlanTest {
public void shouldExecuteAsncWithoutTimeout() {
AsyncOnlyExecutionPlan asyncOnlyExecutionPlan = new AsyncOnlyExecutionPlan(Executors.newFixedThreadPool(2));
MyCallable callable = new MyCallable();
asyncOnlyExecutionPlan.execute(callable);
asyncOnlyExecutionPlan.execute(callable, null);
assertThat(callable.calledThread).isNotEqualTo(Thread.currentThread().getName());
}

@@ -35,7 +35,7 @@ public void shouldExecuteSimpleCallable() {
AsyncTimeoutExecutionPlan asyncTimeoutExecutionPlan = new AsyncTimeoutExecutionPlan(Duration.ofMillis(1000), Executors.newSingleThreadExecutor());
DelayedCaller callable = new DelayedCaller(200);

asyncTimeoutExecutionPlan.execute(callable);
asyncTimeoutExecutionPlan.execute(callable, null);

String myThreadName = Thread.currentThread().getName();
assertThat(callable.executedThread).isNotEqualTo(myThreadName);
@@ -46,7 +46,7 @@ public void shouldThrowTimeoutWhenDelayHit() {
AsyncTimeoutExecutionPlan asyncTimeoutExecutionPlan = new AsyncTimeoutExecutionPlan(Duration.ofMillis(100), Executors.newSingleThreadExecutor());
DelayedCaller callable = new DelayedCaller(200);

assertThatThrownBy(() -> asyncTimeoutExecutionPlan.execute(callable)).isInstanceOf(TimeoutException.class);
assertThatThrownBy(() -> asyncTimeoutExecutionPlan.execute(callable, null)).isInstanceOf(TimeoutException.class);
}

private static class DelayedCaller implements Callable<Object> {
@@ -26,7 +26,7 @@
public class SyncFailsafeExecutionPlanTest {
@Test
public void shouldThrowExceptionWithInvalidConfig() {
assertThatThrownBy(() -> new SyncFailsafeExecutionPlan(null, null))
assertThatThrownBy(() -> new SyncFailsafeExecutionPlan(null, null, null))
.isInstanceOf(IllegalStateException.class)
.hasMessage("For non-async invocations, must have at least one of RetryDefintion or CircuitBreaker defined");
}

0 comments on commit 0f811c3

Please sign in to comment.