From f8ce2e59d0a811d949b3141ce5fc5a97ba157f0a Mon Sep 17 00:00:00 2001 From: lorne <1991wangliang@gmail.com> Date: Tue, 11 Nov 2025 23:36:01 +0800 Subject: [PATCH] add Listener order --- README.md | 11 +++++- pom.xml | 2 +- .../interceptor/SQLRunningContext.java | 7 ++-- .../listener/SQLDeleteExecuteListener.java | 5 +++ .../dbstream/listener/SQLExecuteListener.java | 6 +++ .../listener/SQLInsertExecuteListener.java | 5 +++ .../listener/SQLUpdateExecuteListener.java | 5 +++ .../codingapi/dbstream/stream/DBEvent.java | 9 ++++- .../dbstream/stream/DefaultDBEventPusher.java | 5 ++- .../dbstream/listener/MySQLListener.java | 5 +++ .../dbstream/repository/User1Repository.java | 6 ++- .../dbstream/tests/User1RepositoryTest.java | 38 ++++++++++++++++++- 12 files changed, 94 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index c127609..617f879 100644 --- a/README.md +++ b/README.md @@ -106,6 +106,12 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.listener.SQLExecuteListener; public class MySQLListener implements SQLExecuteListener { + + @Override + public int order() { + return 0; + } + @Override public void before(SQLExecuteState executeState) { System.out.println("执行前 - SQL: " + executeState.getSql()); @@ -258,8 +264,9 @@ mvn clean test -P travis - 手动事务模式下,事件会在 `commit()` 时批量推送 - 事务回滚时,相关事件会被丢弃 -3. **数据表限制**: - - 执行数据拦截事件的分析,要求表必须存在主键的定义 +3. **使用场景限制**: + - 数据库表必须有主键的定义,在DELETE事件需要明确主键信息,主键物理表不存在时可通过外部key文件配置的方式添加。 + - 若INSERT INTO SELECT 语句中,采用主键自增模式,受限于JDBC的支持将无法解析到自增ID,建议修改单条保存或修改ID为手动传递。 4. **元数据缓存**: - 数据库元数据会在首次连接时自动扫描并缓存 diff --git a/pom.xml b/pom.xml index 9348ef9..9cd12be 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.codingapi.dbstream dbstream-driver - 1.0.6 + 1.0.7 https://github.com/codingapi/dbstream-driver dbstream-driver diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java index 097c18d..f9fc8b3 100644 --- a/src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLRunningContext.java @@ -4,10 +4,11 @@ import lombok.Getter; import java.sql.SQLException; +import java.util.Comparator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; -public class SQLRunningContext implements SQLExecuteListener { +public class SQLRunningContext { @Getter private final static SQLRunningContext instance = new SQLRunningContext(); @@ -23,10 +24,11 @@ public void addListener(SQLExecuteListener listener) { if (listener != null) { listeners.add(listener); } + + listeners.sort(Comparator.comparingInt(SQLExecuteListener::order)); } - @Override public void after(SQLExecuteState executeState, Object result) throws SQLException { executeState.setResult(result); executeState.after(); @@ -36,7 +38,6 @@ public void after(SQLExecuteState executeState, Object result) throws SQLExcepti } - @Override public void before(SQLExecuteState executeState) throws SQLException { executeState.begin(); for (SQLExecuteListener listener : listeners) { diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java index 32074b5..c32ba81 100644 --- a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java +++ b/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java @@ -16,6 +16,11 @@ public class SQLDeleteExecuteListener implements SQLExecuteListener { private final static ThreadLocal threadLocal = new ThreadLocal<>(); + @Override + public int order() { + return 100; + } + @Override public void before(SQLExecuteState executeState) throws SQLException { String sql = executeState.getSql(); diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLExecuteListener.java index 8347eab..b0ca376 100644 --- a/src/main/java/com/codingapi/dbstream/listener/SQLExecuteListener.java +++ b/src/main/java/com/codingapi/dbstream/listener/SQLExecuteListener.java @@ -6,6 +6,12 @@ public interface SQLExecuteListener { + /** + * 执行顺序,越小越靠前 + * @return index + */ + int order(); + /** * before sql execute * diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java index abb2ab6..53c7d65 100644 --- a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java +++ b/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java @@ -16,6 +16,11 @@ public class SQLInsertExecuteListener implements SQLExecuteListener { private final static ThreadLocal threadLocal = new ThreadLocal<>(); + @Override + public int order() { + return 100; + } + @Override public void before(SQLExecuteState executeState) throws SQLException { String sql = executeState.getSql(); diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java index b29fd66..2a05b10 100644 --- a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java +++ b/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java @@ -16,6 +16,11 @@ public class SQLUpdateExecuteListener implements SQLExecuteListener { private final static ThreadLocal threadLocal = new ThreadLocal<>(); + @Override + public int order() { + return 100; + } + @Override public void before(SQLExecuteState executeState) throws SQLException { String sql = executeState.getSql(); diff --git a/src/main/java/com/codingapi/dbstream/stream/DBEvent.java b/src/main/java/com/codingapi/dbstream/stream/DBEvent.java index 6eb52c7..139bbb5 100644 --- a/src/main/java/com/codingapi/dbstream/stream/DBEvent.java +++ b/src/main/java/com/codingapi/dbstream/stream/DBEvent.java @@ -81,7 +81,14 @@ void setTransactionKey(String transactionKey) { } public void addPrimaryKey(String primaryKey) { - this.primaryKeys.add(primaryKey); + if(!this.primaryKeys.contains(primaryKey)) { + this.primaryKeys.add(primaryKey); + } + } + + + public boolean hasPrimaryKeys(){ + return this.primaryKeys!=null && !this.primaryKeys.isEmpty(); } } diff --git a/src/main/java/com/codingapi/dbstream/stream/DefaultDBEventPusher.java b/src/main/java/com/codingapi/dbstream/stream/DefaultDBEventPusher.java index b79ea5e..d4483f0 100644 --- a/src/main/java/com/codingapi/dbstream/stream/DefaultDBEventPusher.java +++ b/src/main/java/com/codingapi/dbstream/stream/DefaultDBEventPusher.java @@ -6,6 +6,9 @@ public class DefaultDBEventPusher implements DBEventPusher { @Override public void push(List events) { - System.out.println(events); + System.out.println("<=== DBStream DBEvent Total "+events.size()+" ===> "); + for(DBEvent event:events){ + System.out.println(event); + } } } diff --git a/src/test/java/com/example/dbstream/listener/MySQLListener.java b/src/test/java/com/example/dbstream/listener/MySQLListener.java index 7a366a9..9738536 100644 --- a/src/test/java/com/example/dbstream/listener/MySQLListener.java +++ b/src/test/java/com/example/dbstream/listener/MySQLListener.java @@ -9,6 +9,11 @@ @Slf4j public class MySQLListener implements SQLExecuteListener { + @Override + public int order() { + return 0; + } + @Override public void after(SQLExecuteState executeState, Object result) throws SQLException { log.info("after sql:{},params:{},execute timestamp:{}", executeState.getSql(), executeState.getListParams(), executeState.getExecuteTimestamp()); diff --git a/src/test/java/com/example/dbstream/repository/User1Repository.java b/src/test/java/com/example/dbstream/repository/User1Repository.java index bcc782e..09bc07c 100644 --- a/src/test/java/com/example/dbstream/repository/User1Repository.java +++ b/src/test/java/com/example/dbstream/repository/User1Repository.java @@ -24,8 +24,12 @@ public interface User1Repository extends JpaRepository { @Query("select count(u) from User1 as u") int counts(); + @Modifying + @Query("delete from User1 where id > ?1") + int clearData(long id); + @Modifying @Query("insert into User1(email,username,password,nickname) select u.email,u.username,u.password,u.nickname from User1 u") - int insertIntoFromSelect(); + void insertIntoFromSelect(); } diff --git a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java index 6091cf9..e85ea3b 100644 --- a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java @@ -14,9 +14,10 @@ import org.springframework.test.annotation.Rollback; import javax.transaction.Transactional; +import java.util.ArrayList; import java.util.List; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertThrows; @SpringBootTest @@ -25,6 +26,7 @@ class User1RepositoryTest { @Autowired private User1Repository userRepository; + @BeforeEach void setUp() { DBStreamContext.getInstance().addEventPusher(new DBEventPusher() { @@ -164,4 +166,38 @@ void test7() { }); } + /** + * 异常回滚测试 + */ + @Test + @Transactional + @Rollback(false) + void test8() { + + DBStreamContext.getInstance().addListener(new MySQLListener()); + + List list = new ArrayList<>(); + + + for (int i=0;i<10;i++){ + User1 user1 = new User1(); + user1.setUsername("admin1"); + user1.setPassword("admin1"); + user1.setEmail("admin1@example.com"); + user1.setNickname("admin1"); + list.add(user1); + + User1 user2 = new User1(); + user2.setUsername("admin2"); + user2.setPassword("admin2"); + user2.setEmail("admin2@example.com"); + user2.setNickname("admin2"); + list.add(user2); + } + + userRepository.saveAll(list); + + + } + }