diff --git a/pom.xml b/pom.xml index d562688..2e5d9a2 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 com.codingapi.dbstream dbstream-driver - 1.0.8 + 1.0.9 https://github.com/codingapi/dbstream-driver dbstream-driver @@ -34,8 +34,7 @@ 1.18.42 - 2.7.18 - 2.2.222 + 3.5.7 @@ -55,7 +54,14 @@ com.h2database h2 - ${h2.version} + 2.2.222 + test + + + + org.postgresql + postgresql + 42.7.3 test diff --git a/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java b/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java index 712b7a1..d1306fc 100644 --- a/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java +++ b/src/main/java/com/codingapi/dbstream/driver/DBStreamProxyDriver.java @@ -1,9 +1,9 @@ package com.codingapi.dbstream.driver; import com.codingapi.dbstream.interceptor.SQLRunningContext; -import com.codingapi.dbstream.listener.SQLDeleteExecuteListener; -import com.codingapi.dbstream.listener.SQLInsertExecuteListener; -import com.codingapi.dbstream.listener.SQLUpdateExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLDeleteExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLInsertExecuteListener; +import com.codingapi.dbstream.listener.stream.SQLUpdateExecuteListener; import com.codingapi.dbstream.proxy.ConnectionProxy; import com.codingapi.dbstream.scanner.DBMetaContext; import com.codingapi.dbstream.scanner.DBMetaData; diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java new file mode 100644 index 0000000..6f73a6d --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteParam.java @@ -0,0 +1,82 @@ +package com.codingapi.dbstream.interceptor; + +import lombok.Getter; +import lombok.Setter; + +import java.util.*; + +/** + * SQL执行参数信息 + */ +public class SQLExecuteParam { + + + /** + * SQL参数,integer index模式 + */ + @Getter + private final Map indexParams; + /** + * SQL参数,string key 模型 + */ + @Getter + private final Map mapParams; + + /** + * 执行的sql + */ + @Getter + @Setter + private String sql; + + public SQLExecuteParam() { + this.indexParams = new HashMap<>(); + this.mapParams = new HashMap<>(); + } + + + /** + * 更新sql参数 + * + * @param key 参数key + * @param value 参数值 + */ + public void setParam(String key, Object value) { + mapParams.put(key, value); + } + + /** + * 更新sql参数 + * + * @param index 参数索引 + * @param value 参数值 + */ + public void setParam(int index, Object value) { + indexParams.put(index, value); + } + + /** + * 清理参数 + */ + public void cleanParams(){ + this.indexParams.clear(); + this.mapParams.clear(); + } + + /** + * 获取参数列表 + * @return List + */ + public List getListParams(){ + List list = new ArrayList<>(); + if (indexParams.isEmpty()) { + return list; + } + List keys = new ArrayList<>(indexParams.keySet()); + Collections.sort(keys); + for(Integer key: keys){ + list.add(indexParams.get(key)); + } + return list; + } +} diff --git a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java index 5984a88..eba1f1d 100644 --- a/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java +++ b/src/main/java/com/codingapi/dbstream/interceptor/SQLExecuteState.java @@ -17,21 +17,25 @@ public class SQLExecuteState { /** - * SQL参数,integer index模式 + * 执行SQL队列 */ - @Getter - private final Map indexParams; + private final List sqlExecuteParams; + /** - * SQL参数,string key 模型 + * 当前执行对象 + */ + private SQLExecuteParam currentExecute; + + /** + * 模式判断 */ @Getter - private final Map mapParams; + private boolean batchMode = false; /** - * 执行的sql + * 当前绑定sql */ @Getter - @Setter private String sql; /** @@ -78,9 +82,56 @@ public SQLExecuteState(String sql, ConnectionProxy connectionProxy, Statement st this.connectionProxy = connectionProxy; this.statement = statement; this.metaData = metaData; + this.sqlExecuteParams = new ArrayList<>(); + + this.currentExecute = new SQLExecuteParam(); + this.currentExecute.setSql(sql); + this.sqlExecuteParams.add(currentExecute); + } - this.indexParams = new HashMap<>(); - this.mapParams = new HashMap<>(); + public void setSql(String sql){ + this.sql = sql; + if(this.currentExecute!=null) { + this.currentExecute.setSql(sql); + } + } + + /** + * 添加任务队列 + * + * @param sql 执行sql + */ + public void addBatch(String sql) { + batchMode = true; + SQLExecuteParam executeParam = new SQLExecuteParam(); + executeParam.setSql(sql); + this.sqlExecuteParams.add(executeParam); + this.currentExecute = executeParam; + } + + /** + * 添加任务队列 + * + */ + public void addBatch() { + this.addBatch(this.sql); + } + + /** + * 清空队列 + */ + public void clearBatch() { + this.sqlExecuteParams.clear(); + this.currentExecute = null; + } + + /** + * 清理参数设置 + */ + public void cleanParams(){ + if(this.currentExecute!=null) { + this.currentExecute.cleanParams(); + } } /** @@ -114,7 +165,9 @@ public long getExecuteTimestamp() { * @param value 参数值 */ public void setParam(String key, Object value) { - mapParams.put(key, value); + if(this.currentExecute!=null) { + currentExecute.setParam(key, value); + } } /** @@ -124,24 +177,52 @@ public void setParam(String key, Object value) { * @param value 参数值 */ public void setParam(int index, Object value) { - indexParams.put(index, value); + if(this.currentExecute!=null) { + currentExecute.setParam(index, value); + } } /** * 获取参数列表 + * * @return List */ - public List getListParams(){ - List list = new ArrayList<>(); - if (indexParams.isEmpty()) { - return list; + public List getListParams() { + if(batchMode){ + if(this.sqlExecuteParams.isEmpty()){ + return new ArrayList<>(); + } + int size = this.sqlExecuteParams.size(); + return this.sqlExecuteParams.get(size-2).getListParams(); } - List keys = new ArrayList<>(indexParams.keySet()); - Collections.sort(keys); - for(Integer key: keys){ - list.add(indexParams.get(key)); + if(this.currentExecute!=null) { + return currentExecute.getListParams(); } - return list; + return new ArrayList<>(); + } + + + /** + * 获取Batch的SQLExecuteState + * @return List + */ + public List getBatchSQLExecuteStateList(){ + if(this.batchMode){ + if(this.sqlExecuteParams.isEmpty()){ + return new ArrayList<>(); + } + int size = this.sqlExecuteParams.size(); + List list = new ArrayList<>(); + List paramList = this.sqlExecuteParams.subList(0,size-1); + for(SQLExecuteParam executeParam:paramList){ + SQLExecuteState executeState = new SQLExecuteState(executeParam.getSql(), connectionProxy,statement,metaData); + executeState.currentExecute = executeParam; + list.add(executeState); + } + return list; + + } + return new ArrayList<>(); } @@ -167,13 +248,15 @@ public String getTransactionKey() { /** * 查询 + * * @param sql sql * @return 查询结果 * @throws SQLException 查询异常 */ public List> query(String sql) throws SQLException { - return this.query(sql,new ArrayList<>()); + return this.query(sql, new ArrayList<>()); } + /** * 查询 * @@ -216,10 +299,12 @@ public List> getStatementGenerateKeys(DbTable dbTable) { Map map = new HashMap<>(); ResultSetMetaData resultSetMetaData = rs.getMetaData(); int columnCount = resultSetMetaData.getColumnCount(); - List primaryKeyColumns = dbTable.getPrimaryColumns(); for (int i = 1; i <= columnCount; i++) { - DbColumn dbColumn = primaryKeyColumns.get(i - 1); - map.put(dbColumn.getName(), rs.getObject(i)); + String columName = resultSetMetaData.getColumnName(i); + DbColumn dbColumn = dbTable.getColumnByName(columName); + if (dbColumn != null) { + map.put(dbColumn.getName(), rs.getObject(i)); + } } list.add(map); } @@ -232,6 +317,7 @@ public List> getStatementGenerateKeys(DbTable dbTable) { /** * 获取驱动配置信息 + * * @return Properties */ public Properties getDriverProperties() { @@ -244,6 +330,7 @@ public Properties getDriverProperties() { /** * 获取数据库的jdbcUrl + * * @return jdbcUrl */ public String getJdbcUrl() { @@ -256,10 +343,11 @@ public String getJdbcUrl() { /** * 获取数据库的jdbcKey + * * @return jdbcKey */ - public String getJdbcKey(){ - if(metaData==null){ + public String getJdbcKey() { + if (metaData == null) { return null; } return metaData.getKeyJdbcKey(); @@ -268,6 +356,7 @@ public String getJdbcKey(){ /** * 更新数据库的元数据信息 + * * @param tableName 表名 */ public void updateMetaData(String tableName) throws SQLException { diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java deleted file mode 100644 index c32ba81..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLDeleteExecuteListener.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.DeleteDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.DeleteSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLDeleteExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isDeleteSQL(sql)) { - try { - threadLocal.remove(); - DeleteSQLParser sqlParser = new DeleteSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - DeleteDBEventParser dataParser = new DeleteDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } else { - threadLocal.remove(); - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isDeleteSQL(sql)) { - DeleteDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } -} diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java deleted file mode 100644 index 53c7d65..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLInsertExecuteListener.java +++ /dev/null @@ -1,60 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.InsertDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.InsertSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLInsertExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isInsertSQL(sql)) { - try { - threadLocal.remove(); - InsertSQLParser sqlParser = new InsertSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - InsertDBEventParser dataParser = new InsertDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isInsertSQL(sql)) { - InsertDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } - -} diff --git a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java deleted file mode 100644 index 2a05b10..0000000 --- a/src/main/java/com/codingapi/dbstream/listener/SQLUpdateExecuteListener.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.codingapi.dbstream.listener; - -import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.interceptor.SQLExecuteState; -import com.codingapi.dbstream.parser.UpdateDBEventParser; -import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.UpdateSQLParser; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.TransactionEventPools; -import com.codingapi.dbstream.utils.SQLUtils; - -import java.sql.SQLException; -import java.util.List; - -public class SQLUpdateExecuteListener implements SQLExecuteListener { - - private final static ThreadLocal threadLocal = new ThreadLocal<>(); - - @Override - public int order() { - return 100; - } - - @Override - public void before(SQLExecuteState executeState) throws SQLException { - String sql = executeState.getSql(); - if (SQLUtils.isUpdateSQL(sql)) { - try { - threadLocal.remove(); - UpdateSQLParser sqlParser = new UpdateSQLParser(sql); - String tableName = sqlParser.getTableName(); - executeState.updateMetaData(tableName); - DbTable dbTable = executeState.getDbTable(tableName); - if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { - UpdateDBEventParser dataParser = new UpdateDBEventParser(executeState, sqlParser, dbTable); - dataParser.prepare(); - threadLocal.set(dataParser); - } - } catch (Exception e) { - threadLocal.remove(); - throw new SQLException(e); - } - } else { - threadLocal.remove(); - } - } - - @Override - public void after(SQLExecuteState executeState, Object result) throws SQLException { - String sql = executeState.getSql(); - String transactionKey = executeState.getTransactionKey(); - if (SQLUtils.isUpdateSQL(sql)) { - UpdateDBEventParser dataParser = threadLocal.get(); - if (dataParser != null) { - List eventList = dataParser.loadEvents(result); - TransactionEventPools.getInstance().addEvents(transactionKey, eventList); - threadLocal.remove(); - } - } - } -} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java new file mode 100644 index 0000000..5a3de5b --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLDeleteExecuteListener.java @@ -0,0 +1,33 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.DeleteDBEventParser; +import com.codingapi.dbstream.parser.DeleteSQLParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLDeleteExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isDeleteSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new DeleteSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new DeleteDBEventParser(sqlExecuteState, (DeleteSQLParser) sqlParser, dbTable); + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java new file mode 100644 index 0000000..d951cde --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLInsertExecuteListener.java @@ -0,0 +1,34 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.InsertDBEventParser; +import com.codingapi.dbstream.parser.InsertSQLParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLInsertExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isInsertSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new InsertSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new InsertDBEventParser(sqlExecuteState, (InsertSQLParser) sqlParser, dbTable); + } + + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java new file mode 100644 index 0000000..b140f05 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLStreamExecuteListener.java @@ -0,0 +1,80 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.DBStreamContext; +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.listener.SQLExecuteListener; +import com.codingapi.dbstream.parser.*; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.stream.DBEvent; +import com.codingapi.dbstream.stream.TransactionEventPools; + +import java.sql.SQLException; +import java.util.List; + +public abstract class SQLStreamExecuteListener implements SQLExecuteListener { + + public abstract boolean isSupport(String sql); + + public abstract SQLParser createSQLParser(String sql); + + public abstract DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState,SQLParser sqlParser,DbTable dbTable); + + @Override + public void before(SQLExecuteState executeState) throws SQLException { + String sql = executeState.getSql(); + if (this.isSupport(sql)) { + try { + ThreadLocalContext.getInstance().remove(); + SQLParser sqlParser = this.createSQLParser(sql); + String tableName = sqlParser.getTableName(); + executeState.updateMetaData(tableName); + DbTable dbTable = executeState.getDbTable(tableName); + if (dbTable != null && DBStreamContext.getInstance().support(executeState.getDriverProperties(), dbTable)) { + if (executeState.isBatchMode()) { + List executeStateList = executeState.getBatchSQLExecuteStateList(); + for (int i = 0; i < executeStateList.size(); i++) { + SQLExecuteState sqlExecuteState = executeStateList.get(i); + DBEventParser dataParser = this.createDbEventParser(sqlExecuteState,sqlParser,dbTable); + dataParser.prepare(); + ThreadLocalContext.getInstance().push(i, dataParser); + } + } else { + DBEventParser dataParser = this.createDbEventParser(executeState,sqlParser,dbTable); + dataParser.prepare(); + ThreadLocalContext.getInstance().push(dataParser); + } + } + } catch (Exception e) { + ThreadLocalContext.getInstance().remove(); + throw new SQLException(e); + } + } + } + + @Override + public void after(SQLExecuteState executeState, Object result) throws SQLException { + String sql = executeState.getSql(); + String transactionKey = executeState.getTransactionKey(); + if (this.isSupport(sql)) { + if (executeState.isBatchMode()) { + List executeStateList = executeState.getBatchSQLExecuteStateList(); + for (int i = 0; i < executeStateList.size(); i++) { + DBEventParser dataParser = ThreadLocalContext.getInstance().get(i); + if (dataParser != null) { + List eventList = dataParser.loadEvents(result); + TransactionEventPools.getInstance().addEvents(transactionKey, eventList); + } + } + ThreadLocalContext.getInstance().remove(); + } else { + DBEventParser dataParser = ThreadLocalContext.getInstance().get(); + if (dataParser != null) { + List eventList = dataParser.loadEvents(result); + TransactionEventPools.getInstance().addEvents(transactionKey, eventList); + } + ThreadLocalContext.getInstance().remove(); + } + } + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java b/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java new file mode 100644 index 0000000..0b7f809 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/SQLUpdateExecuteListener.java @@ -0,0 +1,33 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.interceptor.SQLExecuteState; +import com.codingapi.dbstream.parser.DBEventParser; +import com.codingapi.dbstream.parser.SQLParser; +import com.codingapi.dbstream.parser.UpdateDBEventParser; +import com.codingapi.dbstream.parser.UpdateSQLParser; +import com.codingapi.dbstream.scanner.DbTable; +import com.codingapi.dbstream.utils.SQLUtils; + +public class SQLUpdateExecuteListener extends SQLStreamExecuteListener { + + @Override + public int order() { + return 100; + } + + @Override + public boolean isSupport(String sql) { + return SQLUtils.isUpdateSQL(sql); + } + + @Override + public SQLParser createSQLParser(String sql) { + return new UpdateSQLParser(sql); + } + + @Override + public DBEventParser createDbEventParser(SQLExecuteState sqlExecuteState, SQLParser sqlParser, DbTable dbTable) { + return new UpdateDBEventParser(sqlExecuteState, (UpdateSQLParser) sqlParser, dbTable); + } + +} diff --git a/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java b/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java new file mode 100644 index 0000000..0eae441 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/listener/stream/ThreadLocalContext.java @@ -0,0 +1,50 @@ +package com.codingapi.dbstream.listener.stream; + +import com.codingapi.dbstream.parser.DBEventParser; +import lombok.Getter; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class ThreadLocalContext { + + private final Map cache; + + private final ThreadLocal threadLocal = new ThreadLocal<>(); + + private ThreadLocalContext(){ + this.cache = new ConcurrentHashMap<>(); + } + + @Getter + private final static ThreadLocalContext instance = new ThreadLocalContext(); + + public void remove(){ + this.threadLocal.remove(); + } + + public void push(DBEventParser eventParser){ + this.push(0,eventParser); + } + + public void push(int index,DBEventParser eventParser){ + ThreadLocalContext context = threadLocal.get(); + if(context==null){ + context = new ThreadLocalContext(); + threadLocal.set(context); + } + context.cache.put(index,eventParser); + } + + public DBEventParser get() { + return this.get(0); + } + + public DBEventParser get(int index) { + ThreadLocalContext context = threadLocal.get(); + if(context==null){ + return null; + } + return context.cache.get(index); + } +} diff --git a/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java new file mode 100644 index 0000000..15ad526 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/parser/DBEventParser.java @@ -0,0 +1,13 @@ +package com.codingapi.dbstream.parser; + +import com.codingapi.dbstream.stream.DBEvent; + +import java.sql.SQLException; +import java.util.List; + +public interface DBEventParser { + + void prepare() throws SQLException; + + List loadEvents(Object result) throws SQLException; +} diff --git a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java index 1914483..29e62d7 100644 --- a/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/DeleteDBEventParser.java @@ -3,7 +3,6 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.DeleteSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -14,7 +13,7 @@ import java.util.List; import java.util.Map; -public class DeleteDBEventParser { +public class DeleteDBEventParser implements DBEventParser { private final List> prepareList = new ArrayList<>(); diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java similarity index 95% rename from src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java index b92a1da..77f1512 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/DeleteSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/DeleteSQLParser.java @@ -1,11 +1,11 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; import java.util.regex.Matcher; import java.util.regex.Pattern; -public class DeleteSQLParser { +public class DeleteSQLParser implements SQLParser { private final String sql; diff --git a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java index 4179ad9..493cf46 100644 --- a/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/InsertDBEventParser.java @@ -3,7 +3,6 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.InsertSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -15,7 +14,7 @@ import java.util.List; import java.util.Map; -public class InsertDBEventParser { +public class InsertDBEventParser implements DBEventParser{ private final InsertSQLParser sqlParser; private final SQLExecuteState executeState; @@ -103,6 +102,7 @@ private void loadSelectInsertDataList() throws SQLException { private void loadDefaultInsertDataList() throws SQLException { List values = this.sqlParser.getValues(); + List paramList = this.executeState.getListParams(); Map data = new HashMap<>(); for (int i = 0; i < columns.size(); i++) { String column = columns.get(i); @@ -118,7 +118,7 @@ private void loadDefaultInsertDataList() throws SQLException { } } } else if (insertValue.isJdbc()) { - value = i + 1; + value = paramList.get(i); } else { value = insertValue.getValue(); } diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java similarity index 97% rename from src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java index 52f14f7..c1e80ee 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/InsertSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/InsertSQLParser.java @@ -1,4 +1,4 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; import lombok.Getter; @@ -9,7 +9,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class InsertSQLParser { +public class InsertSQLParser implements SQLParser{ // 匹配 INSERT INTO table_name (...) private static final Pattern TABLE_NAME_PATTERN = Pattern.compile( diff --git a/src/main/java/com/codingapi/dbstream/parser/SQLParser.java b/src/main/java/com/codingapi/dbstream/parser/SQLParser.java new file mode 100644 index 0000000..5f83c36 --- /dev/null +++ b/src/main/java/com/codingapi/dbstream/parser/SQLParser.java @@ -0,0 +1,6 @@ +package com.codingapi.dbstream.parser; + +public interface SQLParser { + + String getTableName(); +} diff --git a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java index abf0d62..1c363ae 100644 --- a/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/UpdateDBEventParser.java @@ -3,7 +3,6 @@ import com.codingapi.dbstream.interceptor.SQLExecuteState; import com.codingapi.dbstream.scanner.DbColumn; import com.codingapi.dbstream.scanner.DbTable; -import com.codingapi.dbstream.sqlparser.UpdateSQLParser; import com.codingapi.dbstream.stream.DBEvent; import com.codingapi.dbstream.stream.EventType; import com.codingapi.dbstream.utils.ResultSetUtils; @@ -14,7 +13,7 @@ import java.util.List; import java.util.Map; -public class UpdateDBEventParser { +public class UpdateDBEventParser implements DBEventParser{ private final UpdateSQLParser sqlParser; private final String aliasTable; diff --git a/src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java b/src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java similarity index 97% rename from src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java rename to src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java index 5e4f4da..e3b3e39 100644 --- a/src/main/java/com/codingapi/dbstream/sqlparser/UpdateSQLParser.java +++ b/src/main/java/com/codingapi/dbstream/parser/UpdateSQLParser.java @@ -1,4 +1,4 @@ -package com.codingapi.dbstream.sqlparser; +package com.codingapi.dbstream.parser; import com.codingapi.dbstream.utils.SQLUtils; @@ -7,7 +7,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class UpdateSQLParser { +public class UpdateSQLParser implements SQLParser{ private final String sql; diff --git a/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java b/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java index 595f835..c1a25c3 100644 --- a/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java +++ b/src/main/java/com/codingapi/dbstream/proxy/CallableStatementProxy.java @@ -799,6 +799,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw @Override public void clearParameters() throws SQLException { callableStatement.clearParameters(); + this.executeState.cleanParams(); } @Override @@ -824,6 +825,7 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { callableStatement.addBatch(); + this.executeState.addBatch(); } @Override @@ -1153,11 +1155,13 @@ public int getResultSetType() throws SQLException { @Override public void addBatch(String sql) throws SQLException { callableStatement.addBatch(sql); + this.executeState.addBatch(sql); } @Override public void clearBatch() throws SQLException { callableStatement.clearBatch(); + this.executeState.clearBatch(); } @Override diff --git a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java index 8c132b1..4f94925 100644 --- a/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java +++ b/src/main/java/com/codingapi/dbstream/proxy/PreparedStatementProxy.java @@ -147,6 +147,7 @@ public void setBinaryStream(int parameterIndex, InputStream x, int length) throw @Override public void clearParameters() throws SQLException { preparedStatement.clearParameters(); + this.executeState.cleanParams(); } @Override @@ -172,6 +173,7 @@ public boolean execute() throws SQLException { @Override public void addBatch() throws SQLException { preparedStatement.addBatch(); + this.executeState.addBatch(); } @Override @@ -509,11 +511,13 @@ public int getResultSetType() throws SQLException { @Override public void addBatch(String sql) throws SQLException { preparedStatement.addBatch(sql); + this.executeState.addBatch(sql); } @Override public void clearBatch() throws SQLException { preparedStatement.clearBatch(); + this.executeState.clearBatch(); } @Override diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java index 492d800..0bbda52 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/DeleteSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.DeleteSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java index b7c68d9..18d4911 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/InsertSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.InsertSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java b/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java index 969997c..917b4b1 100644 --- a/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java +++ b/src/test/java/com/codingapi/dbstream/sqlparser/UpdateSQLParserTest.java @@ -1,5 +1,6 @@ package com.codingapi.dbstream.sqlparser; +import com.codingapi.dbstream.parser.UpdateSQLParser; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvFileSource; diff --git a/src/test/java/com/example/dbstream/entity/User1.java b/src/test/java/com/example/dbstream/entity/User1.java index 5801620..4c73faf 100644 --- a/src/test/java/com/example/dbstream/entity/User1.java +++ b/src/test/java/com/example/dbstream/entity/User1.java @@ -1,8 +1,8 @@ package com.example.dbstream.entity; +import jakarta.persistence.*; import lombok.Data; -import javax.persistence.*; @Data @Entity @@ -10,8 +10,8 @@ public class User1 { @Id - @GeneratedValue(strategy = GenerationType.IDENTITY) - private long id; + @GeneratedValue(strategy = GenerationType.AUTO) + private Long id; private String username; diff --git a/src/test/java/com/example/dbstream/entity/User2.java b/src/test/java/com/example/dbstream/entity/User2.java index 55c50d1..dd63f9c 100644 --- a/src/test/java/com/example/dbstream/entity/User2.java +++ b/src/test/java/com/example/dbstream/entity/User2.java @@ -2,7 +2,7 @@ import lombok.Data; -import javax.persistence.*; +import jakarta.persistence.*; @Data @Entity diff --git a/src/test/java/com/example/dbstream/repository/User2Repository.java b/src/test/java/com/example/dbstream/repository/User2Repository.java index c126588..cbdd231 100644 --- a/src/test/java/com/example/dbstream/repository/User2Repository.java +++ b/src/test/java/com/example/dbstream/repository/User2Repository.java @@ -25,7 +25,7 @@ public interface User2Repository extends JpaRepository { int counts(); @Modifying - @Query("update User2 set password = now() where username = ?1") + @Query("update User2 set password = '123' where username = ?1") int resetPasswordByUsername1(String username); @Modifying diff --git a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java index e85ea3b..f1a5920 100644 --- a/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User1RepositoryTest.java @@ -2,19 +2,17 @@ import com.codingapi.dbstream.DBStreamContext; -import com.codingapi.dbstream.stream.DBEvent; -import com.codingapi.dbstream.stream.DBEventPusher; import com.example.dbstream.entity.User1; import com.example.dbstream.listener.MySQLListener; import com.example.dbstream.repository.User1Repository; +import jakarta.persistence.EntityManager; +import jakarta.transaction.Transactional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; -import javax.transaction.Transactional; -import java.util.ArrayList; import java.util.List; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -26,15 +24,18 @@ class User1RepositoryTest { @Autowired private User1Repository userRepository; + @Autowired + private EntityManager entityManager; + @BeforeEach void setUp() { - DBStreamContext.getInstance().addEventPusher(new DBEventPusher() { - @Override - public void push(List events) { - System.out.println(events); - } - }); +// DBStreamContext.getInstance().addEventPusher(new DBEventPusher() { +// @Override +// public void push(List events) { +// System.out.println(events); +// } +// }); } /** @@ -171,32 +172,25 @@ void test7() { */ @Test @Transactional - @Rollback(false) + @Rollback(value = false) void test8() { + List list = userRepository.findAll(); + for (User1 user:list){ + user.setEmail("111"); + } + userRepository.saveAll(list); - DBStreamContext.getInstance().addListener(new MySQLListener()); - - List list = new ArrayList<>(); - - - for (int i=0;i<10;i++){ + userRepository.deleteAll(); + for (int i=0;i<5;i++){ User1 user1 = new User1(); user1.setUsername("admin1"); user1.setPassword("admin1"); user1.setEmail("admin1@example.com"); user1.setNickname("admin1"); - list.add(user1); - - User1 user2 = new User1(); - user2.setUsername("admin2"); - user2.setPassword("admin2"); - user2.setEmail("admin2@example.com"); - user2.setNickname("admin2"); - list.add(user2); + entityManager.persist(user1); } - - userRepository.saveAll(list); - + entityManager.flush(); + entityManager.clear(); } diff --git a/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java b/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java index 4251ec3..6c45119 100644 --- a/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java +++ b/src/test/java/com/example/dbstream/tests/User2RepositoryTest.java @@ -5,12 +5,12 @@ import com.example.dbstream.entity.User2; import com.example.dbstream.listener.MySQLListener; import com.example.dbstream.repository.User2Repository; +import jakarta.transaction.Transactional; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.annotation.Rollback; -import javax.transaction.Transactional; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/src/test/resources/application.properties b/src/test/resources/application.properties index 6b969cb..37aec93 100644 --- a/src/test/resources/application.properties +++ b/src/test/resources/application.properties @@ -1,18 +1,11 @@ spring.application.name=example spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver spring.datasource.url=jdbc:h2:file:./test.db -spring.jpa.show-sql=true spring.jpa.hibernate.ddl-auto=update -spring.jpa.database-platform=org.hibernate.dialect.H2Dialect -# -# -#spring.application.name=example -#spring.datasource.driver-class-name=com.codingapi.dbstream.driver.DBStreamProxyDriver -#spring.datasource.url=jdbc:dm://10.13.14.227:5236/TOBACCO_HR_NINGXIA_DEV?localTimezone=480 -#spring.datasource.username=TOBACCO_HR_NINGXIA_DEV -#spring.datasource.password=NX_TOBACCO_HR2025 -#spring.jpa.show-sql=true -#spring.jpa.hibernate.ddl-auto=update -#spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.DmDialect -#spring.jpa.database-platform=org.hibernate.dialect.DmDialect \ No newline at end of file +#spring.jpa.properties.hibernate.jdbc.batch_size=1000 +#spring.jpa.properties.hibernate.order_inserts=true +#spring.jpa.properties.hibernate.order_updates=true +#spring.jpa.properties.hibernate.order_deletes=true +#spring.jpa.properties.hibernate.batch_versioned_data=true +#spring.jpa.properties.hibernate.flush.mode=COMMIT