Skip to content

Commit

Permalink
bugfix: fix RetryRollbacking return NPE (#4561)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Apr 22, 2022
1 parent 496d08c commit a05e5d9
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 21 deletions.
2 changes: 2 additions & 0 deletions changes/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,10 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [[#4492](https://github.com/seata/seata/pull/4492)] 修复develop分支下eureka注册中心无法动态更新服务列表的问题
- [[#4228](https://github.com/seata/seata/pull/4228)] 修复tc获取不同ip的rm连接导致的xa模式资源悬挂问题
- [[#4535](https://github.com/seata/seata/pull/4535)] 修复FileSessionManagerTest单测错误
- [[#4561](https://github.com/seata/seata/pull/4535)] 修复 allSessions/findGlobalSessions 某些情况下返回null
- [[#4505](https://github.com/seata/seata/pull/4505)] 修复time类型的fastjson序列化问题


### optimize:
- [[#4163](https://github.com/seata/seata/pull/4163)] 完善开发者奉献文档
- [[#3678](https://github.com/seata/seata/pull/3678)] 补充遗漏的配置及新版本pr登记md文件
Expand Down
2 changes: 2 additions & 0 deletions changes/en-us/1.5.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,10 @@
- [[#4474](https://github.com/seata/seata/pull/4474)] fix Mysql multi-bit Bit type field rollback error
- [[#4492](https://github.com/seata/seata/pull/4492)] fix the failure to update cluster list dynamically when use eureka of the develop branch
- [[#4535](https://github.com/seata/seata/pull/4535)] fix FileSessionManagerTest fail
- [[#4561](https://github.com/seata/seata/pull/4535)] fix allSessions/findGlobalSessions may return null and cause npe
- [[#4505](https://github.com/seata/seata/pull/4505)] fix fastjson serialization of time data types


### optimize:
- [[#4163](https://github.com/seata/seata/pull/4163)] improve CONTRIBUTING docs
- [[#3678](https://github.com/seata/seata/pull/3678)] supplement missing configuration and new version documents
Expand Down
2 changes: 1 addition & 1 deletion script/server/db/mysql.sql
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ CREATE TABLE IF NOT EXISTS `lock_table`
`gmt_modified` DATETIME,
PRIMARY KEY (`row_key`),
KEY `idx_status` (`status`),
KEY `idx_branch_id` (`branch_id`)
KEY `idx_branch_id` (`branch_id`),
KEY `idx_xid_and_branch_id` (`xid` , `branch_id`)
) ENGINE = InnoDB
DEFAULT CHARSET = utf8mb4;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ protected void handleRetryRollbacking() {
sessionCondition.setLazyLoadBranch(true);
Collection<GlobalSession> rollbackingSessions =
SessionHolder.getRetryRollbackingSessionManager().findGlobalSessions(sessionCondition);
if (CollectionUtils.isEmpty(rollbackingSessions)) {
return;
}
long now = System.currentTimeMillis();
SessionHelper.forEach(rollbackingSessions, rollbackingSession -> {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.List;
import java.util.Objects;

import io.seata.common.util.CollectionUtils;
import io.seata.common.util.StringUtils;
import io.seata.config.Configuration;
import io.seata.config.ConfigurationFactory;
Expand Down Expand Up @@ -216,6 +217,9 @@ public static boolean isTimeoutGlobalStatus(GlobalStatus status) {
* @since 1.5.0
*/
public static void forEach(Collection<GlobalSession> sessions, GlobalSessionHandler handler) {
if (CollectionUtils.isEmpty(sessions)) {
return;
}
sessions.parallelStream().forEach(globalSession -> {
try {
MDC.put(RootContext.MDC_KEY_XID, globalSession.getXid());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,15 @@ public Collection<GlobalSession> allSessions() {
if (SessionHolder.ASYNC_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(GlobalStatus.AsyncCommitting));
} else if (SessionHolder.RETRY_COMMITTING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.CommitRetrying, GlobalStatus.Committing}));
return findGlobalSessions(new SessionCondition(GlobalStatus.CommitRetrying, GlobalStatus.Committing));
} else if (SessionHolder.RETRY_ROLLBACKING_SESSION_MANAGER_NAME.equalsIgnoreCase(taskName)) {
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {GlobalStatus.RollbackRetrying,
GlobalStatus.Rollbacking, GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying}));
return findGlobalSessions(new SessionCondition(GlobalStatus.RollbackRetrying, GlobalStatus.Rollbacking,
GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying));
} else {
// all data
return findGlobalSessions(new SessionCondition(new GlobalStatus[] {
GlobalStatus.UnKnown, GlobalStatus.Begin,
GlobalStatus.Committing, GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking,
GlobalStatus.RollbackRetrying,
GlobalStatus.TimeoutRollbacking, GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting}));
return findGlobalSessions(new SessionCondition(GlobalStatus.UnKnown, GlobalStatus.Begin, GlobalStatus.Committing,
GlobalStatus.CommitRetrying, GlobalStatus.Rollbacking, GlobalStatus.RollbackRetrying, GlobalStatus.TimeoutRollbacking,
GlobalStatus.TimeoutRollbackRetrying, GlobalStatus.AsyncCommitting));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,20 @@ public List<GlobalSession> readSession(GlobalStatus[] statuses, boolean withBran
}
//global transaction
List<GlobalTransactionDO> globalTransactionDOs = logStore.queryGlobalTransactionDO(states, logQueryLimit);
if (CollectionUtils.isEmpty(globalTransactionDOs)) {
return null;
}
List<String> xids = globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
Map<String, List<BranchTransactionDO>> branchTransactionDOsMap;
if (withBranchSessions) {
List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
branchTransactionDOsMap = branchTransactionDOs.stream()
.collect(Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
} else {
branchTransactionDOsMap = Collections.emptyMap();
Map<String, List<BranchTransactionDO>> branchTransactionDOsMap = Collections.emptyMap();
if (CollectionUtils.isNotEmpty(globalTransactionDOs)) {
List<String> xids =
globalTransactionDOs.stream().map(GlobalTransactionDO::getXid).collect(Collectors.toList());
if (withBranchSessions) {
List<BranchTransactionDO> branchTransactionDOs = logStore.queryBranchTransactionDO(xids);
branchTransactionDOsMap = branchTransactionDOs.stream().collect(
Collectors.groupingBy(BranchTransactionDO::getXid, LinkedHashMap::new, Collectors.toList()));
}
}
Map<String, List<BranchTransactionDO>> finalBranchTransactionDOsMap = branchTransactionDOsMap;
return globalTransactionDOs.stream()
.map(globalTransactionDO -> getGlobalSession(globalTransactionDO,
branchTransactionDOsMap.get(globalTransactionDO.getXid()), withBranchSessions))
finalBranchTransactionDOsMap.get(globalTransactionDO.getXid()), withBranchSessions))
.collect(Collectors.toList());
}

Expand Down

0 comments on commit a05e5d9

Please sign in to comment.