Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: custom saga transaction recovery strategy on transaction timeout #2240

Merged
merged 15 commits into from
Feb 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ before_script:

script:
- if [ "$TRAVIS_BRANCH" == "develop" ] && [ "$TRAVIS_PULL_REQUEST" == false ]; then
travis_wait 30 ./mvnw clean install -DskipTests=false -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -q -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
else
travis_wait 30 ./mvnw clean install -DskipTests=false -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -q -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
fi
after_success:
- bash <(curl -s https://codecov.io/bash)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.pcext.utils.EngineUtils;
import io.seata.saga.engine.sequence.SeqGenerator;
import io.seata.saga.engine.serializer.Serializer;
import io.seata.saga.engine.serializer.impl.ExceptionSerializer;
Expand Down Expand Up @@ -82,7 +84,17 @@ public class DbAndReportTcStateLogStore extends AbstractStore implements StateLo
public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {

if (machineInstance != null) {
beginTransaction(machineInstance, context);
//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
} else {
beginTransaction(machineInstance, context);
}


if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
Expand All @@ -95,20 +107,10 @@ public void recordStateMachineStarted(StateMachineInstance machineInstance, Proc
}
}

private void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {
protected void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
return;
}

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
TransactionInfo transactionInfo = new TransactionInfo();
Expand Down Expand Up @@ -156,25 +158,25 @@ public void recordStateMachineFinished(StateMachineInstance machineInstance, Pro
machineInstance.setSerializedException(exceptionSerializer.serialize(machineInstance.getException()));
int effect = executeUpdate(stateLogStoreSqls.getRecordStateMachineFinishedSql(dbType),
STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_UPDATE, machineInstance);
if (effect < 0) {
if (effect < 1) {
LOGGER.warn("StateMachineInstance[{}] is recovery by server, skip recordStateMachineFinished.", machineInstance.getId());
} else {
StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
if (machineInstance.isTimeout(stateMachineConfig.getTransOperationTimeout())) {
if (EngineUtils.isTimeout(machineInstance.getGmtUpdated(), stateMachineConfig.getTransOperationTimeout())) {
LOGGER.warn("StateMachineInstance[{}] is execution timeout, skip report transaction finished to server.", machineInstance.getId());
} else {
} else if (StringUtils.isEmpty(machineInstance.getParentId())) {
//if parentId is not null, machineInstance is a SubStateMachine, do not report global transaction.
reportTransactionFinished(machineInstance, context);
}
}
RootContext.unbind();
}
}

private void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {
protected void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {

//if parentId is not null, machineInstance is a SubStateMachine, do not report global transaction.
if (sagaTransactionalTemplate != null && StringUtils.isEmpty(machineInstance.getParentId())) {
if (sagaTransactionalTemplate != null) {

try {
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
Expand Down Expand Up @@ -226,8 +228,14 @@ public void recordStateMachineRestarted(StateMachineInstance machineInstance, Pr

if (machineInstance != null) {
//save to db
executeUpdate(stateLogStoreSqls.getUpdateStateMachineRunningStatusSql(dbType), machineInstance.isRunning(), new Timestamp(machineInstance.getGmtUpdated().getTime()),
machineInstance.getId());
Date gmtUpdated = new Date();
int effect = executeUpdate(stateLogStoreSqls.getUpdateStateMachineRunningStatusSql(dbType), machineInstance.isRunning(), new Timestamp(gmtUpdated.getTime()),
machineInstance.getId(), new Timestamp(machineInstance.getGmtUpdated().getTime()));
if (effect < 1) {
throw new EngineExecutionException(
"StateMachineInstance [id:" + machineInstance.getId() + "] is recovered by an other execution, restart denied", FrameworkErrorCode.OperationDenied);
}
machineInstance.setGmtUpdated(gmtUpdated);
}
}

Expand All @@ -236,7 +244,20 @@ public void recordStateStarted(StateInstance stateInstance, ProcessContext conte

if (stateInstance != null) {

branchRegister(stateInstance, context);
//if this state is for retry, do not register branch, but generate id
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
//if this state is for compensation, do not register branch, but generate id
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

stateInstance.setId(generateCompensateStateInstanceId(stateInstance));
}
else {
branchRegister(stateInstance, context);
}


if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
Expand All @@ -248,49 +269,38 @@ public void recordStateStarted(StateInstance stateInstance, ProcessContext conte
}
}

private void branchRegister(StateInstance stateInstance, ProcessContext context) {
protected void branchRegister(StateInstance stateInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

//if this state is for retry, do not register branch, but generate id
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
//if this state is for compensation, do not register branch, but generate id
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

stateInstance.setId(generateCompensateStateInstanceId(stateInstance));
} else {
//Register branch
try {
StateMachineInstance machineInstance = stateInstance.getStateMachineInstance();
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {
throw new EngineExecutionException("Global transaction is not exists", FrameworkErrorCode.ObjectNotExists);
}

String resourceId = stateInstance.getStateMachineInstance().getStateMachine().getName() + "#" + stateInstance.getName();
long branchId = sagaTransactionalTemplate.branchRegister(resourceId, null, globalTransaction.getXid(), null, null);
stateInstance.setId(String.valueOf(branchId));
} catch (TransactionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
} catch (ExecutionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
//Register branch
try {
StateMachineInstance machineInstance = stateInstance.getStateMachineInstance();
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {
throw new EngineExecutionException("Global transaction is not exists", FrameworkErrorCode.ObjectNotExists);
}

String resourceId = stateInstance.getStateMachineInstance().getStateMachine().getName() + "#" + stateInstance.getName();
long branchId = sagaTransactionalTemplate.branchRegister(resourceId, null, globalTransaction.getXid(), null, null);
stateInstance.setId(String.valueOf(branchId));
} catch (TransactionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
} catch (ExecutionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
}
}
}

private GlobalTransaction getGlobalTransaction(StateMachineInstance machineInstance, ProcessContext context)
protected GlobalTransaction getGlobalTransaction(StateMachineInstance machineInstance, ProcessContext context)
throws ExecutionException, TransactionException {

GlobalTransaction globalTransaction = (GlobalTransaction) context.getVariable(DomainConstants.VAR_NAME_GLOBAL_TX);
Expand Down Expand Up @@ -392,7 +402,7 @@ public void recordStateFinished(StateInstance stateInstance, ProcessContext cont
}
}

private void branchReport(StateInstance stateInstance, ProcessContext context) {
protected void branchReport(StateInstance stateInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

Expand Down Expand Up @@ -721,8 +731,9 @@ public void toStatement(StateMachineInstance stateMachineInstance, PreparedState
stateMachineInstance.getCompensationStatus() != null ? stateMachineInstance.getCompensationStatus()
.name() : null);
statement.setBoolean(6, stateMachineInstance.isRunning());
statement.setString(7, stateMachineInstance.getId());
statement.setTimestamp(8, new Timestamp(stateMachineInstance.getGmtUpdated().getTime()));
statement.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
statement.setString(8, stateMachineInstance.getId());
statement.setTimestamp(9, new Timestamp(stateMachineInstance.getGmtUpdated().getTime()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.sql.Timestamp;
import java.util.List;

import io.seata.common.util.StringUtils;
import io.seata.saga.engine.store.StateLangStore;
import io.seata.saga.statelang.domain.RecoverStrategy;
import io.seata.saga.statelang.domain.StateMachine;
import io.seata.saga.statelang.domain.StateMachine.Status;
import io.seata.saga.statelang.domain.impl.StateMachineImpl;
Expand Down Expand Up @@ -80,7 +82,10 @@ public StateMachine toObject(ResultSet resultSet) throws SQLException {
stateMachine.setContent(resultSet.getString("content"));
stateMachine.setGmtCreate(resultSet.getTimestamp("gmt_create"));
stateMachine.setType(resultSet.getString("type"));
stateMachine.setRecoverStrategy(resultSet.getString("recover_strategy"));
String recoverStrategy = resultSet.getString("recover_strategy");
if (StringUtils.isNotBlank(recoverStrategy)) {
stateMachine.setRecoverStrategy(RecoverStrategy.valueOf(recoverStrategy));
}
stateMachine.setTenantId(resultSet.getString("tenant_id"));
stateMachine.setStatus(Status.valueOf(resultSet.getString("status")));
return stateMachine;
Expand All @@ -99,7 +104,7 @@ public void toStatement(StateMachine stateMachine, PreparedStatement statement)
statement.setString(7, stateMachine.getVersion());
statement.setString(8, stateMachine.getType());
statement.setString(9, stateMachine.getContent());
statement.setString(10, stateMachine.getRecoverStrategy());
statement.setString(10, stateMachine.getRecoverStrategy() != null ? stateMachine.getRecoverStrategy().name() : null);
statement.setString(11, stateMachine.getComment());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public class StateLogStoreSqls {

private static final String RECORD_STATE_MACHINE_FINISHED_SQL
= "UPDATE ${TABLE_PREFIX}state_machine_inst SET gmt_end = ?, excep = ?, end_params = ?,status = ?, "
+ "compensation_status = ?, is_running = ?, gmt_updated = current_timestamp WHERE id = ? and gmt_updated = ?";
+ "compensation_status = ?, is_running = ?, gmt_updated = ? WHERE id = ? and gmt_updated = ?";

private static final String UPDATE_STATE_MACHINE_RUNNING_STATUS_SQL =
"UPDATE ${TABLE_PREFIX}state_machine_inst SET\n"
+ "is_running = ?, gmt_updated = ? where id = ?";
+ "is_running = ?, gmt_updated = ? where id = ? and gmt_updated = ?";

private static final String GET_STATE_MACHINE_INSTANCE_BY_ID_SQL = "SELECT " + STATE_MACHINE_INSTANCE_FIELDS
+ " FROM ${TABLE_PREFIX}state_machine_inst WHERE id = ?";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,10 @@ public interface StateMachineConfig {
* @return
*/
int getTransOperationTimeout();

/**
 * Returns the timeout applied to service invocations.
 *
 * NOTE(review): the unit is presumably milliseconds (the default implementation
 * shown alongside this change initialises it to 60000 * 5) -- confirm against callers.
 *
 * @return the configured service invoke timeout
 */
int getServiceInvokeTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallbac
* @return
*/
StateMachineConfig getStateMachineConfig();

/**
 * Reloads a state machine instance.
 *
 * NOTE(review): where the instance is reloaded from, and what is returned when
 * no instance matches, is not visible from this declaration -- confirm against
 * the implementing class.
 *
 * @param instId presumably the id of the state machine instance to reload
 * @return the reloaded state machine instance
 */
StateMachineInstance reloadStateMachineInstance(String instId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ public class DefaultStateMachineConfig implements StateMachineConfig, Applicatio

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStateMachineConfig.class);

private static final int DEFAULT_TRANS_OPER_TIMEOUT = 60000 * 30;
private static final int DEFAULT_TRANS_OPER_TIMEOUT = 60000 * 30;
private static final int DEFAULT_SERVICE_INVOKE_TIMEOUT = 60000 * 5;

private int transOperationTimeout = DEFAULT_TRANS_OPER_TIMEOUT;
private int serviceInvokeTimeout = DEFAULT_SERVICE_INVOKE_TIMEOUT;

private StateLogRepository stateLogRepository;
private StateLogStore stateLogStore;
Expand Down Expand Up @@ -393,4 +395,12 @@ public void setTransOperationTimeout(int transOperationTimeout) {
this.transOperationTimeout = transOperationTimeout;
}

/**
 * Returns the service invoke timeout held by this configuration.
 * The backing field is initialised to DEFAULT_SERVICE_INVOKE_TIMEOUT (60000 * 5)
 * and can be replaced through {@link #setServiceInvokeTimeout(int)}.
 *
 * NOTE(review): the unit is presumably milliseconds -- confirm against callers.
 *
 * @return the current value of the serviceInvokeTimeout field
 */
@Override
public int getServiceInvokeTimeout() {
return serviceInvokeTimeout;
}

/**
 * Replaces the service invoke timeout held by this configuration.
 * The value is stored as given; no range validation is performed here.
 *
 * NOTE(review): the unit is presumably milliseconds -- confirm against callers.
 *
 * @param serviceInvokeTimeout the new timeout value
 */
public void setServiceInvokeTimeout(int serviceInvokeTimeout) {
this.serviceInvokeTimeout = serviceInvokeTimeout;
}
}
Loading