From 06c2a50473fd59deefd6cb42d01ffd095eef20a7 Mon Sep 17 00:00:00 2001 From: justabug Date: Mon, 22 Aug 2022 14:06:16 +0800 Subject: [PATCH] optimize : tccfence log table deleted should be optimized (#4490) --- changes/en-us/develop.md | 2 + changes/zh-cn/develop.md | 2 + .../java/io/seata/rm/tcc/TCCFenceHandler.java | 47 +++++++++++++------ .../io/seata/rm/tcc/store/TCCFenceStore.java | 20 +++++++- .../store/db/TCCFenceStoreDataBaseDAO.java | 46 +++++++++++++++++- .../tcc/store/db/sql/TCCFenceStoreSqls.java | 33 ++++++++++++- 6 files changed, 133 insertions(+), 17 deletions(-) diff --git a/changes/en-us/develop.md b/changes/en-us/develop.md index fd86649f618..9b4ba9e7a8f 100644 --- a/changes/en-us/develop.md +++ b/changes/en-us/develop.md @@ -25,6 +25,7 @@ Add changes here for all PR submitted to the develop branch. - [[#4836](https://github.com/seata/seata/pull/4836)] optimize BaseTransactionalExecutor#buildLockKey(TableRecords rowsIncludingPK) method more readable - [[#4865](https://github.com/seata/seata/pull/4865)] fix some security vulnerabilities in GGEditor - [[#4590](https://github.com/seata/seata/pull/4590)] auto degrade enable to dynamic configure +- [[#4490](https://github.com/seata/seata/pull/4490)] tccfence log table delete by index ### test: - [[#4794](https://github.com/seata/seata/pull/4794)] try to fix the test `DataSourceProxyTest.getResourceIdTest()` @@ -42,5 +43,6 @@ Thanks to these contributors for their code commits. Please report an unintended - [pengten](https://github.com/pengten) - [liuqiufeng](https://github.com/liuqiufeng) - [yujianfei1986](https://github.com/yujianfei1986) +- [Bughue](https://github.com/Bughue) Also, we receive many valuable issues, questions and advices from our community. Thanks for you all. diff --git a/changes/zh-cn/develop.md b/changes/zh-cn/develop.md index a446b28f130..e1d50f83c7e 100644 --- a/changes/zh-cn/develop.md +++ b/changes/zh-cn/develop.md @@ -25,6 +25,7 @@ - [[#4836](https://github.com/seata/seata/pull/4836)] 优化 BaseTransactionalExecutor#buildLockKey(TableRecords rowsIncludingPK) 方法可读性 - [[#4865](https://github.com/seata/seata/pull/4865)] 修复 Saga 可视化设计器 GGEditor 安全漏洞 - [[#4590](https://github.com/seata/seata/pull/4590)] 自动降级支持开关支持动态配置 +- [[#4490](https://github.com/seata/seata/pull/4490)] tccfence 记录表优化成按索引删除 ### test: - [[#4794](https://github.com/seata/seata/pull/4794)] 重构代码,尝试修复单元测试 `DataSourceProxyTest.getResourceIdTest()` @@ -42,5 +43,6 @@ - [pengten](https://github.com/pengten) - [liuqiufeng](https://github.com/liuqiufeng) - [yujianfei1986](https://github.com/yujianfei1986) +- [Bughue](https://github.com/Bughue) 同时,我们收到了社区反馈的很多有价值的issue和建议,非常感谢大家。 diff --git a/tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java b/tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java index f5a4dee1021..f0abd3d48cf 100644 --- a/tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java +++ b/tcc/src/main/java/io/seata/rm/tcc/TCCFenceHandler.java @@ -17,7 +17,9 @@ import java.lang.reflect.Method; import java.sql.Connection; +import java.util.ArrayList; import java.util.Date; +import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -62,6 +64,11 @@ private TCCFenceHandler() { private static final int MAX_QUEUE_SIZE = 500; + /** + * limit of delete record by date (per sql) + */ + private static final int LIMIT_DELETE = 1000; + private static final LinkedBlockingQueue LOG_QUEUE = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); private static FenceLogCleanRunnable fenceLogCleanRunnable; @@ -76,6 +83,10 @@ private TCCFenceHandler() { } } + public static DataSource getDataSource() { + return TCCFenceHandler.dataSource; + } + public static void setDataSource(DataSource dataSource) { TCCFenceHandler.dataSource = dataSource; } @@ -275,22 +286,30 @@ public static boolean deleteFence(String xid, Long branchId) { }); } - /** - * Delete TCC Fence By Datetime - * - * @param datetime datetime - * @return the deleted row count - */ + + public static int deleteFenceByDate(Date datetime) { - return transactionTemplate.execute(status -> { - try { - Connection conn = DataSourceUtils.getConnection(dataSource); - return TCC_FENCE_DAO.deleteTCCFenceDOByDate(conn, datetime); - } catch (RuntimeException e) { - status.setRollbackOnly(); - throw e; + DataSource dataSource = TCCFenceHandler.getDataSource(); + Connection connection = null; + int total = 0; + try { + connection = DataSourceUtils.getConnection(dataSource); + while (true) { + Set xidSet = TCC_FENCE_DAO.queryEndStatusXidsByDate(connection, datetime, LIMIT_DELETE); + if (xidSet.isEmpty()) { + break; + } + total += TCC_FENCE_DAO.deleteTCCFenceDO(connection, new ArrayList<>(xidSet)); } - }); + } catch (RuntimeException e) { + LOGGER.error("delete fence log failed ", e); + } finally { + if (connection != null) { + DataSourceUtils.releaseConnection(connection, dataSource); + } + } + return total; + } private static void initLogCleanExecutor() { diff --git a/tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java b/tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java index 9cbd6ba255e..ef527a80833 100644 --- a/tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java +++ b/tcc/src/main/java/io/seata/rm/tcc/store/TCCFenceStore.java @@ -17,6 +17,8 @@ import java.sql.Connection; import java.util.Date; +import java.util.List; +import java.util.Set; /** * The TCC Fence Store @@ -33,6 +35,14 @@ public interface TCCFenceStore { */ TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId); + /** + * Query xid. + * @param datetime the datetime + * @param limit the limit size + * @return the tcc fence do + */ + Set queryEndStatusXidsByDate(Connection conn, Date datetime, int limit); + /** * Insert tcc fence do boolean. * @param tccFenceDO the tcc fence do @@ -57,12 +67,20 @@ public interface TCCFenceStore { */ boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId); + /** + * Delete tcc fence do boolean. + * @param xids the global transaction ids + * @return the boolean + */ + int deleteTCCFenceDO(Connection conn, List xids); + /** * Delete tcc fence by datetime. * @param datetime datetime + * @param limit limit * @return the deleted row count */ - int deleteTCCFenceDOByDate(Connection conn, Date datetime); + int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit); /** * Set LogTable Name diff --git a/tcc/src/main/java/io/seata/rm/tcc/store/db/TCCFenceStoreDataBaseDAO.java b/tcc/src/main/java/io/seata/rm/tcc/store/db/TCCFenceStoreDataBaseDAO.java index 10e6c2979fd..98b96662687 100644 --- a/tcc/src/main/java/io/seata/rm/tcc/store/db/TCCFenceStoreDataBaseDAO.java +++ b/tcc/src/main/java/io/seata/rm/tcc/store/db/TCCFenceStoreDataBaseDAO.java @@ -32,6 +32,9 @@ import java.sql.Timestamp; import java.sql.SQLIntegrityConstraintViolationException; import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; /** * The type TCC Fence store data base dao @@ -86,6 +89,28 @@ public TCCFenceDO queryTCCFenceDO(Connection conn, String xid, Long branchId) { } } + @Override + public Set queryEndStatusXidsByDate(Connection conn, Date datetime, int limit) { + PreparedStatement ps = null; + ResultSet rs = null; + try { + String sql = TCCFenceStoreSqls.getQueryEndStatusSQLByDate(logTableName); + ps = conn.prepareStatement(sql); + ps.setTimestamp(1, new Timestamp(datetime.getTime())); + ps.setInt(2, limit); + rs = ps.executeQuery(); + Set xids = new HashSet<>(limit); + while (rs.next()) { + xids.add(rs.getString("xid")); + } + return xids; + } catch (SQLException e) { + throw new DataAccessException(e); + } finally { + IOUtil.close(rs, ps); + } + } + @Override public boolean insertTCCFenceDO(Connection conn, TCCFenceDO tccFenceDO) { PreparedStatement ps = null; @@ -149,12 +174,31 @@ public boolean deleteTCCFenceDO(Connection conn, String xid, Long branchId) { } @Override - public int deleteTCCFenceDOByDate(Connection conn, Date datetime) { + public int deleteTCCFenceDO(Connection conn, List xids) { + PreparedStatement ps = null; + try { + String paramsPlaceHolder = org.apache.commons.lang.StringUtils.repeat("?", ",", xids.size()); + String sql = TCCFenceStoreSqls.getDeleteSQLByXids(logTableName, paramsPlaceHolder); + ps = conn.prepareStatement(sql); + for (int i = 0; i < xids.size(); i++) { + ps.setString(i + 1, xids.get(i)); + } + return ps.executeUpdate(); + } catch (SQLException e) { + throw new StoreException(e); + } finally { + IOUtil.close(ps); + } + } + + @Override + public int deleteTCCFenceDOByDate(Connection conn, Date datetime, int limit) { PreparedStatement ps = null; try { String sql = TCCFenceStoreSqls.getDeleteSQLByDateAndStatus(logTableName); ps = conn.prepareStatement(sql); ps.setTimestamp(1, new Timestamp(datetime.getTime())); + ps.setInt(2, limit); return ps.executeUpdate(); } catch (SQLException e) { throw new StoreException(e); diff --git a/tcc/src/main/java/io/seata/rm/tcc/store/db/sql/TCCFenceStoreSqls.java b/tcc/src/main/java/io/seata/rm/tcc/store/db/sql/TCCFenceStoreSqls.java index 130cb132c46..f1b43b2540e 100644 --- a/tcc/src/main/java/io/seata/rm/tcc/store/db/sql/TCCFenceStoreSqls.java +++ b/tcc/src/main/java/io/seata/rm/tcc/store/db/sql/TCCFenceStoreSqls.java @@ -33,6 +33,12 @@ private TCCFenceStoreSqls() { */ public static final String LOCAL_TCC_LOG_PLACEHOLD = " #local_tcc_log# "; + /** + * The constant PRAMETER_PLACEHOLD. + * format: ?, ?, ? + */ + public static final String PRAMETER_PLACEHOLD = " #PRAMETER_PLACEHOLD# "; + /** * The constant INSERT_LOCAL_TCC_LOG. */ @@ -47,6 +53,15 @@ private TCCFenceStoreSqls() { + "from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid = ? and branch_id = ? for update"; + /** + * The constant QUERY_END_STATUS_BY_DATE. + */ + protected static final String QUERY_END_STATUS_BY_DATE = "select xid, branch_id, status, gmt_create, gmt_modified " + + "from " + LOCAL_TCC_LOG_PLACEHOLD + + " where gmt_modified < ? " + + " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")" + + " limit ?"; + /** * The constant UPDATE_STATUS_BY_BRANCH_ID_AND_XID. */ @@ -58,12 +73,19 @@ private TCCFenceStoreSqls() { */ protected static final String DELETE_BY_BRANCH_ID_AND_XID = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid = ? and branch_id = ? "; + /** + * The constant DELETE_BY_BRANCH_ID_AND_XID. + */ + protected static final String DELETE_BY_BRANCH_XIDS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where xid in (" + PRAMETER_PLACEHOLD + ")"; + + /** * The constant DELETE_BY_DATE_AND_STATUS. */ protected static final String DELETE_BY_DATE_AND_STATUS = "delete from " + LOCAL_TCC_LOG_PLACEHOLD + " where gmt_modified < ? " - + " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")"; + + " and status in (" + TCCFenceConstant.STATUS_COMMITTED + " , " + TCCFenceConstant.STATUS_ROLLBACKED + " , " + TCCFenceConstant.STATUS_SUSPENDED + ")" + + " limit ?"; public static String getInsertLocalTCCLogSQL(String localTccTable) { return INSERT_LOCAL_TCC_LOG.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); @@ -73,6 +95,10 @@ public static String getQuerySQLByBranchIdAndXid(String localTccTable) { return QUERY_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); } + public static String getQueryEndStatusSQLByDate(String localTccTable) { + return QUERY_END_STATUS_BY_DATE.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); + } + public static String getUpdateStatusSQLByBranchIdAndXid(String localTccTable) { return UPDATE_STATUS_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); } @@ -81,6 +107,11 @@ public static String getDeleteSQLByBranchIdAndXid(String localTccTable) { return DELETE_BY_BRANCH_ID_AND_XID.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); } + public static String getDeleteSQLByXids(String localTccTable, String paramsPlaceHolder) { + return DELETE_BY_BRANCH_XIDS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable) + .replace(PRAMETER_PLACEHOLD, paramsPlaceHolder); + } + public static String getDeleteSQLByDateAndStatus(String localTccTable) { return DELETE_BY_DATE_AND_STATUS.replace(LOCAL_TCC_LOG_PLACEHOLD, localTccTable); }