From edfa8e5a4458a0d4f3dc11f0f6313d3b211f413b Mon Sep 17 00:00:00 2001 From: lorne <1991wangliang@gmail.com> Date: Wed, 12 Nov 2025 12:35:02 +0800 Subject: [PATCH 1/3] batch test --- pom.xml | 14 ++++-- scripts/.gitignore | 1 + scripts/db.yaml | 23 ++++++++++ .../dbstream/interceptor/SQLExecuteState.java | 8 ++-- .../proxy/PreparedStatementProxy.java | 3 ++ .../com/example/dbstream/entity/User1.java | 7 +-- .../com/example/dbstream/entity/User2.java | 2 +- .../dbstream/repository/User2Repository.java | 2 +- .../dbstream/tests/User1RepositoryTest.java | 46 +++++++------------ .../dbstream/tests/User2RepositoryTest.java | 2 +- src/test/resources/application.properties | 18 +++++++- 11 files changed, 81 insertions(+), 45 deletions(-) create mode 100644 scripts/.gitignore create mode 100644 scripts/db.yaml diff --git a/pom.xml b/pom.xml index d562688..2e5d9a2 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.codingapi.dbstream dbstream-driver - 1.0.8 + 1.0.9 https://github.com/codingapi/dbstream-driver dbstream-driver @@ -34,8 +34,7 @@ 1.18.42 - 2.7.18 - 2.2.222 + 3.5.7 @@ -55,7 +54,14 @@ com.h2database h2 - ${h2.version} + 2.2.222 + test + + + + org.postgresql + postgresql + 42.7.3 test diff --git a/scripts/.gitignore b/scripts/.gitignore new file mode 100644 index 0000000..6320cd2 --- /dev/null +++ b/scripts/.gitignore @@ -0,0 +1 @@ +data \ No newline at end of file diff --git a/scripts/db.yaml b/scripts/db.yaml new file mode 100644 index 0000000..6bd4a47 --- /dev/null +++ b/scripts/db.yaml @@ -0,0 +1,23 @@ +version: "3.9" + +services: + postgres: + image: postgres:15 + container_name: postgres-db + restart: always + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: 123456 + POSTGRES_DB: demo + TZ: Asia/Shanghai + ports: + - "5432:5432" + volumes: + # 数据持久化路径 + - ./data:/var/lib/postgresql/data + networks: + - pg-network + +networks: + pg-network: + driver: bridge \ No newline at end of file diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java index 5984a88..67f0355 100644 --- a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java @@ -216,10 +216,12 @@ public List> getStatementGenerateKeys(DbTable dbTable) { Map map = new HashMap<>(); ResultSetMetaData resultSetMetaData = rs.getMetaData(); int columnCount = resultSetMetaData.getColumnCount(); - List primaryKeyColumns = dbTable.getPrimaryColumns(); for (int i = 1; i <= columnCount; i++) { - DbColumn dbColumn = primaryKeyColumns.get(i - 1); - map.put(dbColumn.getName(), rs.getObject(i)); + String columName = resultSetMetaData.getColumnName(i); + DbColumn dbColumn = dbTable.getColumnByName(columName); + if(dbColumn!=null) { + map.put(dbColumn.getName(), rs.getObject(i)); + } } list.add(map); } diff --git a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java index 8c132b1..82104ec 100644 --- a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java +++ b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java @@ -163,6 +163,7 @@ public void setObject(int parameterIndex, Object x) throws SQLException { @Override public boolean execute() throws SQLException { + System.out.println("execute:"+this.executeState.getSql()); SQLRunningContext.getInstance().before(this.executeState); boolean result = preparedStatement.execute(); SQLRunningContext.getInstance().after(this.executeState, result); @@ -171,6 +172,7 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { + System.out.println("addBatch:"+this.executeState.getSql()); preparedStatement.addBatch(); } @@ -390,6 +392,7 @@ public ResultSet executeQuery(String sql) throws SQLException { @Override public int executeUpdate(String sql) throws SQLException { this.executeState.setSql(sql); + System.out.println("executeUpdate:"+sql); SQLRunningContext.getInstance().before(this.executeState); int result = preparedStatement.executeUpdate(sql); SQLRunningContext.getInstance().after(this.executeState, result); diff --git a/src/test/java/com/example/dbstream/entity/User1.java b/src/test/java/com/example/dbstream/entity/User1.java index 5801620..3cf9956 100644 --- a/src/test/java/com/example/dbstream/entity/User1.java +++ b/src/test/java/com/example/dbstream/entity/User1.java @@ -1,8 +1,8 @@ package com.example.dbstream.entity; +import jakarta.persistence.*; import lombok.Data; -import javax.persistence.*; @Data @Entity @@ -10,8 +10,9 @@ public class User1 { @Id - @GeneratedValue(strategy = GenerationType.IDENTITY) - private long id; + @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "user_seq") + @SequenceGenerator(name = "user_seq", sequenceName = "user_seq", allocationSize = 50) + private Long id; private String username; diff --git a/src/test/java/com/example/dbstream/entity/User2.java b/src/test/java/com/example/dbstream/entity/User2.java index 55c50d1..dd63f9c 100644 --- a/src/test/java/com/example/dbstream/entity/User2.java +++ b/src/test/java/com/example/dbstream/entity/User2.java @@ -2,7 +2,7 @@ import lombok.Data; -import javax.persistence.*; +import jakarta.persistence.*; @Data @Entity diff --git a/src/test/java/com/example/dbstream/repository/User2Repository.java b/src/test/java/com/example/dbstream/repository/User2Repository.java index c126588..cbdd231 100644 --- a/src/test/java/com/example/dbstream/repository/User2Repository.java +++ b/src/test/java/com/example/dbstream/repository/User2Repository.java @@ -25,7 +25,7 @@ public interface User2Repository extends JpaRepository { int counts(); @Modifying - @Query("update User2 set password = now() where username = ?1") + @Query("update User2 set password = '123' where username = ?1") int resetPasswordByUsername1(String username); @Modifying diff --git a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java index e85ea3b..748e90f 100644 --- a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java @@ -2,21 +2,17 @@ import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.DBEventPusher; import com.example.dbstream.entity.User1; import com.example.dbstream.listener.MySQLListener; import com.example.dbstream.repository.User1Repository; +import jakarta.persistence.EntityManager; +import jakarta.transaction.Transactional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; -import javax.transaction.Transactional; -import java.util.ArrayList; -import java.util.List; - import static org.junit.jupiter.api.Assertions.assertThrows; @@ -26,15 +22,18 @@ class User1RepositoryTest { @Autowired private User1Repository userRepository; + @Autowired + private EntityManager entityManager; + @BeforeEach void setUp() { - DBStreamContext.getInstance().addEventPusher(new DBEventPusher() { - @Override - public void push(List events) { - System.out.println(events); - } - }); +// DBStreamContext.getInstance().addEventPusher(new DBEventPusher() { +// @Override +// public void push(List events) { +// System.out.println(events); +// } +// }); } /** @@ -171,32 +170,19 @@ void test7() { */ @Test @Transactional - @Rollback(false) + @Rollback(value = false) void test8() { - - DBStreamContext.getInstance().addListener(new MySQLListener()); - - List list = new ArrayList<>(); - - + entityManager.createQuery("DELETE FROM User1").executeUpdate(); for (int i=0;i<10;i++){ User1 user1 = new User1(); user1.setUsername("admin1"); user1.setPassword("admin1"); user1.setEmail("admin1@example.com"); user1.setNickname("admin1"); - list.add(user1); - - User1 user2 = new User1(); - user2.setUsername("admin2"); - user2.setPassword("admin2"); - user2.setEmail("admin2@example.com"); - user2.setNickname("admin2"); - list.add(user2); + entityManager.persist(user1); } - - userRepository.saveAll(list); - + entityManager.flush(); + entityManager.clear(); } diff --git a/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java index 4251ec3..6c45119 100644 --- a/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java @@ -5,12 +5,12 @@ import com.example.dbstream.entity.User2; import com.example.dbstream.listener.MySQLListener; import com.example.dbstream.repository.User2Repository; +import jakarta.transaction.Transactional; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; -import javax.transaction.Transactional; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 6b969cb..8b2e11e 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1,9 +1,23 @@ spring.application.name=example spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver -spring.datasource.url=jdbc:h2:file:./test.db +#spring.datasource.url=jdbc:h2:file:./test.db + +spring.datasource.url=jdbc:postgresql://localhost:5432/demo +spring.datasource.username=postgres +spring.datasource.password=123456 spring.jpa.show-sql=true spring.jpa.hibernate.ddl-auto=update -spring.jpa.database-platform=org.hibernate.dialect.H2Dialect +#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect +#spring.jpa.database-platform=org.hibernate.dialect.H2Dialect + +spring.jpa.properties.hibernate.jdbc.batch_size=1000 +spring.jpa.properties.hibernate.order_inserts=true +spring.jpa.properties.hibernate.order_updates=true +spring.jpa.properties.hibernate.batch_versioned_data=true +spring.jpa.properties.hibernate.flush.mode=COMMIT +logging.level.org.hibernate.engine.jdbc.batch.internal.BatchingBatch=DEBUG +logging.level.org.hibernate.engine.jdbc.spi.SqlStatementLogger=DEBUG + # # From 6c76d04af76e3500ab546d59424d49aa4ea484fa Mon Sep 17 00:00:00 2001 From: lorne <1991wangliang@gmail.com> Date: Wed, 12 Nov 2025 14:15:20 +0800 Subject: [PATCH 2/3] add #2 --- .../dbstream/interceptor/SQLExecuteParam.java | 82 ++++++++++ .../dbstream/interceptor/SQLExecuteState.java | 151 +++++++++++++++--- .../dbstream/parser/DeleteDBEventParser.java | 4 + .../dbstream/parser/InsertDBEventParser.java | 4 + .../dbstream/parser/UpdateDBEventParser.java | 4 + .../proxy/CallableStatementProxy.java | 4 + .../proxy/PreparedStatementProxy.java | 7 +- .../dbstream/tests/User1RepositoryTest.java | 12 +- src/test/resources/application.properties | 1 + 9 files changed, 240 insertions(+), 29 deletions(-) create mode 100644 src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java new file mode 100644 index 0000000..6f73a6d --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java @@ -0,0 +1,82 @@ +package com.codingapi.dbstream.interceptor; + +import lombok.Getter; +import lombok.Setter; + +import java.util.*; + +/** + * SQL执行参数信息 + */ +public class SQLExecuteParam { + + + /** + * SQL参数,integer index模式 + */ + @Getter + private final Map indexParams; + /** + * SQL参数,string key 模型 + */ + @Getter + private final Map mapParams; + + /** + * 执行的sql + */ + @Getter + @Setter + private String sql; + + public SQLExecuteParam() { + this.indexParams = new HashMap<>(); + this.mapParams = new HashMap<>(); + } + + + /** + * 更新sql参数 + * + * @param key 参数key + * @param value 参数值 + */ + public void setParam(String key, Object value) { + mapParams.put(key, value); + } + + /** + * 更新sql参数 + * + * @param index 参数索引 + * @param value 参数值 + */ + public void setParam(int index, Object value) { + indexParams.put(index, value); + } + + /** + * 清理参数 + */ + public void cleanParams(){ + this.indexParams.clear(); + this.mapParams.clear(); + } + + /** + * 获取参数列表 + * @return List + */ + public List getListParams(){ + List list = new ArrayList<>(); + if (indexParams.isEmpty()) { + return list; + } + List keys = new ArrayList<>(indexParams.keySet()); + Collections.sort(keys); + for(Integer key: keys){ + list.add(indexParams.get(key)); + } + return list; + } +} diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java index 67f0355..503b124 100644 --- a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java @@ -17,21 +17,25 @@ public class SQLExecuteState { /** - * SQL参数,integer index模式 + * 执行SQL队列 */ - @Getter - private final Map indexParams; + private final List sqlExecuteParams; + + /** + * 当前执行对象 + */ + private SQLExecuteParam currentExecute; + /** - * SQL参数,string key 模型 + * 模式判断 */ @Getter - private final Map mapParams; + private boolean batchMode = false; /** - * 执行的sql + * 当前绑定sql */ @Getter - @Setter private String sql; /** @@ -78,9 +82,56 @@ public SQLExecuteState(String sql, ConnectionProxy connectionProxy, Statement st this.connectionProxy = connectionProxy; this.statement = statement; this.metaData = metaData; + this.sqlExecuteParams = new ArrayList<>(); + + this.currentExecute = new SQLExecuteParam(); + this.currentExecute.setSql(sql); + this.sqlExecuteParams.add(currentExecute); + } - this.indexParams = new HashMap<>(); - this.mapParams = new HashMap<>(); + public void setSql(String sql){ + this.sql = sql; + if(this.currentExecute!=null) { + this.currentExecute.setSql(sql); + } + } + + /** + * 添加任务队列 + * + * @param sql 执行sql + */ + public void addBatch(String sql) { + batchMode = true; + SQLExecuteParam executeParam = new SQLExecuteParam(); + executeParam.setSql(sql); + this.sqlExecuteParams.add(executeParam); + this.currentExecute = executeParam; + } + + /** + * 添加任务队列 + * + */ + public void addBatch() { + this.addBatch(this.sql); + } + + /** + * 清空队列 + */ + public void clearBatch() { + this.sqlExecuteParams.clear(); + this.currentExecute = null; + } + + /** + * 清理参数设置 + */ + public void cleanParams(){ + if(this.currentExecute!=null) { + this.currentExecute.cleanParams(); + } } /** @@ -114,7 +165,9 @@ public long getExecuteTimestamp() { * @param value 参数值 */ public void setParam(String key, Object value) { - mapParams.put(key, value); + if(this.currentExecute!=null) { + currentExecute.setParam(key, value); + } } /** @@ -124,24 +177,68 @@ public void setParam(String key, Object value) { * @param value 参数值 */ public void setParam(int index, Object value) { - indexParams.put(index, value); + if(this.currentExecute!=null) { + currentExecute.setParam(index, value); + } } /** * 获取参数列表 + * * @return List */ - public List getListParams(){ - List list = new ArrayList<>(); - if (indexParams.isEmpty()) { - return list; + public List getListParams() { + if(batchMode){ + if(this.sqlExecuteParams.isEmpty()){ + return new ArrayList<>(); + } + int size = this.sqlExecuteParams.size(); + return this.sqlExecuteParams.get(size-2).getListParams(); } - List keys = new ArrayList<>(indexParams.keySet()); - Collections.sort(keys); - for(Integer key: keys){ - list.add(indexParams.get(key)); + if(this.currentExecute!=null) { + return currentExecute.getListParams(); } - return list; + return new ArrayList<>(); + } + + /** + * 获取执行的SQL队列 + * @return List + */ + public List getBatchExecuteSQLParamList(){ + if(this.batchMode){ + if(this.sqlExecuteParams.isEmpty()){ + return new ArrayList<>(); + } + int size = this.sqlExecuteParams.size(); + return this.sqlExecuteParams.subList(0,size-1); + + } + return new ArrayList<>(); + } + + + /** + * 获取Batch的SQLExecuteState + * @return List + */ + public List getBatchSQLExecuteStateList(){ + if(this.batchMode){ + if(this.sqlExecuteParams.isEmpty()){ + return new ArrayList<>(); + } + int size = this.sqlExecuteParams.size(); + List list = new ArrayList<>(); + List paramList = this.sqlExecuteParams.subList(0,size-1); + for(SQLExecuteParam executeParam:paramList){ + SQLExecuteState executeState = new SQLExecuteState(executeParam.getSql(), connectionProxy,statement,metaData); + executeState.currentExecute = executeParam; + list.add(executeState); + } + return list; + + } + return new ArrayList<>(); } @@ -167,13 +264,15 @@ public String getTransactionKey() { /** * 查询 + * * @param sql sql * @return 查询结果 * @throws SQLException 查询异常 */ public List> query(String sql) throws SQLException { - return this.query(sql,new ArrayList<>()); + return this.query(sql, new ArrayList<>()); } + /** * 查询 * @@ -219,7 +318,7 @@ public List> getStatementGenerateKeys(DbTable dbTable) { for (int i = 1; i <= columnCount; i++) { String columName = resultSetMetaData.getColumnName(i); DbColumn dbColumn = dbTable.getColumnByName(columName); - if(dbColumn!=null) { + if (dbColumn != null) { map.put(dbColumn.getName(), rs.getObject(i)); } } @@ -234,6 +333,7 @@ public List> getStatementGenerateKeys(DbTable dbTable) { /** * 获取驱动配置信息 + * * @return Properties */ public Properties getDriverProperties() { @@ -246,6 +346,7 @@ public Properties getDriverProperties() { /** * 获取数据库的jdbcUrl + * * @return jdbcUrl */ public String getJdbcUrl() { @@ -258,10 +359,11 @@ public String getJdbcUrl() { /** * 获取数据库的jdbcKey + * * @return jdbcKey */ - public String getJdbcKey(){ - if(metaData==null){ + public String getJdbcKey() { + if (metaData == null) { return null; } return metaData.getKeyJdbcKey(); @@ -270,6 +372,7 @@ public String getJdbcKey(){ /** * 更新数据库的元数据信息 + * * @param tableName 表名 */ public void updateMetaData(String tableName) throws SQLException { diff --git a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java index 1914483..aaa2cf4 100644 --- a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.parser; +import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; @@ -31,6 +32,9 @@ public DeleteDBEventParser(SQLExecuteState executeState, DeleteSQLParser sqlPars } public void prepare() throws SQLException { + System.out.println("delete batch-mode:"+this.executeState.isBatchMode()); + List executeParamList = this.executeState.getBatchExecuteSQLParamList(); + System.out.println(executeParamList.size()); this.updateRows(); } diff --git a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java index 4179ad9..9d2ded6 100644 --- a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.parser; +import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; @@ -80,6 +81,9 @@ private List loadDataEvents() { } public void prepare() throws SQLException { + System.out.println("insert batch-mode:"+this.executeState.isBatchMode()); + List executeParamList = this.executeState.getBatchExecuteSQLParamList(); + System.out.println(executeParamList.size()); boolean defaultInsertSQL = this.sqlParser.isDefaultInsertSQL(); if (defaultInsertSQL) { this.loadDefaultInsertDataList(); diff --git a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java index abf0d62..deb047d 100644 --- a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.parser; +import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; @@ -32,6 +33,9 @@ public UpdateDBEventParser(SQLExecuteState executeState, UpdateSQLParser sqlPars } public void prepare() throws SQLException { + System.out.println("update batch-mode:"+this.executeState.isBatchMode()); + List executeParamList = this.executeState.getBatchExecuteSQLParamList(); + System.out.println(executeParamList.size()); this.updateRows(); } diff --git a/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java b/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java index 595f835..c1a25c3 100644 --- a/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java +++ b/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java @@ -799,6 +799,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw @Override public void clearParameters() throws SQLException { callableStatement.clearParameters(); + this.executeState.cleanParams(); } @Override @@ -824,6 +825,7 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { callableStatement.addBatch(); + this.executeState.addBatch(); } @Override @@ -1153,11 +1155,13 @@ public int getResultSetType() throws SQLException { @Override public void addBatch(String sql) throws SQLException { callableStatement.addBatch(sql); + this.executeState.addBatch(sql); } @Override public void clearBatch() throws SQLException { callableStatement.clearBatch(); + this.executeState.clearBatch(); } @Override diff --git a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java index 82104ec..4f94925 100644 --- a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java +++ b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java @@ -147,6 +147,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw @Override public void clearParameters() throws SQLException { preparedStatement.clearParameters(); + this.executeState.cleanParams(); } @Override @@ -163,7 +164,6 @@ public void setObject(int parameterIndex, Object x) throws SQLException { @Override public boolean execute() throws SQLException { - System.out.println("execute:"+this.executeState.getSql()); SQLRunningContext.getInstance().before(this.executeState); boolean result = preparedStatement.execute(); SQLRunningContext.getInstance().after(this.executeState, result); @@ -172,8 +172,8 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { - System.out.println("addBatch:"+this.executeState.getSql()); preparedStatement.addBatch(); + this.executeState.addBatch(); } @Override @@ -392,7 +392,6 @@ public ResultSet executeQuery(String sql) throws SQLException { @Override public int executeUpdate(String sql) throws SQLException { this.executeState.setSql(sql); - System.out.println("executeUpdate:"+sql); SQLRunningContext.getInstance().before(this.executeState); int result = preparedStatement.executeUpdate(sql); SQLRunningContext.getInstance().after(this.executeState, result); @@ -512,11 +511,13 @@ public int getResultSetType() throws SQLException { @Override public void addBatch(String sql) throws SQLException { preparedStatement.addBatch(sql); + this.executeState.addBatch(sql); } @Override public void clearBatch() throws SQLException { preparedStatement.clearBatch(); + this.executeState.clearBatch(); } @Override diff --git a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java index 748e90f..f1a5920 100644 --- a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java @@ -13,6 +13,8 @@ import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; +import java.util.List; + import static org.junit.jupiter.api.Assertions.assertThrows; @@ -172,8 +174,14 @@ void test7() { @Transactional @Rollback(value = false) void test8() { - entityManager.createQuery("DELETE FROM User1").executeUpdate(); - for (int i=0;i<10;i++){ + List list = userRepository.findAll(); + for (User1 user:list){ + user.setEmail("111"); + } + userRepository.saveAll(list); + + userRepository.deleteAll(); + for (int i=0;i<5;i++){ User1 user1 = new User1(); user1.setUsername("admin1"); user1.setPassword("admin1"); diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 8b2e11e..7c380aa 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -13,6 +13,7 @@ spring.jpa.hibernate.ddl-auto=update spring.jpa.properties.hibernate.jdbc.batch_size=1000 spring.jpa.properties.hibernate.order_inserts=true spring.jpa.properties.hibernate.order_updates=true +spring.jpa.properties.hibernate.order_deletes=true spring.jpa.properties.hibernate.batch_versioned_data=true spring.jpa.properties.hibernate.flush.mode=COMMIT logging.level.org.hibernate.engine.jdbc.batch.internal.BatchingBatch=DEBUG From 64af8c898ea2cfcf6f594d0d358445ed6387218b Mon Sep 17 00:00:00 2001 From: lorne <1991wangliang@gmail.com> Date: Wed, 12 Nov 2025 18:49:32 +0800 Subject: [PATCH 3/3] fix #2 --- scripts/.gitignore | 1 - scripts/db.yaml | 23 ------ .../dbstream/driver/DBStreamProxyDriver.java | 6 +- .../dbstream/interceptor/SQLExecuteState.java | 16 ---- .../listener/SQLDeleteExecuteListener.java | 61 -------------- .../listener/SQLInsertExecuteListener.java | 60 -------------- .../listener/SQLUpdateExecuteListener.java | 61 -------------- .../stream/SQLDeleteExecuteListener.java | 33 ++++++++ .../stream/SQLInsertExecuteListener.java | 34 ++++++++ .../stream/SQLStreamExecuteListener.java | 80 +++++++++++++++++++ .../stream/SQLUpdateExecuteListener.java | 33 ++++++++ .../listener/stream/ThreadLocalContext.java | 50 ++++++++++++ .../dbstream/parser/DBEventParser.java | 13 +++ .../dbstream/parser/DeleteDBEventParser.java | 7 +- .../DeleteSQLParser.java | 4 +- .../dbstream/parser/InsertDBEventParser.java | 10 +-- .../InsertSQLParser.java | 4 +- .../codingapi/dbstream/parser/SQLParser.java | 6 ++ .../dbstream/parser/UpdateDBEventParser.java | 7 +- .../UpdateSQLParser.java | 4 +- .../sqlparser/DeleteSQLParserTest.java | 1 + .../sqlparser/InsertSQLParserTest.java | 1 + .../sqlparser/UpdateSQLParserTest.java | 1 + .../com/example/dbstream/entity/User1.java | 3 +- src/test/resources/application.properties | 36 ++------- 25 files changed, 274 insertions(+), 281 deletions(-) delete mode 100644 scripts/.gitignore delete mode 100644 scripts/db.yaml delete mode 100644 src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java delete mode 100644 src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java delete mode 100644 src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java create mode 100644 src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java create mode 100644 src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java create mode 100644 src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java create mode 100644 src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java create mode 100644 src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java create mode 100644 src/main/java/com/codingapi/dbstream/parser/DBEventParser.java rename src/main/java/com/codingapi/dbstream/{sqlparser => parser}/DeleteSQLParser.java (95%) rename src/main/java/com/codingapi/dbstream/{sqlparser => parser}/InsertSQLParser.java (97%) create mode 100644 src/main/java/com/codingapi/dbstream/parser/SQLParser.java rename src/main/java/com/codingapi/dbstream/{sqlparser => parser}/UpdateSQLParser.java (97%) diff --git a/scripts/.gitignore b/scripts/.gitignore deleted file mode 100644 index 6320cd2..0000000 --- a/scripts/.gitignore +++ /dev/null @@ -1 +0,0 @@ -data \ No newline at end of file diff --git a/scripts/db.yaml b/scripts/db.yaml deleted file mode 100644 index 6bd4a47..0000000 --- a/scripts/db.yaml +++ /dev/null @@ -1,23 +0,0 @@ -version: "3.9" - -services: - postgres: - image: postgres:15 - container_name: postgres-db - restart: always - environment: - POSTGRES_USER: postgres - POSTGRES_PASSWORD: 123456 - POSTGRES_DB: demo - TZ: Asia/Shanghai - ports: - - "5432:5432" - volumes: - # 数据持久化路径 - - ./data:/var/lib/postgresql/data - networks: - - pg-network - -networks: - pg-network: - driver: bridge \ No newline at end of file diff --git a/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java b/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java index 712b7a1..d1306fc 100644 --- a/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java +++ b/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java @@ -1,9 +1,9 @@ package com.codingapi.dbstream.driver; import com.codingapi.dbstream.interceptor.SQLRunningContext; -import com.codingapi.dbstream.listener.SQLDeleteExecuteListener; -import com.codingapi.dbstream.listener.SQLInsertExecuteListener; -import com.codingapi.dbstream.listener.SQLUpdateExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLDeleteExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLInsertExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLUpdateExecuteListener; import com.codingapi.dbstream.proxy.ConnectionProxy; import com.codingapi.dbstream.scanner.DBMetaContext; import com.codingapi.dbstream.scanner.DBMetaData; diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java index 503b124..eba1f1d 100644 --- a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java @@ -201,22 +201,6 @@ public List getListParams() { return new ArrayList<>(); } - /** - * 获取执行的SQL队列 - * @return List - */ - public List getBatchExecuteSQLParamList(){ - if(this.batchMode){ - if(this.sqlExecuteParams.isEmpty()){ - return new ArrayList<>(); - } - int size = this.sqlExecuteParams.size(); - return this.sqlExecuteParams.subList(0,size-1); - - } - return new ArrayList<>(); - } - /** * 获取Batch的SQLExecuteState diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java deleted file mode 100644 index c32ba81..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.DeleteDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.DeleteSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLDeleteExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isDeleteSQL(sql)) { - try { - threadLocal.remove(); - DeleteSQLParser sqlParser = new DeleteSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - DeleteDBEventParser dataParser = new DeleteDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } else { - threadLocal.remove(); - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isDeleteSQL(sql)) { - DeleteDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } -} diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java deleted file mode 100644 index 53c7d65..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.InsertDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.InsertSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLInsertExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isInsertSQL(sql)) { - try { - threadLocal.remove(); - InsertSQLParser sqlParser = new InsertSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - InsertDBEventParser dataParser = new InsertDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isInsertSQL(sql)) { - InsertDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } - -} diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java deleted file mode 100644 index 2a05b10..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.UpdateDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.UpdateSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLUpdateExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isUpdateSQL(sql)) { - try { - threadLocal.remove(); - UpdateSQLParser sqlParser = new UpdateSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - UpdateDBEventParser dataParser = new UpdateDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } else { - threadLocal.remove(); - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isUpdateSQL(sql)) { - UpdateDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } -} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java new file mode 100644 index 0000000..5a3de5b --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java @@ -0,0 +1,33 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.DeleteDBEventParser; +import com.codingapi.dbstream.parser.DeleteSQLParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLDeleteExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isDeleteSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new DeleteSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new DeleteDBEventParser(sqlExecuteState, (DeleteSQLParser) sqlParser, dbTable); + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java new file mode 100644 index 0000000..d951cde --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java @@ -0,0 +1,34 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.InsertDBEventParser; +import com.codingapi.dbstream.parser.InsertSQLParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLInsertExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isInsertSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new InsertSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new InsertDBEventParser(sqlExecuteState, (InsertSQLParser) sqlParser, dbTable); + } + + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java new file mode 100644 index 0000000..b140f05 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java @@ -0,0 +1,80 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.DBStreamContext; +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.listener.SQLExecuteListener; +import com.codingapi.dbstream.parser.*; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.stream.DBEvent; +import com.codingapi.dbstream.stream.TransactionEventPools; + +import java.sql.SQLException; +import java.util.List; + +public abstract class SQLStreamExecuteListener implements SQLExecuteListener { + + public abstract boolean isSupport(String sql); + + public abstract SQLParser createSQLParser(String sql); + + public abstract DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState,SQLParser sqlParser,DbTable dbTable); + + @Override + public void before(SQLExecuteState executeState) throws SQLException { + String sql = executeState.getSql(); + if (this.isSupport(sql)) { + try { + ThreadLocalContext.getInstance().remove(); + SQLParser sqlParser = this.createSQLParser(sql); + String tableName = sqlParser.getTableName(); + executeState.updateMetaData(tableName); + DbTable dbTable = executeState.getDbTable(tableName); + if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { + if (executeState.isBatchMode()) { + List executeStateList = executeState.getBatchSQLExecuteStateList(); + for (int i = 0; i < executeStateList.size(); i++) { + SQLExecuteState sqlExecuteState = executeStateList.get(i); + DBEventParser dataParser = this.createDbEventParser(sqlExecuteState,sqlParser,dbTable); + dataParser.prepare(); + ThreadLocalContext.getInstance().push(i, dataParser); + } + } else { + DBEventParser dataParser = this.createDbEventParser(executeState,sqlParser,dbTable); + dataParser.prepare(); + ThreadLocalContext.getInstance().push(dataParser); + } + } + } catch (Exception e) { + ThreadLocalContext.getInstance().remove(); + throw new SQLException(e); + } + } + } + + @Override + public void after(SQLExecuteState executeState, Object result) throws SQLException { + String sql = executeState.getSql(); + String transactionKey = executeState.getTransactionKey(); + if (this.isSupport(sql)) { + if (executeState.isBatchMode()) { + List executeStateList = executeState.getBatchSQLExecuteStateList(); + for (int i = 0; i < executeStateList.size(); i++) { + DBEventParser dataParser = ThreadLocalContext.getInstance().get(i); + if (dataParser != null) { + List eventList = dataParser.loadEvents(result); + TransactionEventPools.getInstance().addEvents(transactionKey, eventList); + } + } + ThreadLocalContext.getInstance().remove(); + } else { + DBEventParser dataParser = ThreadLocalContext.getInstance().get(); + if (dataParser != null) { + List eventList = dataParser.loadEvents(result); + TransactionEventPools.getInstance().addEvents(transactionKey, eventList); + } + ThreadLocalContext.getInstance().remove(); + } + } + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java new file mode 100644 index 0000000..0b7f809 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java @@ -0,0 +1,33 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.parser.UpdateDBEventParser; +import com.codingapi.dbstream.parser.UpdateSQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLUpdateExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isUpdateSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new UpdateSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new UpdateDBEventParser(sqlExecuteState, (UpdateSQLParser) sqlParser, dbTable); + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java b/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java new file mode 100644 index 0000000..0eae441 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java @@ -0,0 +1,50 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.parser.DBEventParser; +import lombok.Getter; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class ThreadLocalContext { + + private final Map cache; + + private final ThreadLocal threadLocal = new ThreadLocal<>(); + + private ThreadLocalContext(){ + this.cache = new ConcurrentHashMap<>(); + } + + @Getter + private final static ThreadLocalContext instance = new ThreadLocalContext(); + + public void remove(){ + this.threadLocal.remove(); + } + + public void push(DBEventParser eventParser){ + this.push(0,eventParser); + } + + public void push(int index,DBEventParser eventParser){ + ThreadLocalContext context = threadLocal.get(); + if(context==null){ + context = new ThreadLocalContext(); + threadLocal.set(context); + } + context.cache.put(index,eventParser); + } + + public DBEventParser get() { + return this.get(0); + } + + public DBEventParser get(int index) { + ThreadLocalContext context = threadLocal.get(); + if(context==null){ + return null; + } + return context.cache.get(index); + } +} diff --git a/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java new file mode 100644 index 0000000..15ad526 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java @@ -0,0 +1,13 @@ +package com.codingapi.dbstream.parser; + +import com.codingapi.dbstream.stream.DBEvent; + +import java.sql.SQLException; +import java.util.List; + +public interface DBEventParser { + + void prepare() throws SQLException; + + List loadEvents(Object result) throws SQLException; +} diff --git a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java index aaa2cf4..29e62d7 100644 --- a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java @@ -1,10 +1,8 @@ package com.codingapi.dbstream.parser; -import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.DeleteSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -15,7 +13,7 @@ import java.util.List; import java.util.Map; -public class DeleteDBEventParser { +public class DeleteDBEventParser implements DBEventParser { private final List> prepareList = new ArrayList<>(); @@ -32,9 +30,6 @@ public DeleteDBEventParser(SQLExecuteState executeState, DeleteSQLParser sqlPars } public void prepare() throws SQLException { - System.out.println("delete batch-mode:"+this.executeState.isBatchMode()); - List executeParamList = this.executeState.getBatchExecuteSQLParamList(); - System.out.println(executeParamList.size()); this.updateRows(); } diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java similarity index 95% rename from src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java index b92a1da..77f1512 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java @@ -1,11 +1,11 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class DeleteSQLParser { +public class DeleteSQLParser implements SQLParser { private final String sql; diff --git a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java index 9d2ded6..493cf46 100644 --- a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java @@ -1,10 +1,8 @@ package com.codingapi.dbstream.parser; -import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.InsertSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -16,7 +14,7 @@ import java.util.List; import java.util.Map; -public class InsertDBEventParser { +public class InsertDBEventParser implements DBEventParser{ private final InsertSQLParser sqlParser; private final SQLExecuteState executeState; @@ -81,9 +79,6 @@ private List loadDataEvents() { } public void prepare() throws SQLException { - System.out.println("insert batch-mode:"+this.executeState.isBatchMode()); - List executeParamList = this.executeState.getBatchExecuteSQLParamList(); - System.out.println(executeParamList.size()); boolean defaultInsertSQL = this.sqlParser.isDefaultInsertSQL(); if (defaultInsertSQL) { this.loadDefaultInsertDataList(); @@ -107,6 +102,7 @@ private void loadSelectInsertDataList() throws SQLException { private void loadDefaultInsertDataList() throws SQLException { List values = this.sqlParser.getValues(); + List paramList = this.executeState.getListParams(); Map data = new HashMap<>(); for (int i = 0; i < columns.size(); i++) { String column = columns.get(i); @@ -122,7 +118,7 @@ private void loadDefaultInsertDataList() throws SQLException { } } } else if (insertValue.isJdbc()) { - value = i + 1; + value = paramList.get(i); } else { value = insertValue.getValue(); } diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java similarity index 97% rename from src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java index 52f14f7..c1e80ee 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java @@ -1,4 +1,4 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; import lombok.Getter; @@ -9,7 +9,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class InsertSQLParser { +public class InsertSQLParser implements SQLParser{ // 匹配 INSERT INTO table_name (...) private static final Pattern TABLE_NAME_PATTERN = Pattern.compile( diff --git a/src/main/java/com/codingapi/dbstream/parser/SQLParser.java b/src/main/java/com/codingapi/dbstream/parser/SQLParser.java new file mode 100644 index 0000000..5f83c36 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/parser/SQLParser.java @@ -0,0 +1,6 @@ +package com.codingapi.dbstream.parser; + +public interface SQLParser { + + String getTableName(); +} diff --git a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java index deb047d..1c363ae 100644 --- a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java @@ -1,10 +1,8 @@ package com.codingapi.dbstream.parser; -import com.codingapi.dbstream.interceptor.SQLExecuteParam; import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.UpdateSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -15,7 +13,7 @@ import java.util.List; import java.util.Map; -public class UpdateDBEventParser { +public class UpdateDBEventParser implements DBEventParser{ private final UpdateSQLParser sqlParser; private final String aliasTable; @@ -33,9 +31,6 @@ public UpdateDBEventParser(SQLExecuteState executeState, UpdateSQLParser sqlPars } public void prepare() throws SQLException { - System.out.println("update batch-mode:"+this.executeState.isBatchMode()); - List executeParamList = this.executeState.getBatchExecuteSQLParamList(); - System.out.println(executeParamList.size()); this.updateRows(); } diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java similarity index 97% rename from src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java index 5e4f4da..e3b3e39 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java @@ -1,4 +1,4 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; @@ -7,7 +7,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class UpdateSQLParser { +public class UpdateSQLParser implements SQLParser{ private final String sql; diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java index 492d800..0bbda52 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.DeleteSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java index b7c68d9..18d4911 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.InsertSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java index 969997c..917b4b1 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.UpdateSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/example/dbstream/entity/User1.java b/src/test/java/com/example/dbstream/entity/User1.java index 3cf9956..4c73faf 100644 --- a/src/test/java/com/example/dbstream/entity/User1.java +++ b/src/test/java/com/example/dbstream/entity/User1.java @@ -10,8 +10,7 @@ public class User1 { @Id - @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "user_seq") - @SequenceGenerator(name = "user_seq", sequenceName = "user_seq", allocationSize = 50) + @GeneratedValue(strategy = GenerationType.AUTO) private Long id; private String username; diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 7c380aa..37aec93 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1,33 +1,11 @@ spring.application.name=example spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver -#spring.datasource.url=jdbc:h2:file:./test.db - -spring.datasource.url=jdbc:postgresql://localhost:5432/demo -spring.datasource.username=postgres -spring.datasource.password=123456 -spring.jpa.show-sql=true +spring.datasource.url=jdbc:h2:file:./test.db spring.jpa.hibernate.ddl-auto=update -#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQLDialect -#spring.jpa.database-platform=org.hibernate.dialect.H2Dialect - -spring.jpa.properties.hibernate.jdbc.batch_size=1000 -spring.jpa.properties.hibernate.order_inserts=true -spring.jpa.properties.hibernate.order_updates=true -spring.jpa.properties.hibernate.order_deletes=true -spring.jpa.properties.hibernate.batch_versioned_data=true -spring.jpa.properties.hibernate.flush.mode=COMMIT -logging.level.org.hibernate.engine.jdbc.batch.internal.BatchingBatch=DEBUG -logging.level.org.hibernate.engine.jdbc.spi.SqlStatementLogger=DEBUG - -# -# -#spring.application.name=example -#spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver -#spring.datasource.url=jdbc:dm://10.13.14.227:5236/TOBACCO_HR_NINGXIA_DEV?localTimezone=480 -#spring.datasource.username=TOBACCO_HR_NINGXIA_DEV -#spring.datasource.password=NX_TOBACCO_HR2025 -#spring.jpa.show-sql=true -#spring.jpa.hibernate.ddl-auto=update -#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.DmDialect -#spring.jpa.database-platform=org.hibernate.dialect.DmDialect \ No newline at end of file +#spring.jpa.properties.hibernate.jdbc.batch_size=1000 +#spring.jpa.properties.hibernate.order_inserts=true +#spring.jpa.properties.hibernate.order_updates=true +#spring.jpa.properties.hibernate.order_deletes=true +#spring.jpa.properties.hibernate.batch_versioned_data=true +#spring.jpa.properties.hibernate.flush.mode=COMMIT