diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java index e70aeb53af5d5..e83bf5a500a83 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/PreparedStatementExecutor.java @@ -204,8 +204,9 @@ private boolean executeInternal(final PreparedStatementExecutorWrapper preparedS * 执行批量接口. * * @return 每个 + * @param batchSize 批量执行语句总数 */ - public int[] executeBatch() { + public int[] executeBatch(final int batchSize) { Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate"); eventPostman.postExecutionEvents(); final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown(); @@ -229,15 +230,13 @@ public int[] merge(final List results) { if (null == results) { return new int[]{0}; } - int length = 0; - for (int[] array : results) { - length += array.length; - } - int[] result = new int[length]; - int pos = 0; - for (int[] array : results) { - System.arraycopy(array, 0, result, pos, array.length); - pos += array.length; + int[] result = new int[batchSize]; + int i = 0; + for (PreparedStatementExecutorWrapper each : preparedStatementExecutorWrappers) { + for (Integer[] indices : each.getBatchIndices()) { + result[indices[0]] += results.get(i)[indices[1]]; + } + i++; } return result; } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/wrapper/PreparedStatementExecutorWrapper.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/wrapper/PreparedStatementExecutorWrapper.java index 51e4ceb2fd072..f434c52e3a014 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/wrapper/PreparedStatementExecutorWrapper.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/executor/wrapper/PreparedStatementExecutorWrapper.java @@ -26,6 +26,7 @@ import lombok.Getter; import java.sql.PreparedStatement; +import java.util.ArrayList; import java.util.List; /** @@ -42,6 +43,11 @@ public class PreparedStatementExecutorWrapper extends AbstractExecutorWrapper { private final Optional dqlExecutionEvent; + @Getter + private final List batchIndices = new ArrayList<>(); + + private int batchIndex; + public PreparedStatementExecutorWrapper(final PreparedStatement preparedStatement, final List parameters, final SQLExecutionUnit sqlExecutionUnit) { super(sqlExecutionUnit); @@ -77,4 +83,14 @@ public void addBatchParameters(final List parameters) { Preconditions.checkArgument(isDML() && dmlExecutionEvent.isPresent()); dmlExecutionEvent.get().addBatchParameters(Lists.newArrayList(parameters)); } + + /** + * 映射批量执行索引. + * 将{@linkplain com.dangdang.ddframe.rdb.sharding.jdbc.ShardingPreparedStatement}批量执行索引映射为真实的{@linkplain PreparedStatement}批量执行索引. + * + * @param shardingBatchIndex 分片批量执行索引 + */ + public void mapBatchIndex(final int shardingBatchIndex) { + batchIndices.add(new Integer[]{shardingBatchIndex, this.batchIndex++}); + } } diff --git a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java index 5a2ac3ab77f8e..cef64a620ebd8 100644 --- a/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java +++ b/sharding-jdbc-core/src/main/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatement.java @@ -55,6 +55,8 @@ public final class ShardingPreparedStatement extends AbstractPreparedStatementAd private String[] columnNames; + private int batchIndex; + ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql) { this(shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT); } @@ -119,6 +121,7 @@ public boolean execute() throws SQLException { protected void clearRouteContext() throws SQLException { resetBatch(); cachedPreparedStatementWrappers.clear(); + batchIndex = 0; } @Override @@ -131,7 +134,9 @@ public void addBatch() throws SQLException { try { for (PreparedStatementExecutorWrapper each : routeSQL()) { each.getPreparedStatement().addBatch(); + each.mapBatchIndex(batchIndex); } + batchIndex++; getGeneratedKeyContext().addRow(); } finally { resetBatch(); @@ -146,7 +151,7 @@ private void resetBatch() throws SQLException { @Override public int[] executeBatch() throws SQLException { try { - return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch(); + return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedPreparedStatementWrappers).executeBatch(batchIndex); } finally { clearRouteContext(); } diff --git a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTest.java b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTest.java index 9ceb9c36fb32c..331283f7bb149 100644 --- a/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTest.java +++ b/sharding-jdbc-core/src/test/java/com/dangdang/ddframe/rdb/sharding/jdbc/ShardingPreparedStatementTest.java @@ -336,6 +336,32 @@ public void assertAddBatchWithAutoIncrementColumn() throws SQLException { } } + @Test + public void assertUpdateBatch() throws SQLException { + String sql = "UPDATE `t_order` SET `status` = ? WHERE `status` = ?"; + try ( + Connection connection = shardingDataSource.getConnection(); + PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + preparedStatement.setString(1, "batch"); + preparedStatement.setString(2, "init"); + preparedStatement.addBatch(); + preparedStatement.setString(1, "batch"); + preparedStatement.setString(2, "init"); + preparedStatement.addBatch(); + preparedStatement.setString(1, "init"); + preparedStatement.setString(2, "batch"); + preparedStatement.addBatch(); + + int[] result = preparedStatement.executeBatch(); + assertThat(result.length, is(3)); + assertThat(result[0], is(40)); + assertThat(result[1], is(0)); + assertThat(result[2], is(40)); + } finally { + DMLExecutionEventBus.clearListener(); + } + } + @Test public void assertClearBatch() throws SQLException { String sql = "INSERT INTO `t_order`(`order_id`, `user_id`, `status`) VALUES (?,?,?)"; diff --git a/sharding-jdbc-doc/content/03-community/release-notes.md b/sharding-jdbc-doc/content/03-community/release-notes.md index 6dc94143b0931..5ab94a1adead9 100644 --- a/sharding-jdbc-doc/content/03-community/release-notes.md +++ b/sharding-jdbc-doc/content/03-community/release-notes.md @@ -12,18 +12,14 @@ next = "/03-community/directory-structure" ### 功能提升 1. [ISSUE #219](https://github.com/dangdangdotcom/sharding-jdbc/issues/219) 线程性能优化 +1. [ISSUE #215](https://github.com/dangdangdotcom/sharding-jdbc/issues/215) 流式排序的聚集结果集 StreamingOrderByReducerResultSet性能优化 +1. [ISSUE #161](https://github.com/dangdangdotcom/sharding-jdbc/issues/161) 结果集归并的时候可以采用堆排序来提升性能 ### 缺陷修正 1. [ISSUE #212](https://github.com/dangdangdotcom/sharding-jdbc/issues/212) 对去缺少数据源规则给出更有意义的提示 1. [ISSUE #214](https://github.com/dangdangdotcom/sharding-jdbc/issues/214) where中 table_name.column_name in (?,?)无法解析表达式 - -## 1.4.2 - -### 功能提升 - -1. [ISSUE #215](https://github.com/dangdangdotcom/sharding-jdbc/issues/215) 流式排序的聚集结果集 StreamingOrderByReducerResultSet性能优化 -1. [ISSUE #161](https://github.com/dangdangdotcom/sharding-jdbc/issues/161) 结果集归并的时候可以采用堆排序来提升性能 +1. [ISSUE #180](https://github.com/dangdangdotcom/sharding-jdbc/issues/180) 批量执行Update返回值不准确 ## 1.4.1