Skip to content

Commit

Permalink
feature: StateMachine ServiceTask supports asynchronous execution (#1844
Browse files Browse the repository at this point in the history
)
  • Loading branch information
long187 authored and zhangthen committed Nov 1, 2019
1 parent 5473a62 commit 29f805a
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 49 deletions.
Expand Up @@ -23,6 +23,7 @@
import io.seata.saga.engine.expression.seq.SequenceExpressionFactory;
import io.seata.saga.engine.expression.spel.SpringELExpressionFactory;
import io.seata.saga.engine.invoker.ServiceInvokerManager;
import io.seata.saga.engine.invoker.impl.SpringBeanServiceInvoker;
import io.seata.saga.engine.pcext.StateMachineProcessHandler;
import io.seata.saga.engine.pcext.StateMachineProcessRouter;
import io.seata.saga.engine.repo.StateLogRepository;
Expand Down Expand Up @@ -176,6 +177,11 @@ protected void init() throws Exception {

if (this.serviceInvokerManager == null) {
this.serviceInvokerManager = new ServiceInvokerManager();

SpringBeanServiceInvoker springBeanServiceInvoker = new SpringBeanServiceInvoker();
springBeanServiceInvoker.setApplicationContext(getApplicationContext());
springBeanServiceInvoker.setThreadPoolExecutor(threadPoolExecutor);
this.serviceInvokerManager.putServiceInvoker(DomainConstants.SERVICE_TYPE_SPRING_BEAN, springBeanServiceInvoker);
}
}

Expand Down
Expand Up @@ -15,7 +15,6 @@
*/
package io.seata.saga.engine.invoker;

import io.seata.saga.engine.invoker.impl.SpringBeanServiceInvoker;
import io.seata.saga.statelang.domain.DomainConstants;
import org.springframework.util.StringUtils;

