Skip to content

Commit

Permalink
fix #36 cannot reuse ShardingPreparedStatement
Browse files Browse the repository at this point in the history
fix #114 ShardingPreparedStatement batch execution parse sql
  • Loading branch information
hanahmily authored and gaoht committed Jul 6, 2016
1 parent fe49c45 commit 1f7ed61
Show file tree
Hide file tree
Showing 27 changed files with 571 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEventBus;
import com.dangdang.ddframe.rdb.sharding.executor.event.EventExecutionType;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.google.common.base.Optional;
Expand Down Expand Up @@ -194,6 +195,80 @@ private boolean executeInternal(final PreparedStatementExecutorWrapper preparedS
return result;
}


/**
* 执行批量接口.
*
* @return 每个
*/
public int[] executeBatch() {
Context context = MetricsContext.start("ShardingPreparedStatement-executeUpdate");
postExecutionEvents();
final boolean isExceptionThrown = ExecutorExceptionHandler.isExceptionThrown();
final Map<String, Object> dataMap = ExecutorDataMap.getDataMap();
try {
if (1 == preparedStatementExecutorWrappers.size()) {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) preparedStatementExecutorWrappers.iterator().next(), isExceptionThrown, dataMap);
}
return executorEngine.execute(preparedStatementExecutorWrappers, new ExecuteUnit<PreparedStatementExecutorWrapper, int[]>() {

@Override
public int[] execute(final PreparedStatementExecutorWrapper input) throws Exception {
return executeBatchInternal((BatchPreparedStatementExecutorWrapper) input, isExceptionThrown, dataMap);
}
}, new MergeUnit<int[], int[]>() {

@Override
public int[] merge(final List<int[]> results) {
if (null == results) {
return new int[]{0};
}
int length = 0;
for (int[] array : results) {
length += array.length;
}
int[] result = new int[length];
int pos = 0;
for (int[] array : results) {
System.arraycopy(array, 0, result, pos, array.length);
pos += array.length;
}
return result;
}
});
} finally {
MetricsContext.stop(context);
}
}

private int[] executeBatchInternal(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final boolean isExceptionThrown, final Map<String, Object> dataMap) {
int[] result;
ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);
ExecutorDataMap.setDataMap(dataMap);
try {
result = batchPreparedStatementExecutorWrapper.getPreparedStatement().executeBatch();
} catch (final SQLException ex) {
postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_FAILURE, Optional.of(ex));
ExecutorExceptionHandler.handleException(ex);
return null;
}
postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper);
return result;
}

private void postBatchExecutionEventsAfterExecution(final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper) {
postBatchExecutionEventsAfterExecution(batchPreparedStatementExecutorWrapper, EventExecutionType.EXECUTE_SUCCESS, Optional.<SQLException>absent());
}

private void postBatchExecutionEventsAfterExecution(
final BatchPreparedStatementExecutorWrapper batchPreparedStatementExecutorWrapper, final EventExecutionType eventExecutionType, final Optional<SQLException> exp) {
for (DMLExecutionEvent each : batchPreparedStatementExecutorWrapper.getDmlExecutionEvents()) {
each.setEventExecutionType(eventExecutionType);
each.setExp(exp);
DMLExecutionEventBus.post(each);
}
}

