Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: fix the logic of rollback to savepoint and release to savepoint #3413

Merged
merged 19 commits into from Jan 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 5 additions & 1 deletion changes/1.5.0.md
Expand Up @@ -18,19 +18,22 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
### feature:

- [[#3172](https://github.com/seata/seata/pull/3172)] 支持undolog压缩
- [[#3372](https://github.com/seata/seata/pull/3372)] saga模式下支撑用户自定义是否更新最后一次重试日志
- [[#3372](https://github.com/seata/seata/pull/3372)] saga模式下支持用户自定义是否更新最后一次重试日志
- [[#3411](https://github.com/seata/seata/pull/3411)] 支持配置seata服务器的线程池参数
- [[#3348](https://github.com/seata/seata/pull/3348)] 支持redis哨兵模式
- [[#2667](https://github.com/seata/seata/pull/2667)] 支持db和redis密码加解密


### bugfix:

- [[#3258](https://github.com/seata/seata/pull/3258)] 修复AsyncWorker潜在的OOM问题
- [[#3293](https://github.com/seata/seata/pull/3293)] 修复配置缓存获取值时类型不匹配的bug
- [[#3241](https://github.com/seata/seata/pull/3241)] 禁止在多SQL的情况下使用 limit 和 order by 语法
- [[#3406](https://github.com/seata/seata/pull/3406)] 修复当config.txt中包含特殊字符时,键值对无法被推上nacos
- [[#3418](https://github.com/seata/seata/pull/3418)] 修复 getGeneratedKeys 可能会取到历史的主键的问题
- [[#3408](https://github.com/seata/seata/pull/3408)] 修复jar运行模式, 当第三方依赖分开打包时, this.getClass().getClassLoader()是null, 会报空指针异常
- [[#3431](https://github.com/seata/seata/pull/3431)] 修复在读取配置时Property Bean可能还未初始化
- [[#3413](https://github.com/seata/seata/pull/3413)] 修复回滚到savepoint以及releaseSavepoint的逻辑



Expand Down Expand Up @@ -79,6 +82,7 @@ Seata 是一款开源的分布式事务解决方案,提供高性能和简单
- [wangliang181230](https://github.com/wangliang181230)
- [xingfudeshi](https://github.com/xingfudeshi)
- [MentosL](https://github.com/MentosL)
- [lian88jian](https://github.com/lian88jian)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。

Expand Down
3 changes: 3 additions & 0 deletions changes/en-us/1.5.0.md
Expand Up @@ -30,7 +30,9 @@
- [[#3241](https://github.com/seata/seata/pull/3241)] forbidden use order by or limit in multi sql
- [[#3406](https://github.com/seata/seata/pull/3406)] fix the value can not be push to nacos when special charset in config.txt
- [[#3418](https://github.com/seata/seata/pull/3418)] fix getGeneratedKeys may get history pk
- [[#3408](https://github.com/seata/seata/pull/3408)] run with jar file and not package third lib into jar file, this.getClass().getClassLoader() will be null
- [[#3431](https://github.com/seata/seata/pull/3431)] fix property bean may not be initialized when reading configuration
- [[#3413](https://github.com/seata/seata/pull/3413)] fix the logic of rollback to savepoint and release to savepoint


### optimize:
Expand Down Expand Up @@ -75,6 +77,7 @@
- [wangliang181230](https://github.com/wangliang181230)
- [xingfudeshi](https://github.com/xingfudeshi)
- [MentosL](https://github.com/MentosL)
- [lian88jian](https://github.com/lian88jian)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.

Expand Down
Expand Up @@ -34,7 +34,6 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.SQLXML;
import java.sql.Savepoint;
import java.sql.Statement;
import java.sql.Struct;
import java.util.List;
Expand Down Expand Up @@ -244,28 +243,6 @@ public int getHoldability() throws SQLException {
return targetConnection.getHoldability();
}

@Override
public Savepoint setSavepoint() throws SQLException {
return targetConnection.setSavepoint();
}

@Override
public Savepoint setSavepoint(String name) throws SQLException {
return targetConnection.setSavepoint(name);
}

@Override
public void rollback(Savepoint savepoint) throws SQLException {
targetConnection.rollback(savepoint);

}

@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
targetConnection.releaseSavepoint(savepoint);

}

@Override
public Statement createStatement(int resultSetType, int resultSetConcurrency, int resultSetHoldability)
throws SQLException {
Expand Down
Expand Up @@ -15,13 +15,19 @@
*/
package io.seata.rm.datasource;

import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.Set;
import java.util.Map;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;


import io.seata.common.exception.ShouldNeverHappenException;
import io.seata.common.util.CollectionUtils;
import io.seata.rm.datasource.undo.SQLUndoLog;

/**
Expand All @@ -30,15 +36,33 @@
* @author sharajava
*/
public class ConnectionContext {
private static final Savepoint DEFAULT_SAVEPOINT = new Savepoint() {
@Override
public int getSavepointId() throws SQLException {
return 0;
}

@Override
public String getSavepointName() throws SQLException {
return "DEFAULT_SEATA_SAVEPOINT";
}
};

private String xid;
private Long branchId;
private boolean isGlobalLockRequire;
private Savepoint currentSavepoint = DEFAULT_SAVEPOINT;

/**
* Table and primary key should not be duplicated.
* the lock keys buffer
*/
private final Map<Savepoint, Set<String>> lockKeysBuffer = new LinkedHashMap<>();
/**
* the undo items buffer
*/
private Set<String> lockKeysBuffer = new HashSet<>();
private List<SQLUndoLog> sqlUndoItemsBuffer = new ArrayList<>();
private final Map<Savepoint, List<SQLUndoLog>> sqlUndoItemsBuffer = new LinkedHashMap<>();

private final List<Savepoint> savepoints = new ArrayList<>(8);

/**
* whether requires global lock in this connection
Expand All @@ -64,7 +88,7 @@ void setGlobalLockRequire(boolean isGlobalLockRequire) {
* @param lockKey the lock key
*/
void appendLockKey(String lockKey) {
lockKeysBuffer.add(lockKey);
lockKeysBuffer.computeIfAbsent(currentSavepoint, k -> new HashSet<>()).add(lockKey);
}

/**
Expand All @@ -73,7 +97,55 @@ void appendLockKey(String lockKey) {
* @param sqlUndoLog the sql undo log
*/
void appendUndoItem(SQLUndoLog sqlUndoLog) {
sqlUndoItemsBuffer.add(sqlUndoLog);
sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>()).add(sqlUndoLog);
}

/**
* Append savepoint
* @param savepoint the savepoint
*/
void appendSavepoint(Savepoint savepoint) {
savepoints.add(savepoint);
this.currentSavepoint = savepoint;
}

public void removeSavepoint(Savepoint savepoint) {
List<Savepoint> afterSavepoints = getAfterSavepoints(savepoint);

if (null == savepoint) {
sqlUndoItemsBuffer.clear();
lockKeysBuffer.clear();
} else {

for (Savepoint sp : afterSavepoints) {
sqlUndoItemsBuffer.remove(sp);
lockKeysBuffer.remove(sp);
}
}

savepoints.removeAll(afterSavepoints);
currentSavepoint = savepoints.size() == 0 ? DEFAULT_SAVEPOINT : savepoints.get(savepoints.size() - 1);
}

public void releaseSavepoint(Savepoint savepoint) {
List<Savepoint> afterSavepoints = getAfterSavepoints(savepoint);
savepoints.removeAll(afterSavepoints);
currentSavepoint = savepoints.size() == 0 ? DEFAULT_SAVEPOINT : savepoints.get(savepoints.size() - 1);

// move the undo items & lock keys to current savepoint
for (Savepoint sp : afterSavepoints) {
List<SQLUndoLog> savepointSQLUndoLogs = sqlUndoItemsBuffer.remove(sp);
if (CollectionUtils.isNotEmpty(savepointSQLUndoLogs)) {
sqlUndoItemsBuffer.computeIfAbsent(currentSavepoint, k -> new ArrayList<>(savepointSQLUndoLogs.size()))
.addAll(savepointSQLUndoLogs);
}

Set<String> savepointLockKeys = lockKeysBuffer.remove(sp);
if (CollectionUtils.isNotEmpty(savepointLockKeys)) {
lockKeysBuffer.computeIfAbsent(currentSavepoint, k -> new HashSet<>())
.addAll(savepointLockKeys);
}
}
}

/**
Expand Down Expand Up @@ -121,6 +193,15 @@ public boolean hasUndoLog() {
return !sqlUndoItemsBuffer.isEmpty();
}

/**
* Gets lock keys buffer.
*
* @return the lock keys buffer
*/
public boolean hasLockKey() {
return !lockKeysBuffer.isEmpty();
}

/**
* Gets xid.
*
Expand Down Expand Up @@ -174,6 +255,7 @@ void reset(String xid) {
this.xid = xid;
branchId = null;
this.isGlobalLockRequire = false;
savepoints.clear();
lockKeysBuffer.clear();
sqlUndoItemsBuffer.clear();
}
Expand All @@ -187,8 +269,17 @@ public String buildLockKeys() {
if (lockKeysBuffer.isEmpty()) {
return null;
}
Set<String> lockKeysBufferSet = new HashSet<>();
for (Set<String> lockKeys : lockKeysBuffer.values()) {
lockKeysBufferSet.addAll(lockKeys);
}

if (lockKeysBufferSet.isEmpty()) {
return null;
}

StringBuilder appender = new StringBuilder();
Iterator<String> iterable = lockKeysBuffer.iterator();
Iterator<String> iterable = lockKeysBufferSet.iterator();
while (iterable.hasNext()) {
appender.append(iterable.next());
if (iterable.hasNext()) {
Expand All @@ -204,16 +295,25 @@ public String buildLockKeys() {
* @return the undo items
*/
public List<SQLUndoLog> getUndoItems() {
return sqlUndoItemsBuffer;
List<SQLUndoLog> undoItems = new ArrayList<>();
for (List<SQLUndoLog> items : sqlUndoItemsBuffer.values()) {
undoItems.addAll(items);
}
return undoItems;
}


/**
* Gets lock keys buffer.
*
* @return the lock keys buffer
* Get the savepoints after target savepoint(include the param savepoint)
* @param savepoint the target savepoint
* @return after savepoints
*/
public Set<String> getLockKeysBuffer() {
return lockKeysBuffer;
private List<Savepoint> getAfterSavepoints(Savepoint savepoint) {
if (null == savepoint) {
return new ArrayList<>(savepoints);
}

return new ArrayList<>(savepoints.subList(savepoints.indexOf(savepoint), savepoints.size()));
}

@Override
Expand Down
Expand Up @@ -17,6 +17,7 @@

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.concurrent.Callable;

import io.seata.common.util.StringUtils;
Expand Down Expand Up @@ -194,6 +195,33 @@ public void commit() throws SQLException {
}
}

@Override
public Savepoint setSavepoint() throws SQLException {
Savepoint savepoint = targetConnection.setSavepoint();
context.appendSavepoint(savepoint);
return savepoint;
}

@Override
public Savepoint setSavepoint(String name) throws SQLException {
Savepoint savepoint = targetConnection.setSavepoint(name);
context.appendSavepoint(savepoint);
return savepoint;
}

@Override
public void rollback(Savepoint savepoint) throws SQLException {
targetConnection.rollback(savepoint);
context.removeSavepoint(savepoint);
}

@Override
public void releaseSavepoint(Savepoint savepoint) throws SQLException {
targetConnection.releaseSavepoint(savepoint);
context.releaseSavepoint(savepoint);
}


private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
Expand Down Expand Up @@ -235,7 +263,7 @@ private void processGlobalTransactionCommit() throws SQLException {
}

private void register() throws TransactionException {
if (!context.hasUndoLog() || context.getLockKeysBuffer().isEmpty()) {
if (!context.hasUndoLog() || !context.hasLockKey()) {
return;
}
Long branchId = DefaultResourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(),
Expand Down
Expand Up @@ -191,8 +191,7 @@ public <T> T execute(Callable<T> callable) throws Exception {
protected void onException(Exception e) throws Exception {
ConnectionContext context = connection.getContext();
//UndoItems can't use the Set collection class to prevent ABA
context.getUndoItems().clear();
context.getLockKeysBuffer().clear();
context.removeSavepoint(null);
connection.getTargetConnection().rollback();
}

Expand Down