Skip to content

Commit

Permalink
optimize : tccfence log table deleted should be optimized (#4490)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bughue committed Aug 22, 2022
1 parent e3a4640 commit 06c2a50
Show file tree
Hide file tree
Showing 6 changed files with 133 additions and 17 deletions.
2 changes: 2 additions & 0 deletions changes/en-us/develop.md
Expand Up @@ -25,6 +25,7 @@ Add changes here for all PR submitted to the develop branch.
- [[#4836](https://github.com/seata/seata/pull/4836)] optimize BaseTransactionalExecutor#buildLockKey(TableRecords rowsIncludingPK) method more readable
- [[#4865](https://github.com/seata/seata/pull/4865)] fix some security vulnerabilities in GGEditor
- [[#4590](https://github.com/seata/seata/pull/4590)] auto degrade enable to dynamic configure
- [[#4490](https://github.com/seata/seata/pull/4490)] tccfence log table delete by index

### test:
- [[#4794](https://github.com/seata/seata/pull/4794)] try to fix the test `DataSourceProxyTest.getResourceIdTest()`
Expand All @@ -42,5 +43,6 @@ Thanks to these contributors for their code commits. Please report an unintended
- [pengten](https://github.com/pengten)
- [liuqiufeng](https://github.com/liuqiufeng)
- [yujianfei1986](https://github.com/yujianfei1986)
- [Bughue](https://github.com/Bughue)

Also, we receive many valuable issues, questions and advices from our community. Thanks for you all.
2 changes: 2 additions & 0 deletions changes/zh-cn/develop.md
Expand Up @@ -25,6 +25,7 @@
- [[#4836](https://github.com/seata/seata/pull/4836)] 优化 BaseTransactionalExecutor#buildLockKey(TableRecords rowsIncludingPK) 方法可读性
- [[#4865](https://github.com/seata/seata/pull/4865)] 修复 Saga 可视化设计器 GGEditor 安全漏洞
- [[#4590](https://github.com/seata/seata/pull/4590)] 自动降级支持开关支持动态配置
- [[#4490](https://github.com/seata/seata/pull/4490)] tccfence 记录表优化成按索引删除

### test:
- [[#4794](https://github.com/seata/seata/pull/4794)] 重构代码,尝试修复单元测试 `DataSourceProxyTest.getResourceIdTest()`
Expand All @@ -42,5 +43,6 @@
- [pengten](https://github.com/pengten)
- [liuqiufeng](https://github.com/liuqiufeng)
- [yujianfei1986](https://github.com/yujianfei1986)
- [Bughue](https://github.com/Bughue)

同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。
47 changes: 33 additions & 14 deletions tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java
Expand Up @@ -17,7 +17,9 @@

import java.lang.reflect.Method;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.Date;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -62,6 +64,11 @@ private TCCFenceHandler() {

private static final int MAX_QUEUE_SIZE = 500;

/**
* limit of delete record by date (per sql)
*/
private static final int LIMIT_DELETE = 1000;

private static final LinkedBlockingQueue<FenceLogIdentity> LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE);

private static FenceLogCleanRunnable fenceLogCleanRunnable;
Expand All @@ -76,6 +83,10 @@ private TCCFenceHandler() {
}
}

public static DataSource getDataSource() {
return TCCFenceHandler.dataSource;
}

public static void setDataSource(DataSource dataSource) {
TCCFenceHandler.dataSource = dataSource;
}
Expand Down Expand Up @@ -275,22 +286,30 @@ public static boolean deleteFence(String xid, Long branchId) {
});
}

/**
* Delete TCC Fence By Datetime
*
* @param datetime datetime
* @return the deleted row count
*/


public static int deleteFenceByDate(Date datetime) {
return transactionTemplate.execute(status -> {
try {
Connection conn = DataSourceUtils.getConnection(dataSource);
return TCC_FENCE_DAO.deleteTCCFenceDOByDate(conn, datetime);
} catch (RuntimeException e) {
status.setRollbackOnly();
throw e;
DataSource dataSource = TCCFenceHandler.getDataSource();
Connection connection = null;
int total = 0;
try {
connection = DataSourceUtils.getConnection(dataSource);
while (true) {
Set<String> xidSet = TCC_FENCE_DAO.queryEndStatusXidsByDate(connection, datetime, LIMIT_DELETE);
if (xidSet.isEmpty()) {
break;
}
total += TCC_FENCE_DAO.deleteTCCFenceDO(connection, new ArrayList<>(xidSet));
}
});
} catch (RuntimeException e) {
LOGGER.error("delete fence log failed ", e);
} finally {
if (connection != null) {
DataSourceUtils.releaseConnection(connection, dataSource);
}
}
return total;

}

private static void initLogCleanExecutor() {
Expand Down
20 changes: 19 additions & 1 deletion tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java
Expand Up @@ -17,6 +17,8 @@

import java.sql.Connection;
import java.util.Date;
import java.util.List;
import java.util.Set;

/**
* The TCC Fence Store
Expand All @@ -33,6 +35,14 @@ public interface TCCFenceStore {
*/
TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId);

/**
* Query xid.
* @param datetime the datetime
* @param limit the limit size
* @return the tcc fence do
*/
Set<String> queryEndStatusXidsByDate(Connection conn, Date datetime, int limit);

/**
* Insert tcc fence do boolean.
* @param tccFenceDO the tcc fence do
Expand All @@ -57,12 +67,20 @@ public interface TCCFenceStore {
*/
boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId);

/**
* Delete tcc fence do boolean.
* @param xids the global transaction ids
* @return the boolean
*/
int deleteTCCFenceDO(Connection conn, List<String> xids);

/**
* Delete tcc fence by datetime.
* @param datetime datetime
* @param limit limit
* @return the deleted row count
*/
int deleteTCCFenceDOByDate(Connection conn, Date datetime);
int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit);

/**
* Set LogTable Name
Expand Down
Expand Up @@ -32,6 +32,9 @@
import java.sql.Timestamp;
import java.sql.SQLIntegrityConstraintViolationException;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* The type TCC Fence store data base dao
Expand Down Expand Up @@ -86,6 +89,28 @@ public TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId) {
}
}

@Override
public Set<String> queryEndStatusXidsByDate(Connection conn, Date datetime, int limit) {
PreparedStatement ps = null;
ResultSet rs = null;
try {
String sql = TCCFenceStoreSqls.getQueryEndStatusSQLByDate(logTableName);
ps = conn.prepareStatement(sql);
ps.setTimestamp(1, new Timestamp(datetime.getTime()));
ps.setInt(2, limit);
rs = ps.executeQuery();
Set<String> xids = new HashSet<>(limit);
while (rs.next()) {
xids.add(rs.getString("xid"));
}
return xids;
} catch (SQLException e) {
throw new DataAccessException(e);
} finally {
IOUtil.close(rs, ps);
}
}

@Override
public boolean insertTCCFenceDO(Connection conn, TCCFenceDO tccFenceDO) {
PreparedStatement ps = null;
Expand Down Expand Up @@ -149,12 +174,31 @@ public boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId) {
}

@Override
public int deleteTCCFenceDOByDate(Connection conn, Date datetime) {
public int deleteTCCFenceDO(Connection conn, List<String> xids) {
PreparedStatement ps = null;
try {
String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?", ",", xids.size());
String sql = TCCFenceStoreSqls.getDeleteSQLByXids(logTableName, paramsPlaceHolder);
ps = conn.prepareStatement(sql);
for (int i = 0; i < xids.size(); i++) {
ps.setString(i + 1, xids.get(i));
}
return ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
} finally {
IOUtil.close(ps);
}
}

@Override
public int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit) {
PreparedStatement ps = null;
try {
String sql = TCCFenceStoreSqls.getDeleteSQLByDateAndStatus(logTableName);
ps = conn.prepareStatement(sql);
ps.setTimestamp(1, new Timestamp(datetime.getTime()));
ps.setInt(2, limit);
return ps.executeUpdate();
} catch (SQLException e) {
throw new StoreException(e);
Expand Down
Expand Up @@ -33,6 +33,12 @@ private TCCFenceStoreSqls() {
*/
public static final String LOCAL_TCC_LOG_PLACEHOLD = " #local_tcc_log# ";

/**
* The constant PRAMETER_PLACEHOLD.
* format: ?, ?, ?
*/
public static final String PRAMETER_PLACEHOLD = " #PRAMETER_PLACEHOLD# ";

/**
* The constant INSERT_LOCAL_TCC_LOG.
*/
Expand All @@ -47,6 +53,15 @@ private TCCFenceStoreSqls() {
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where xid = ? and branch_id = ? for update";

/**
* The constant QUERY_END_STATUS_BY_DATE.
*/
protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified "
+ "from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";

/**
* The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID.
*/
Expand All @@ -58,12 +73,19 @@ private TCCFenceStoreSqls() {
*/
protected static final String DELETE_BY_BRANCH_ID_AND_XID = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid = ? and branch_id = ? ";

/**
* The constant DELETE_BY_BRANCH_ID_AND_XID.
*/
protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")";


/**
* The constant DELETE_BY_DATE_AND_STATUS.
*/
protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD
+ " where gmt_modified < ? "
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")";
+ " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")"
+ " limit ?";

public static String getInsertLocalTCCLogSQL(String localTccTable) {
return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
Expand All @@ -73,6 +95,10 @@ public static String getQuerySQLByBranchIdAndXid(String localTccTable) {
return QUERY_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getQueryEndStatusSQLByDate(String localTccTable) {
return QUERY_END_STATUS_BY_DATE.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getUpdateStatusSQLByBranchIdAndXid(String localTccTable) {
return UPDATE_STATUS_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}
Expand All @@ -81,6 +107,11 @@ public static String getDeleteSQLByBranchIdAndXid(String localTccTable) {
return DELETE_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}

public static String getDeleteSQLByXids(String localTccTable, String paramsPlaceHolder) {
return DELETE_BY_BRANCH_XIDS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable)
.replace(PRAMETER_PLACEHOLD, paramsPlaceHolder);
}

public static String getDeleteSQLByDateAndStatus(String localTccTable) {
return DELETE_BY_DATE_AND_STATUS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable);
}
Expand Down

0 comments on commit 06c2a50

Please sign in to comment.