private void postExecutionEvents() {
for (PreparedStatementExecutorWrapper each : preparedStatementExecutorWrappers) {
if (each.getDMLExecutionEvent().isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package com.dangdang.ddframe.rdb.sharding.executor.wrapper;

import com.dangdang.ddframe.rdb.sharding.executor.event.DMLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import lombok.Getter;

import java.sql.PreparedStatement;
import java.util.LinkedList;
import java.util.List;

/**
* 批量操作执行上下文.
* 批量操作只支持DML语句,故只包含DML操作的事件.
*
* @author gaohongtao
*/
@Getter
public class BatchPreparedStatementExecutorWrapper extends PreparedStatementExecutorWrapper {

private final List<DMLExecutionEvent> dmlExecutionEvents = new LinkedList<>();

public BatchPreparedStatementExecutorWrapper(final PreparedStatement preparedStatement, final List<Object> parameters, final SQLExecutionUnit sqlExecutionUnit) {
super(preparedStatement, parameters, sqlExecutionUnit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.dangdang.ddframe.rdb.sharding.executor.event.DQLExecutionEvent;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.Getter;

import java.sql.PreparedStatement;
Expand All @@ -31,7 +32,7 @@
*
* @author zhangliang
*/
public final class PreparedStatementExecutorWrapper extends AbstractExecutorWrapper {
public class PreparedStatementExecutorWrapper extends AbstractExecutorWrapper {

@Getter
private final PreparedStatement preparedStatement;
Expand All @@ -45,10 +46,10 @@ public PreparedStatementExecutorWrapper(final PreparedStatement preparedStatemen
super(sqlExecutionUnit);
this.preparedStatement = preparedStatement;
if (isDML()) {
dmlExecutionEvent = Optional.of(new DMLExecutionEvent(getSqlExecutionUnit().getDataSource(), getSqlExecutionUnit().getSql(), parameters));
dmlExecutionEvent = Optional.of(new DMLExecutionEvent(getSqlExecutionUnit().getDataSource(), getSqlExecutionUnit().getSql(), Lists.newArrayList(parameters)));
dqlExecutionEvent = Optional.absent();
} else if (isDQL()) {
dqlExecutionEvent = Optional.of(new DQLExecutionEvent(getSqlExecutionUnit().getDataSource(), getSqlExecutionUnit().getSql(), parameters));
dqlExecutionEvent = Optional.of(new DQLExecutionEvent(getSqlExecutionUnit().getDataSource(), getSqlExecutionUnit().getSql(), Lists.newArrayList(parameters)));
dmlExecutionEvent = Optional.absent();
} else {
dmlExecutionEvent = Optional.absent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,23 @@
package com.dangdang.ddframe.rdb.sharding.jdbc;

import com.dangdang.ddframe.rdb.sharding.executor.PreparedStatementExecutor;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.BatchPreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.jdbc.adapter.AbstractPreparedStatementAdapter;
import com.dangdang.ddframe.rdb.sharding.merger.ResultSetFactory;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.MergeContext;
import com.dangdang.ddframe.rdb.sharding.router.PreparedSQLRouter;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import com.dangdang.ddframe.rdb.sharding.router.SQLRouteResult;
import com.google.common.base.Function;
import com.google.common.collect.Lists;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* 支持分片的预编译语句对象.
Expand All @@ -45,20 +44,16 @@
*/
public final class ShardingPreparedStatement extends AbstractPreparedStatementAdapter {

private final String sql;
private final PreparedSQLRouter preparedSQLRouter;

private final List<PreparedStatementExecutorWrapper> cachedRoutedPreparedStatements = new LinkedList<>();
private final Map<PreparedStatement, PreparedStatementExecutorWrapper> cachedRoutePreparedStatementMap = new HashMap<>();

private Integer autoGeneratedKeys;

private int[] columnIndexes;

private String[] columnNames;

private boolean hasExecuted;

private final List<List<Object>> batchParameters = new ArrayList<>();

public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql) throws SQLException {
this(shardingConnection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
}
Expand All @@ -69,9 +64,9 @@ public ShardingPreparedStatement(final ShardingConnection shardingConnection,
}

public ShardingPreparedStatement(final ShardingConnection shardingConnection,
final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
super(shardingConnection, resultSetType, resultSetConcurrency, resultSetHoldability);
this.sql = sql;
preparedSQLRouter = shardingConnection.getShardingContext().getSqlRouteEngine().prepareSQL(sql);
}

public ShardingPreparedStatement(final ShardingConnection shardingConnection, final String sql, final int autoGeneratedKeys) throws SQLException {
Expand All @@ -91,90 +86,92 @@ public ShardingPreparedStatement(final ShardingConnection shardingConnection, fi

@Override
public ResultSet executeQuery() throws SQLException {
hasExecuted = true;
setCurrentResultSet(ResultSetFactory.getResultSet(
new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeQuery(), getMergeContext()));
return getCurrentResultSet();
ResultSet rs;
try {
rs = ResultSetFactory.getResultSet(
new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeQuery(), getMergeContext());
} finally {
clearRouteContext();
}
setCurrentResultSet(rs);
return rs;
}

@Override
public int executeUpdate() throws SQLException {
hasExecuted = true;
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).executeUpdate();
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).executeUpdate();
} finally {
clearRouteContext();
}
}

@Override
public boolean execute() throws SQLException {
hasExecuted = true;
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), getRoutedPreparedStatements()).execute();
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routeSQL()).execute();
} finally {
clearRouteContext();
}
}

private void clearRouteContext() throws SQLException {
clearParameters();
setCurrentResultSet(null);
}

@Override
public void addBatch() throws SQLException {
batchParameters.add(Lists.newArrayList(getParameters()));
getParameters().clear();
try {
for (PreparedStatementExecutorWrapper each : routeSQL()) {
each.getPreparedStatement().addBatch();
BatchPreparedStatementExecutorWrapper wrapper;
if (cachedRoutePreparedStatementMap.containsKey(each.getPreparedStatement())) {
wrapper = (BatchPreparedStatementExecutorWrapper) cachedRoutePreparedStatementMap.get(each.getPreparedStatement());
} else {
wrapper = new BatchPreparedStatementExecutorWrapper(each.getPreparedStatement(), getParameters(), each.getSqlExecutionUnit());
cachedRoutePreparedStatementMap.put(each.getPreparedStatement(), wrapper);
}
if (each.getDMLExecutionEvent().isPresent()) {
wrapper.getDmlExecutionEvents().add(each.getDMLExecutionEvent().get());
}
}
} finally {
clearRouteContext();
}
}

@Override
public void clearBatch() throws SQLException {
batchParameters.clear();
cachedRoutePreparedStatementMap.clear();
clearRouteContext();
}

@Override
public int[] executeBatch() throws SQLException {
hasExecuted = true;
int[] result = new int[batchParameters.size()];
int i = 0;
for (List<Object> each : batchParameters) {
List<PreparedStatementExecutorWrapper> routePreparedStatements = routeSQL(each);
cachedRoutedPreparedStatements.addAll(routePreparedStatements);
result[i++] = new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), routePreparedStatements).executeUpdate();
}
return result;
}

private List<PreparedStatementExecutorWrapper> getRoutedPreparedStatements() throws SQLException {
if (!hasExecuted) {
return Collections.emptyList();
}
routeIfNeed();
return cachedRoutedPreparedStatements;
}


@Override
public List<? extends Statement> getRoutedStatements() throws SQLException {
return Lists.transform(getRoutedPreparedStatements(), new Function<PreparedStatementExecutorWrapper, Statement>() {

@Override
public Statement apply(final PreparedStatementExecutorWrapper input) {
return input.getPreparedStatement();
}
});
}

private void routeIfNeed() throws SQLException {
if (!cachedRoutedPreparedStatements.isEmpty()) {
return;
try {
return new PreparedStatementExecutor(getShardingConnection().getShardingContext().getExecutorEngine(), cachedRoutePreparedStatementMap.values()).executeBatch();
} finally {
clearBatch();
}
cachedRoutedPreparedStatements.addAll(routeSQL(getParameters()));
}

private List<PreparedStatementExecutorWrapper> routeSQL(final List<Object> parameters) throws SQLException {
private List<PreparedStatementExecutorWrapper> routeSQL() throws SQLException {
List<Object> parameters = getParameters();
List<PreparedStatementExecutorWrapper> result = new ArrayList<>();
SQLRouteResult sqlRouteResult = getShardingConnection().getShardingContext().getSqlRouteEngine().route(sql, parameters);
SQLRouteResult sqlRouteResult = preparedSQLRouter.route(parameters);
MergeContext mergeContext = sqlRouteResult.getMergeContext();
setMergeContext(mergeContext);
for (SQLExecutionUnit each : sqlRouteResult.getExecutionUnits()) {
PreparedStatement preparedStatement = generatePrepareStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());
PreparedStatement preparedStatement = (PreparedStatement) getStatement(getShardingConnection().getConnection(each.getDataSource(), sqlRouteResult.getSqlStatementType()), each.getSql());
replayMethodsInvocation(preparedStatement);
setParameters(preparedStatement, parameters);
result.add(new PreparedStatementExecutorWrapper(preparedStatement, parameters, each));
}
return result;
}

private PreparedStatement generatePrepareStatement(final Connection conn, final String shardingSql) throws SQLException {
protected PreparedStatement generateStatement(final Connection conn, final String shardingSql) throws SQLException {
if (null != autoGeneratedKeys) {
return conn.prepareStatement(shardingSql, autoGeneratedKeys);
}
Expand Down
Loading

0 comments on commit 1f7ed61

Please sign in to comment.