Skip to content

Commit

Permalink
bugfix: fix xa resource suspension (#4228)
Browse files Browse the repository at this point in the history
  • Loading branch information
lcmvs committed Apr 8, 2022
1 parent 660166a commit 4a69bde
Show file tree
Hide file tree
Showing 20 changed files with 238 additions and 21 deletions.
2 changes: 1 addition & 1 deletion changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4459](https://github.com/seata/seata/pull/4459)] 修复develop版本下oracle和pgsql数据库生成前后镜像失败的问题
- [[#4471](https://github.com/seata/seata/pull/4471)] 修复develop分支下,运行时切换事务分组对应集群引起的错误
- [[#4474](https://github.com/seata/seata/pull/4474)] 修复Mysql多位Bit类型字段回滚错误

- [[#4228](https://github.com/seata/seata/pull/4228)] 修复tc获取不同ip的rm连接导致的xa模式资源悬挂问题

### optimize:
- [[#4163](https://github.com/seata/seata/pull/4163)] 完善开发者奉献文档
Expand Down
1 change: 1 addition & 0 deletions changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@
- [[#4355](https://github.com/seata/seata/pull/4355)] fix mysql-loadbalance resource id error
- [[#4310](https://github.com/seata/seata/pull/4310)] fix the problem that failed to obtain the self increment ID of MySQL database through "select last_insert_id"
- [[#4331](https://github.com/seata/seata/pull/4331)] fix dirty write check exception that may occur when using ONLY_CARE_UPDATE_COLUMNS configuration
- [[#4228](https://github.com/seata/seata/pull/4228)] fix resource suspension in xa mode caused by choose other ip as channel alternative
- [[#4408](https://github.com/seata/seata/pull/4408)] fix the invalid environment variable in container env
- [[#4441](https://github.com/seata/seata/pull/4441)] fix the problem that pipelined resources are not closed in redis mode and add branchSession judge branchSessions is not null
- [[#4438](https://github.com/seata/seata/pull/4438)] fix the problem that GlobalSession could not be deleted normally in the case of delayed deletion in the file mode of the develop branch
Expand Down
15 changes: 15 additions & 0 deletions common/src/main/java/io/seata/common/ConfigurationKeys.java
Original file line number Diff line number Diff line change
Expand Up @@ -817,4 +817,19 @@ public interface ConfigurationKeys {
* The constant IS_USE_ENDPOINT_PARSING_RULE.
*/
String IS_USE_ENDPOINT_PARSING_RULE = "isUseEndpointParsingRule";

/**
* The constant XAER_NOTA_RETRY_TIMEOUT
*/
String XAER_NOTA_RETRY_TIMEOUT = SERVER_PREFIX + "xaerNotaRetryTimeout";

/**
* The constant XA_BRANCH_EXECUTION_TIMEOUT
*/
String XA_BRANCH_EXECUTION_TIMEOUT = CLIENT_RM_PREFIX + "branchExecutionTimeoutXA";

/**
* The constant XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT
*/
String XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT = CLIENT_RM_PREFIX + "connectionTwoPhaseHoldTimeoutXA";
}
16 changes: 16 additions & 0 deletions common/src/main/java/io/seata/common/DefaultValues.java
Original file line number Diff line number Diff line change
Expand Up @@ -179,4 +179,20 @@ public interface DefaultValues {
* the constant DEFAULT_RPC_TC_REQUEST_TIMEOUT
*/
long DEFAULT_RPC_TC_REQUEST_TIMEOUT = Duration.ofSeconds(30).toMillis();

/**
* the constant DEFAULT_XAER_NOTA_RETRY_TIMEOUT
*/
int DEFAULT_XAER_NOTA_RETRY_TIMEOUT = 60000;

/**
* the constant DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT
*/
int DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT = 60000;

/**
* the constant DEFAULT_XA_TWO_PHASE_WAIT_TIMEOUT
*/
int DEFAULT_XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT = 10000;

}
13 changes: 13 additions & 0 deletions core/src/main/java/io/seata/core/context/RootContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ private RootContext() {
*/
public static final String KEY_XID = "TX_XID";

/**
* The constant KEY_TIMEOUT.
*/
public static final String KEY_TIMEOUT = "TX_TIMEOUT";

/**
* The constant MDC_KEY_XID for logback
* @since 1.5.0
Expand Down Expand Up @@ -116,6 +121,14 @@ public static void bind(@Nonnull String xid) {
}
}

public static Integer getTimeout() {
return (Integer) CONTEXT_HOLDER.get(KEY_TIMEOUT);
}

public static void setTimeout(Integer timeout) {
CONTEXT_HOLDER.put(KEY_TIMEOUT,timeout);
}

/**
* declare local transactions will use global lock check for update/delete/insert/selectForUpdate SQL
*/
Expand Down
14 changes: 13 additions & 1 deletion core/src/main/java/io/seata/core/model/BranchStatus.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,19 @@ public enum BranchStatus {
* The Phase two rollback failed unretryable.
* description:Rollback logic is failed but NOT retryable.
*/
PhaseTwo_RollbackFailed_Unretryable(10);
PhaseTwo_RollbackFailed_Unretryable(10),

/**
* The Phase two commit failed retryable because of XAException.XAER_NOTA.
* description:Commit logic is failed because of XAException.XAER_NOTA but retryable.
*/
PhaseTwo_CommitFailed_XAER_NOTA_Retryable(11),

/**
* The Phase two rollback failed retryable because of XAException.XAER_NOTA.
* description:rollback logic is failed because of XAException.XAER_NOTA but retryable.
*/
PhaseTwo_RollbackFailed_XAER_NOTA_Retryable(12);

private int code;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class BranchRegisterResponse extends AbstractTransactionResponse implemen

private long branchId;


/**
* Gets branch id.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,4 +488,5 @@ public void close(ChannelHandlerContext ctx, ChannelPromise future) throws Excep
super.close(ctx, future);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public abstract class BaseDataSourceResource<T extends Holdable> implements Seat
private Map<String, T> keeper = new ConcurrentHashMap<>();

private static final Cache<String, BranchStatus> BRANCH_STATUS_CACHE =
CacheBuilder.newBuilder().maximumSize(1024).expireAfterAccess(10, TimeUnit.MINUTES).build();
CacheBuilder.newBuilder().maximumSize(1024).expireAfterAccess(10, TimeUnit.MINUTES).build();

/**
* Gets target data source.
Expand Down Expand Up @@ -213,4 +213,8 @@ public static void remove(String xaBranchXid) {
}
}

public Map<String, T> getKeeper() {
return keeper;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@

import java.sql.Connection;
import java.sql.SQLException;
import javax.sql.PooledConnection;
import javax.sql.XAConnection;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

import io.seata.common.DefaultValues;
import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType;
Expand All @@ -31,6 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.common.ConfigurationKeys.XA_BRANCH_EXECUTION_TIMEOUT;

/**
* Connection proxy for XA mode.
*
Expand All @@ -40,6 +45,9 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold

private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);

private static final int BRANCH_EXECUTION_TIMEOUT = ConfigurationFactory.getInstance().getInt(XA_BRANCH_EXECUTION_TIMEOUT,
DefaultValues.DEFAULT_XA_BRANCH_EXECUTION_TIMEOUT);

private volatile boolean currentAutoCommitStatus = true;

private volatile XAXid xaBranchXid;
Expand All @@ -49,7 +57,13 @@ public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Hold
private volatile boolean kept = false;

private volatile boolean rollBacked = false;


private volatile Long branchRegisterTime = null;

private volatile Long prepareTime = null;

private volatile Integer timeout = null;

/**
* Constructor of Connection Proxy for XA mode.
*
Expand All @@ -69,6 +83,11 @@ public void init() {
if (!currentAutoCommitStatus) {
throw new IllegalStateException("Connection[autocommit=false] as default is NOT supported");
}
Integer transactionTimeout = RootContext.getTimeout();
if (transactionTimeout == null) {
transactionTimeout = DefaultValues.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
}
timeout = Math.max(BRANCH_EXECUTION_TIMEOUT, transactionTimeout);
} catch (SQLException e) {
throw new RuntimeException(e);
}
Expand All @@ -85,7 +104,6 @@ private void releaseIfNecessary() {
if (isHeld()) {
resource.release(xaBranchXid, this);
}
BaseDataSourceResource.remove(xaBranchXid);
}
}

Expand Down Expand Up @@ -143,9 +161,10 @@ public void setAutoCommit(boolean autoCommit) throws SQLException {
// Start a XA branch
long branchId;
try {
// 1. register branch to TC then get the branchId
// 1. register branch to TC then get the branch message
branchRegisterTime = System.currentTimeMillis();
branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null,
null);
null);
} catch (TransactionException te) {
cleanXABranchContext();
throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
Expand Down Expand Up @@ -184,6 +203,9 @@ public synchronized void commit() throws SQLException {
}
try {
end(XAResource.TMSUCCESS);
long now = System.currentTimeMillis();
checkTimeout(now);
setPrepareTime(now);
xaResource.prepare(xaBranchXid);
} catch (XAException xe) {
try {
Expand Down Expand Up @@ -254,12 +276,22 @@ private synchronized void end(int flags) throws XAException, SQLException {
}

private void cleanXABranchContext() {
branchRegisterTime = null;
prepareTime = null;
timeout = null;
xaActive = false;
if (!isHeld()) {
xaBranchXid = null;
}
}

private void checkTimeout(Long now) throws XAException {
if (now - branchRegisterTime > timeout) {
xaRollback(xaBranchXid);
throw new XAException("XA branch timeout error");
}
}

@Override
public synchronized void close() throws SQLException {
rollBacked = false;
Expand All @@ -271,6 +303,19 @@ public synchronized void close() throws SQLException {
originalConnection.close();
}

protected synchronized void closeForce() throws SQLException {
Connection physicalConn = getWrappedConnection();
if (physicalConn instanceof PooledConnection) {
physicalConn = ((PooledConnection) physicalConn).getConnection();
}
// Force close the physical connection
physicalConn.close();
rollBacked = false;
cleanXABranchContext();
originalConnection.close();
releaseIfNecessary();
}

@Override
public void setHeld(boolean kept) {
this.kept = kept;
Expand All @@ -287,6 +332,14 @@ public boolean shouldBeHeld() {
|| StringUtils.isBlank(resource.getDbType());
}

public Long getPrepareTime() {
return prepareTime;
}

private void setPrepareTime(Long prepareTime) {
this.prepareTime = prepareTime;
}

private void termination() throws SQLException {
termination(this.xaBranchXid.toString());
}
Expand All @@ -297,7 +350,7 @@ private void termination(String xaBranchXid) throws SQLException {
if (branchStatus != null) {
releaseIfNecessary();
throw new SQLException("failed xa branch " + xid
+ " the global transaction has finish, branch status: " + branchStatus.getCode());
+ " the global transaction has finish, branch status: " + branchStatus.getCode());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
*/
package io.seata.rm.datasource.xa;

import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.sql.SQLException;
import javax.transaction.xa.XAException;

import io.seata.common.DefaultValues;
import io.seata.common.thread.NamedThreadFactory;
import io.seata.config.ConfigurationFactory;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.BranchStatus;
import io.seata.core.model.BranchType;
Expand All @@ -27,6 +33,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.seata.core.constants.ConfigurationKeys.XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT;

/**
* RM for XA mode.
*
Expand All @@ -36,9 +44,43 @@ public class ResourceManagerXA extends AbstractDataSourceCacheResourceManager {

private static final Logger LOGGER = LoggerFactory.getLogger(ResourceManagerXA.class);

private static final int TWO_PHASE_HOLD_TIMEOUT = ConfigurationFactory.getInstance().getInt(XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT,
DefaultValues.DEFAULT_XA_CONNECTION_TWO_PHASE_HOLD_TIMEOUT);

private static final long SCHEDULE_DELAY_MILLS = 60 * 1000L;
private static final long SCHEDULE_INTERVAL_MILLS = 1000L;
/**
* The Timer check xa branch two phase hold timeout.
*/
protected final ScheduledExecutorService xaTwoPhaseTimeoutChecker = new ScheduledThreadPoolExecutor(1,
new NamedThreadFactory("xaTwoPhaseTimeoutChecker", 1, true));

@Override
public void init() {
LOGGER.info("ResourceManagerXA init ...");
xaTwoPhaseTimeoutChecker.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
for (Map.Entry<String, Resource> entry : dataSourceCache.entrySet()) {
BaseDataSourceResource resource = (BaseDataSourceResource) entry.getValue();
Map<String, ConnectionProxyXA> keeper = resource.getKeeper();
for (Map.Entry<String, ConnectionProxyXA> connectionEntry : keeper.entrySet()) {
ConnectionProxyXA connection = connectionEntry.getValue();
long now = System.currentTimeMillis();
synchronized (connection) {
if (connection.getPrepareTime() != null &&
now - connection.getPrepareTime() > TWO_PHASE_HOLD_TIMEOUT) {
try {
connection.closeForce();
} catch (SQLException e) {
LOGGER.info("Force close the xa physical connection fail", e);
}
}
}
}
}
}
}, SCHEDULE_DELAY_MILLS, SCHEDULE_INTERVAL_MILLS, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -76,17 +118,17 @@ private BranchStatus finishBranch(boolean committed, BranchType branchType, Stri
}
} catch (XAException | SQLException sqle) {
if (sqle instanceof XAException) {
if (((XAException)sqle).errorCode == XAException.XAER_NOTA) {
try {
try {
if (((XAException) sqle).errorCode == XAException.XAER_NOTA) {
if (committed) {
return BranchStatus.PhaseTwo_Committed;
return BranchStatus.PhaseTwo_CommitFailed_XAER_NOTA_Retryable;
} else {
return BranchStatus.PhaseTwo_Rollbacked;
return BranchStatus.PhaseTwo_RollbackFailed_XAER_NOTA_Retryable;
}
} finally {
BaseDataSourceResource.setBranchStatus(xaBranchXid.toString(),
committed ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_Rollbacked);
}
} finally {
BaseDataSourceResource.setBranchStatus(xaBranchXid.toString(),
committed ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_Rollbacked);
}
}
if (committed) {
Expand Down

0 comments on commit 4a69bde

Please sign in to comment.