Expand All @@ -31,10 +30,6 @@ public class ServiceInvokerManager {

private Map<String, ServiceInvoker> serviceInvokerMap = new ConcurrentHashMap<>();

public ServiceInvokerManager() {
serviceInvokerMap.put(DomainConstants.SERVICE_TYPE_SPRING_BEAN, new SpringBeanServiceInvoker());
}

public ServiceInvoker getServiceInvoker(String serviceType) {
if (StringUtils.isEmpty(serviceType)) {
serviceType = DomainConstants.SERVICE_TYPE_SPRING_BEAN;
Expand Down
Expand Up @@ -34,6 +34,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
* SpringBean Service Invoker
Expand All @@ -45,11 +46,36 @@ public class SpringBeanServiceInvoker implements ServiceInvoker, ApplicationCont
private static final Logger LOGGER = LoggerFactory.getLogger(SpringBeanServiceInvoker.class);

private ApplicationContext applicationContext;
private ThreadPoolExecutor threadPoolExecutor;

@Override
public Object invoke(ServiceTaskState serviceTaskState, Object... input) {

ServiceTaskStateImpl state = (ServiceTaskStateImpl) serviceTaskState;
if(state.isAsync()){
if(threadPoolExecutor == null){
if(LOGGER.isWarnEnabled()){
LOGGER.warn("threadPoolExecutor is null, Service[{}.{}] cannot execute asynchronously, executing synchronously now. stateName: {}", state.getServiceName(), state.getServiceMethod(), state.getName());
}
return doInvoke(state, input);
}

if(LOGGER.isInfoEnabled()){
LOGGER.info("Submit Service[{}.{}] to asynchronously executing. stateName: {}", state.getServiceName(), state.getServiceMethod(), state.getName());
}
threadPoolExecutor.execute(new Runnable() {
@Override
public void run() {
doInvoke(state, input);
}
});
return null;
}
else{
return doInvoke(state, input);
}
}

protected Object doInvoke(ServiceTaskStateImpl state, Object[] input) {

Object bean = applicationContext.getBean(state.getServiceName());

Expand All @@ -68,6 +94,7 @@ public Object invoke(ServiceTaskState serviceTaskState, Object... input) {

if (method == null) {
throw new EngineExecutionException("No such method[" + state.getServiceMethod() + "] on BeanClass[" + bean.getClass() + "]", FrameworkErrorCode.NoSuchMethod);

}

Object[] args = new Object[method.getParameterCount()];
Expand All @@ -91,6 +118,10 @@ public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}

public void setThreadPoolExecutor(ThreadPoolExecutor threadPoolExecutor) {
this.threadPoolExecutor = threadPoolExecutor;
}

protected Method findMethod(Class<?> clazz, String methodName, List<String> parameterTypes) {

if (parameterTypes == null || parameterTypes.size() == 0) {
Expand Down
Expand Up @@ -369,59 +369,67 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn
Map<String, String> statusMatchList = state.getStatus();
if (statusMatchList != null && statusMatchList.size() > 0) {

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

Map<Object, String> statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
synchronized (state) {
statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
statusEvaluators = new LinkedHashMap<>(statusMatchList.size());
for (String expressionStr : statusMatchList.keySet()) {

String statusVal = statusMatchList.get(expressionStr);
Evaluator evaluator = createEvaluator(stateMachineConfig.getEvaluatorFactoryManager(), expressionStr);
if (evaluator != null) {
statusEvaluators.put(evaluator, statusVal);
if(state.isAsync()){
if(LOGGER.isWarnEnabled()){
LOGGER.warn("Service[{}.{}] is execute asynchronously, null return value collected, so user defined Status Matching skipped. stateName: {}, branchId: {}", state.getServiceName(), state.getServiceMethod(), state.getName(), stateInstance.getId());
}
}
else{

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);

Map<Object, String> statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
synchronized (state) {
statusEvaluators = state.getStatusEvaluators();
if (statusEvaluators == null) {
statusEvaluators = new LinkedHashMap<>(statusMatchList.size());
for (String expressionStr : statusMatchList.keySet()) {

String statusVal = statusMatchList.get(expressionStr);
Evaluator evaluator = createEvaluator(stateMachineConfig.getEvaluatorFactoryManager(), expressionStr);
if (evaluator != null) {
statusEvaluators.put(evaluator, statusVal);
}
}
}
state.setStatusEvaluators(statusEvaluators);
}
state.setStatusEvaluators(statusEvaluators);
}
}

for (Object evaluatorObj : statusEvaluators.keySet()) {
Evaluator evaluator = (Evaluator) evaluatorObj;
String statusVal = statusEvaluators.get(evaluator);
if (evaluator.evaluate(context.getVariables())) {
stateInstance.setStatus(ExecutionStatus.valueOf(statusVal));
break;
for (Object evaluatorObj : statusEvaluators.keySet()) {
Evaluator evaluator = (Evaluator) evaluatorObj;
String statusVal = statusEvaluators.get(evaluator);
if (evaluator.evaluate(context.getVariables())) {
stateInstance.setStatus(ExecutionStatus.valueOf(statusVal));
break;
}
}
}

if (exp == null && (stateInstance.getStatus() == null || ExecutionStatus.RU.equals(stateInstance.getStatus()))) {
if (exp == null && (stateInstance.getStatus() == null || ExecutionStatus.RU.equals(stateInstance.getStatus()))) {

if (state.isForUpdate()) {
stateInstance.setStatus(ExecutionStatus.UN);
} else {
stateInstance.setStatus(ExecutionStatus.FA);
}
stateInstance.setGmtEnd(new Date());
if (state.isForUpdate()) {
stateInstance.setStatus(ExecutionStatus.UN);
} else {
stateInstance.setStatus(ExecutionStatus.FA);
}
stateInstance.setGmtEnd(new Date());

StateMachineInstance stateMachineInstance = stateInstance.getStateMachineInstance();
StateMachineInstance stateMachineInstance = stateInstance.getStateMachineInstance();

if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);
}
if (stateMachineInstance.getStateMachine().isPersist() && state.isPersist() && stateMachineConfig.getStateLogStore() != null) {
stateMachineConfig.getStateLogStore().recordStateFinished(stateInstance, context);
}

EngineExecutionException exception = new EngineExecutionException("State [" + state.getName() + "] execute finished, but cannot matching status, pls check its status manually",
FrameworkErrorCode.NoMatchedStatus);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] execute finish with status[{}]", state.getName(), stateInstance.getStatus());
}
EngineUtils.failStateMachine(context, exception);
EngineExecutionException exception = new EngineExecutionException("State [" + state.getName() + "] execute finished, but cannot matching status, pls check its status manually",
FrameworkErrorCode.NoMatchedStatus);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] execute finish with status[{}]", state.getName(), stateInstance.getStatus());
}
EngineUtils.failStateMachine(context, exception);

throw exception;
throw exception;
}
}
}

Expand Down Expand Up @@ -449,8 +457,8 @@ private void decideExecutionStatus(ProcessContext context, StateInstance stateIn
}
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("State[{}] finish with status[{}]", state.getName(), stateInstance.getStatus());
if (LOGGER.isInfoEnabled()) {
LOGGER.info("State[{}] finish with status[{}]", state.getName(), stateInstance.getStatus());
}
}

Expand Down
Expand Up @@ -35,6 +35,7 @@ public class ServiceTaskStateImpl extends AbstractTaskState implements ServiceTa
private List<Object> inputExpressions;
private Map<String, Object> outputExpressions;
private Map<Object, String> statusEvaluators;
private boolean isAsync;

public ServiceTaskStateImpl() {
setType(DomainConstants.STATE_TYPE_SERVICE_TASK);
Expand Down Expand Up @@ -107,4 +108,12 @@ public Map<Object, String> getStatusEvaluators() {
public void setStatusEvaluators(Map<Object, String> statusEvaluators) {
this.statusEvaluators = statusEvaluators;
}

public boolean isAsync() {
return isAsync;
}

public void setAsync(boolean async) {
isAsync = async;
}
}
Expand Up @@ -40,6 +40,10 @@ public ServiceTaskState parse(Object node) {
serviceTaskState.setServiceMethod((String)nodeMap.get("ServiceMethod"));
serviceTaskState.setServiceType((String)nodeMap.get("ServiceType"));
serviceTaskState.setParameterTypes((List<String>)nodeMap.get("ParameterTypes"));
Object isAsync = nodeMap.get("IsAsync");
if(isAsync != null && Boolean.TRUE.equals(isAsync)){
serviceTaskState.setAsync(true);
}

return serviceTaskState;
}
Expand Down
Expand Up @@ -159,6 +159,32 @@ public void testStateMachineWithComplextParams() {
Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, callback);

waittingForFinish(inst);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void waittingForFinish(StateMachineInstance inst){
synchronized (lock){
if(ExecutionStatus.RU.equals(inst.getStatus())){
Expand Down
24 changes: 24 additions & 0 deletions test/src/test/java/io/seata/saga/engine/StateMachineTests.java
Expand Up @@ -228,4 +228,28 @@ public void testStateMachineWithComplextParams() {

Assertions.assertTrue(ExecutionStatus.SU.equals(instance.getStatus()));
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Expand Up @@ -269,6 +269,30 @@ public void testReloadStateMachineInstance(){
System.out.println(instance);
}

@Test
public void testSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.start(stateMachineName, null, paramMap);

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

@Test
public void testSimpleCatchesStateMachineAsync() throws Exception {

Expand Down Expand Up @@ -374,6 +398,31 @@ public void testCompensationAndSubStateMachineAsync() throws Exception {
Assertions.assertTrue(GlobalStatus.CommitRetrying.equals(globalTransaction.getStatus()));
}

@Test
public void testAsyncStartSimpleStateMachineWithAsyncState() {

long start = System.currentTimeMillis();

Map<String, Object> paramMap = new HashMap<>(1);
paramMap.put("a", 1);

String stateMachineName = "simpleStateMachineWithAsyncState";

StateMachineInstance inst = stateMachineEngine.startAsync(stateMachineName, null, paramMap, callback);

waittingForFinish(inst);

Assertions.assertTrue(ExecutionStatus.SU.equals(inst.getStatus()));

long cost = System.currentTimeMillis() - start;
System.out.println("====== cost :" + cost);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

private void waittingForFinish(StateMachineInstance inst){
synchronized (lock){
if(ExecutionStatus.RU.equals(inst.getStatus())){
Expand Down

0 comments on commit 29f805a

Please sign in to comment.