Skip to content

Commit

Permalink
optimize: when in TCC mode there is no need to delete global locks (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
slievrly committed Mar 14, 2022
1 parent cc9eee8 commit 193a112
Show file tree
Hide file tree
Showing 16 changed files with 126 additions and 76 deletions.
1 change: 1 addition & 0 deletions changes/1.5.0.md
Expand Up @@ -185,6 +185,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4299](https://github.com/seata/seata/pull/4296)] 优化异常提示
- [[#4300](https://github.com/seata/seata/pull/4300)] 优化NettyRemotingServer的close()由DefaultCoordinator来调用,不再额外注册到ServerRunner
- [[#4270](https://github.com/seata/seata/pull/4270)] 提高全局提交和全局回滚的性能,分支事务清理异步化
- [[#4307](https://github.com/seata/seata/pull/4307)] 优化在TCC模式减少不必要的全局锁删除
- [[#4303](https://github.com/seata/seata/pull/4303)] `tcc_fence_log`表悬挂日志记录异步删除
- [[#4328](https://github.com/seata/seata/pull/4328)] 配置上传脚本支持注释
- [[#4305](https://github.com/seata/seata/pull/4305)] 优化tc端全局锁获取失败时的日志打印
Expand Down
1 change: 1 addition & 0 deletions changes/en-us/1.5.0.md
Expand Up @@ -190,6 +190,7 @@
- [[#4299](https://github.com/seata/seata/pull/4296)] optimize exceptions to make them friendly
- [[#4300](https://github.com/seata/seata/pull/4300)] optimize let DefaultCoordinator invoke NettyRemotingServer's close method,no longer closed by ServerRunner
- [[#4270](https://github.com/seata/seata/pull/4270)] improve the performance of global commit and global rollback, asynchronous branch transaction cleanup
- [[#4307](https://github.com/seata/seata/pull/4307)] when in TCC mode there is no need to delete global locks
- [[#4303](https://github.com/seata/seata/pull/4303)] `tcc_fence_log` table hanging log records are deleted asynchronously
- [[#4328](https://github.com/seata/seata/pull/4328)] upload configuration script support comments
- [[#4305](https://github.com/seata/seata/pull/4305)] optimize acquire global lock fail error log print on tc
Expand Down
Expand Up @@ -438,6 +438,8 @@ protected void handleRetryRollbacking(Collection<GlobalSession> rollbackingSessi
SessionHolder.getRetryRollbackingSessionManager().removeGlobalSession(rollbackingSession);
LOGGER.info("Global transaction rollback retry timeout and has removed [{}]", rollbackingSession.getXid());

SessionHelper.endRollbackFailed(rollbackingSession);

// rollback retry timeout event
SessionHelper.postTcSessionEndEvent(rollbackingSession, GlobalStatus.RollbackRetryTimeout);

Expand Down Expand Up @@ -495,7 +497,7 @@ private boolean isRetryTimeout(long now, long timeout, long beginTime) {

/**
* Handle async committing.
*
*
* @param asyncCommittingSessions
*/
protected void handleAsyncCommitting(Collection<GlobalSession> asyncCommittingSessions) {
Expand Down
18 changes: 7 additions & 11 deletions server/src/main/java/io/seata/server/coordinator/DefaultCore.java
Expand Up @@ -155,7 +155,7 @@ public GlobalStatus commit(String xid) throws TransactionException {
globalSession.asyncCommit();
return false;
} else {
globalSession.changeStatus(GlobalStatus.Committing);
globalSession.changeGlobalStatus(GlobalStatus.Committing);
return true;
}
}
Expand Down Expand Up @@ -204,15 +204,11 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr
SessionHelper.removeBranch(globalSession, branchSession, !retrying);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if (globalSession.canBeCommittedAsync()) {
LOGGER.error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession.getBranchId());
return CONTINUE;
} else {
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;
}
//not at branch
SessionHelper.endCommitFailed(globalSession);
LOGGER.error("Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession.getXid(), branchSession.getBranchId());
return false;

default:
if (!retrying) {
globalSession.queueToRetryCommit();
Expand Down Expand Up @@ -273,7 +269,7 @@ public GlobalStatus rollback(String xid) throws TransactionException {
boolean shouldRollBack = SessionHolder.lockAndExecute(globalSession, () -> {
globalSession.close(); // Highlight: Firstly, close the session, then no more branch can be registered.
if (globalSession.getStatus() == GlobalStatus.Begin) {
globalSession.changeStatus(GlobalStatus.Rollbacking);
globalSession.changeGlobalStatus(GlobalStatus.Rollbacking);
return true;
}
return false;
Expand Down
Expand Up @@ -148,10 +148,15 @@ public void onClose(GlobalSession globalSession) throws TransactionException {
}

@Override
public void onEnd(GlobalSession globalSession) throws TransactionException {
public void onSuccessEnd(GlobalSession globalSession) throws TransactionException {
removeGlobalSession(globalSession);
}

@Override
public void onFailEnd(GlobalSession globalSession) throws TransactionException {
LOGGER.info("xid:{} fail end, transaction:{}",globalSession.getXid(),globalSession.toString());
}

private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
Expand Down
33 changes: 28 additions & 5 deletions server/src/main/java/io/seata/server/session/GlobalSession.java
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import io.seata.common.Constants;
import io.seata.common.DefaultValues;
import io.seata.common.XID;
Expand Down Expand Up @@ -197,7 +198,7 @@ public void begin() throws TransactionException {
}

@Override
public void changeStatus(GlobalStatus status) throws TransactionException {
public void changeGlobalStatus(GlobalStatus status) throws TransactionException {
if (GlobalStatus.Rollbacking == status) {
LockerManagerFactory.getLockManager().updateLockStatus(xid, LockStatus.Rollbacking);
}
Expand Down Expand Up @@ -232,12 +233,25 @@ public void close() throws TransactionException {

@Override
public void end() throws TransactionException {
// Clean locks first
clean();
if (isSuccessEnd()) {
// Clean locks first
clean();
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onSuccessEnd(this);
}
} else {
for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onFailEnd(this);
}
}
}

for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onEnd(this);
public boolean isSuccessEnd() {
if (status == GlobalStatus.Committed || status == GlobalStatus.Rollbacked
|| status == GlobalStatus.TimeoutRollbacked) {
return true;
}
return false;
}

public void clean() throws TransactionException {
Expand Down Expand Up @@ -750,4 +764,13 @@ public void queueToRetryRollback() throws TransactionException {
SessionHolder.getRetryRollbackingSessionManager().addGlobalSession(this);
}

@Override
public String toString() {
return "GlobalSession{" + "xid='" + xid + '\'' + ", transactionId=" + transactionId + ", status=" + status
+ ", applicationId='" + applicationId + '\'' + ", transactionServiceGroup='" + transactionServiceGroup
+ '\'' + ", transactionName='" + transactionName + '\'' + ", timeout=" + timeout + ", beginTime="
+ beginTime + ", applicationData='" + applicationData + '\'' + ", lazyLoadBranch=" + lazyLoadBranch
+ ", active=" + active + ", branchSessions=" + branchSessions + ", globalSessionLock=" + globalSessionLock
+ ", lifecycleListeners=" + lifecycleListeners + '}';
}
}
12 changes: 6 additions & 6 deletions server/src/main/java/io/seata/server/session/SessionHelper.java
Expand Up @@ -116,7 +116,7 @@ public static BranchSession newBranch(BranchType branchType, String xid, long br
* @throws TransactionException the transaction exception
*/
public static void endCommitted(GlobalSession globalSession) throws TransactionException {
globalSession.changeStatus(GlobalStatus.Committed);
globalSession.changeGlobalStatus(GlobalStatus.Committed);
globalSession.end();
postTcSessionEndEvent(globalSession);
}
Expand All @@ -128,7 +128,7 @@ public static void endCommitted(GlobalSession globalSession) throws TransactionE
* @throws TransactionException the transaction exception
*/
public static void endCommitFailed(GlobalSession globalSession) throws TransactionException {
globalSession.changeStatus(GlobalStatus.CommitFailed);
globalSession.changeGlobalStatus(GlobalStatus.CommitFailed);
LOGGER.error("The Global session {} has changed the status to {}, need to be handled it manually.", globalSession.getXid(), globalSession.getStatus());

globalSession.end();
Expand All @@ -144,9 +144,9 @@ public static void endCommitFailed(GlobalSession globalSession) throws Transacti
public static void endRollbacked(GlobalSession globalSession) throws TransactionException {
GlobalStatus currentStatus = globalSession.getStatus();
if (isTimeoutGlobalStatus(currentStatus)) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbacked);
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbacked);
} else {
globalSession.changeStatus(GlobalStatus.Rollbacked);
globalSession.changeGlobalStatus(GlobalStatus.Rollbacked);
}
globalSession.end();
postTcSessionEndEvent(globalSession);
Expand All @@ -161,9 +161,9 @@ public static void endRollbacked(GlobalSession globalSession) throws Transaction
public static void endRollbackFailed(GlobalSession globalSession) throws TransactionException {
GlobalStatus currentStatus = globalSession.getStatus();
if (isTimeoutGlobalStatus(currentStatus)) {
globalSession.changeStatus(GlobalStatus.TimeoutRollbackFailed);
globalSession.changeGlobalStatus(GlobalStatus.TimeoutRollbackFailed);
} else {
globalSession.changeStatus(GlobalStatus.RollbackFailed);
globalSession.changeGlobalStatus(GlobalStatus.RollbackFailed);
}
LOGGER.error("The Global session {} has changed the status to {}, need to be handled it manually.", globalSession.getXid(), globalSession.getStatus());

Expand Down
Expand Up @@ -39,7 +39,7 @@ public interface SessionLifecycle {
* @param status the status
* @throws TransactionException the transaction exception
*/
void changeStatus(GlobalStatus status) throws TransactionException;
void changeGlobalStatus(GlobalStatus status) throws TransactionException;

/**
* Change branch status.
Expand Down Expand Up @@ -81,7 +81,7 @@ public interface SessionLifecycle {
void close() throws TransactionException;

/**
* End.
* end.
*
* @throws TransactionException the transaction exception
*/
Expand Down
Expand Up @@ -86,5 +86,13 @@ void onBranchStatusChange(GlobalSession globalSession, BranchSession branchSessi
* @param globalSession the global session
* @throws TransactionException the transaction exception
*/
void onEnd(GlobalSession globalSession) throws TransactionException;
void onSuccessEnd(GlobalSession globalSession) throws TransactionException;

/**
* On fail end.
*
* @param globalSession the global session
* @throws TransactionException the transaction exception
*/
void onFailEnd(GlobalSession globalSession) throws TransactionException;
}
Expand Up @@ -121,7 +121,7 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr
case PhaseOne_Failed:
LOGGER.error("By [{}], finish SAGA global [{}]", branchStatus, globalSession.getXid());
SessionHelper.removeAllBranch(globalSession, !retrying);
globalSession.changeStatus(GlobalStatus.Finished);
globalSession.changeGlobalStatus(GlobalStatus.Finished);
globalSession.end();
return false;
case PhaseTwo_CommitFailed_Unretryable:
Expand All @@ -137,11 +137,10 @@ public boolean doGlobalCommit(GlobalSession globalSession, boolean retrying) thr
default:
if (!retrying) {
globalSession.queueToRetryCommit();
return false;
} else {
LOGGER.error("Failed to commit SAGA global[{}], will retry later.", globalSession.getXid());
return false;
}
return false;
}
} catch (Exception ex) {
LOGGER.error("Failed to commit global[" + globalSession.getXid() + "]", ex);
Expand Down Expand Up @@ -203,7 +202,7 @@ public void doGlobalReport(GlobalSession globalSession, String xid, GlobalStatus
SessionHelper.endRollbacked(globalSession);
LOGGER.info("Global[{}] rollbacked", globalSession.getXid());
} else {
globalSession.changeStatus(globalStatus);
globalSession.changeGlobalStatus(globalStatus);
LOGGER.info("Global[{}] reporting is successfully done. status[{}]", globalSession.getXid(), globalSession.getStatus());

if (GlobalStatus.RollbackRetrying.equals(globalStatus)
Expand Down
7 changes: 6 additions & 1 deletion server/src/test/java/WriteStoreMultithreadTest.java
Expand Up @@ -167,7 +167,12 @@ public void onClose(GlobalSession globalSession) throws TransactionException {
}

@Override
public void onEnd(GlobalSession globalSession) throws TransactionException {
public void onSuccessEnd(GlobalSession globalSession) throws TransactionException {

}

@Override
public void onFailEnd(GlobalSession globalSession) throws TransactionException {

}
});
Expand Down
7 changes: 6 additions & 1 deletion server/src/test/java/WriteStoreTest.java
Expand Up @@ -183,7 +183,12 @@ public void onClose(GlobalSession globalSession) throws TransactionException {
}

@Override
public void onEnd(GlobalSession globalSession) throws TransactionException {
public void onSuccessEnd(GlobalSession globalSession) throws TransactionException {

}

@Override
public void onFailEnd(GlobalSession globalSession) throws TransactionException {

}
});
Expand Down

0 comments on commit 193a112

Please sign in to comment.