From bf297e662753ec1974f92cb531ef3bbbfdbaa99a Mon Sep 17 00:00:00 2001 From: Evgenij Ryazanov Date: Sun, 4 Feb 2024 12:13:47 +0800 Subject: [PATCH 1/2] Optimize execute[Large]Batch methods --- h2/src/main/org/h2/command/Command.java | 161 ++++++++++------ .../main/org/h2/command/CommandInterface.java | 25 ++- h2/src/main/org/h2/command/CommandList.java | 6 +- h2/src/main/org/h2/command/CommandRemote.java | 146 ++++++++++++--- h2/src/main/org/h2/engine/Constants.java | 8 +- h2/src/main/org/h2/engine/SessionRemote.java | 15 +- .../org/h2/jdbc/JdbcPreparedStatement.java | 112 +++++------ h2/src/main/org/h2/jdbc/JdbcStatement.java | 57 +++--- h2/src/main/org/h2/result/BatchResult.java | 40 ++++ .../main/org/h2/server/TcpServerThread.java | 176 +++++++++++------- 10 files changed, 513 insertions(+), 233 deletions(-) create mode 100644 h2/src/main/org/h2/result/BatchResult.java diff --git a/h2/src/main/org/h2/command/Command.java b/h2/src/main/org/h2/command/Command.java index 6b048fb180..ba47cdd55c 100644 --- a/h2/src/main/org/h2/command/Command.java +++ b/h2/src/main/org/h2/command/Command.java @@ -6,6 +6,7 @@ package org.h2.command; import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Set; import org.h2.api.ErrorCode; @@ -18,10 +19,13 @@ import org.h2.expression.ParameterInterface; import org.h2.message.DbException; import org.h2.message.Trace; +import org.h2.result.BatchResult; +import org.h2.result.MergedResult; import org.h2.result.ResultInterface; import org.h2.result.ResultWithGeneratedKeys; import org.h2.result.ResultWithPaddedStrings; import org.h2.util.Utils; +import org.h2.value.Value; /** * Represents a SQL statement. This object is only used on the server side. @@ -151,10 +155,10 @@ protected void checkCanceled() { } @Override - public void stop() { + public void stop(boolean commitIfAutoCommit) { if (session.isOpen()) { commitIfNonTransactional(); - if (isTransactional() && session.getAutoCommit()) { + if (commitIfAutoCommit && isTransactional() && session.getAutoCommit()) { session.commit(false); } } @@ -228,7 +232,7 @@ public ResultInterface executeQuery(long maxrows, boolean scrollable) { session.resetThreadLocalSession(oldSession); session.endStatement(); if (callStop) { - stop(); + stop(true); } } } finally { @@ -238,74 +242,115 @@ public ResultInterface executeQuery(long maxrows, boolean scrollable) { @Override public ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest) { - long start = 0; - boolean callStop = true; session.lock(); try { - Database database = getDatabase(); session.waitIfExclusiveModeEnabled(); - commitIfNonTransactional(); - SessionLocal.Savepoint rollback = session.setSavepoint(); - session.startStatementWithinTransaction(this); - DbException ex = null; - Session oldSession = session.setThreadLocalSession(); - try { - while (true) { - database.checkPowerOff(); - try { - return update(generatedKeysRequest); - } catch (DbException e) { - // cannot retry some commands - if (!isRetryable()) { - throw e; + return executeUpdate(generatedKeysRequest, true); + } finally { + session.unlock(); + } + } + + @Override + public BatchResult executeBatchUpdate(ArrayList batchParameters, Object generatedKeysRequest) { + session.lock(); + try { + session.waitIfExclusiveModeEnabled(); + int size = batchParameters.size(); + long[] updateCounts = new long[size]; + MergedResult generatedKeys = generatedKeysRequest != null ? new MergedResult() : null; + ArrayList exceptions = new ArrayList<>(); + for (int i = 0; i < size; i++) { + Value[] set = batchParameters.get(i); + ArrayList parameters = getParameters(); + for (int j = 0, l = set.length; j < l; j++) { + parameters.get(j).setValue(set[j], true); + } + long updateCount; + try { + ResultWithGeneratedKeys result = executeUpdate(generatedKeysRequest, i + 1 == size); + updateCount = result.getUpdateCount(); + if (generatedKeys != null) { + ResultInterface keys = result.getGeneratedKeys(); + if (keys != null) { + generatedKeys.add(keys); } - start = filterConcurrentUpdate(e, start); - } catch (OutOfMemoryError e) { - callStop = false; - database.shutdownImmediately(); - throw DbException.convert(e); - } catch (Throwable e) { - throw DbException.convert(e); } + } catch (Exception e) { + exceptions.add(DbException.toSQLException(e)); + updateCount = Statement.EXECUTE_FAILED; } - } catch (DbException e) { - e = e.addSQL(sql); - SQLException s = e.getSQLException(); - database.exceptionThrown(s, sql); - if (s.getErrorCode() == ErrorCode.OUT_OF_MEMORY) { - callStop = false; - database.shutdownImmediately(); - throw e; - } + updateCounts[i] = updateCount; + } + return new BatchResult(updateCounts, generatedKeys != null ? generatedKeys.getResult() : null, exceptions); + } finally { + session.unlock(); + } + } + + private ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest, boolean commitIfAutoCommit) { + long start = 0; + boolean callStop = true; + Database database = getDatabase(); + commitIfNonTransactional(); + SessionLocal.Savepoint rollback = session.setSavepoint(); + session.startStatementWithinTransaction(this); + DbException ex = null; + Session oldSession = session.setThreadLocalSession(); + try { + while (true) { + database.checkPowerOff(); try { - database.checkPowerOff(); - if (s.getErrorCode() == ErrorCode.DEADLOCK_1) { - session.rollback(); - } else { - session.rollbackTo(rollback); + return update(generatedKeysRequest); + } catch (DbException e) { + // cannot retry some commands + if (!isRetryable()) { + throw e; } - } catch (Throwable nested) { - e.addSuppressed(nested); + start = filterConcurrentUpdate(e, start); + } catch (OutOfMemoryError e) { + callStop = false; + database.shutdownImmediately(); + throw DbException.convert(e); + } catch (Throwable e) { + throw DbException.convert(e); } - ex = e; + } + } catch (DbException e) { + e = e.addSQL(sql); + SQLException s = e.getSQLException(); + database.exceptionThrown(s, sql); + if (s.getErrorCode() == ErrorCode.OUT_OF_MEMORY) { + callStop = false; + database.shutdownImmediately(); throw e; - } finally { - session.resetThreadLocalSession(oldSession); - try { - session.endStatement(); - if (callStop) { - stop(); - } - } catch (Throwable nested) { - if (ex == null) { - throw nested; - } else { - ex.addSuppressed(nested); - } + } + try { + database.checkPowerOff(); + if (s.getErrorCode() == ErrorCode.DEADLOCK_1) { + session.rollback(); + } else { + session.rollbackTo(rollback); } + } catch (Throwable nested) { + e.addSuppressed(nested); } + ex = e; + throw e; } finally { - session.unlock(); + session.resetThreadLocalSession(oldSession); + try { + session.endStatement(); + if (callStop) { + stop(true); + } + } catch (Throwable nested) { + if (ex == null) { + throw nested; + } else { + ex.addSuppressed(nested); + } + } } } diff --git a/h2/src/main/org/h2/command/CommandInterface.java b/h2/src/main/org/h2/command/CommandInterface.java index 46cb01fba3..69ab8ee1da 100644 --- a/h2/src/main/org/h2/command/CommandInterface.java +++ b/h2/src/main/org/h2/command/CommandInterface.java @@ -7,8 +7,10 @@ import java.util.ArrayList; import org.h2.expression.ParameterInterface; +import org.h2.result.BatchResult; import org.h2.result.ResultInterface; import org.h2.result.ResultWithGeneratedKeys; +import org.h2.value.Value; /** * Represents a SQL statement. @@ -600,10 +602,29 @@ public interface CommandInterface extends AutoCloseable { */ ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest); + + /** + * Executes the statement with multiple sets of parameters. + * + * @param batchParameters + * batch parameters + * @param generatedKeysRequest + * {@code null} or {@code false} if generated keys are not needed, + * {@code true} if generated keys should be configured + * automatically, {@code int[]} to specify column indices to + * return generated keys from, or {@code String[]} to specify + * column names to return generated keys from + * @return result of batch execution + */ + BatchResult executeBatchUpdate(ArrayList batchParameters, Object generatedKeysRequest); + /** - * Stop the command execution, release all locks and resources + * Stop the command execution, release all locks and resources. + * + * @param commitIfAutoCommit + * commit the session if auto-commit is enabled */ - void stop(); + void stop(boolean commitIfAutoCommit); /** * Close the statement. diff --git a/h2/src/main/org/h2/command/CommandList.java b/h2/src/main/org/h2/command/CommandList.java index 852fba18f3..b18defffab 100644 --- a/h2/src/main/org/h2/command/CommandList.java +++ b/h2/src/main/org/h2/command/CommandList.java @@ -75,10 +75,10 @@ public ResultInterface query(long maxrows) { } @Override - public void stop() { - command.stop(); + public void stop(boolean commitIfAutoCommit) { + command.stop(commitIfAutoCommit); if (remainingCommand != null) { - remainingCommand.stop(); + remainingCommand.stop(commitIfAutoCommit); } } diff --git a/h2/src/main/org/h2/command/CommandRemote.java b/h2/src/main/org/h2/command/CommandRemote.java index be37493179..5ec8c73a14 100644 --- a/h2/src/main/org/h2/command/CommandRemote.java +++ b/h2/src/main/org/h2/command/CommandRemote.java @@ -6,7 +6,11 @@ package org.h2.command; import java.io.IOException; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; + +import org.h2.engine.Constants; import org.h2.engine.GeneratedKeysMode; import org.h2.engine.SessionRemote; import org.h2.engine.SysProperties; @@ -14,6 +18,8 @@ import org.h2.expression.ParameterRemote; import org.h2.message.DbException; import org.h2.message.Trace; +import org.h2.result.BatchResult; +import org.h2.result.MergedResult; import org.h2.result.ResultInterface; import org.h2.result.ResultRemote; import org.h2.result.ResultWithGeneratedKeys; @@ -56,7 +62,7 @@ public CommandRemote(SessionRemote session, } @Override - public void stop() { + public void stop(boolean commitIfAutoCommit) { // Ignore } @@ -215,25 +221,7 @@ public ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest) { session.traceOperation("COMMAND_EXECUTE_UPDATE", id); transfer.writeInt(SessionRemote.COMMAND_EXECUTE_UPDATE).writeInt(id); sendParameters(transfer); - transfer.writeInt(generatedKeysMode); - switch (generatedKeysMode) { - case GeneratedKeysMode.COLUMN_NUMBERS: { - int[] keys = (int[]) generatedKeysRequest; - transfer.writeInt(keys.length); - for (int key : keys) { - transfer.writeInt(key); - } - break; - } - case GeneratedKeysMode.COLUMN_NAMES: { - String[] keys = (String[]) generatedKeysRequest; - transfer.writeInt(keys.length); - for (String key : keys) { - transfer.writeString(key); - } - break; - } - } + sendGeneratedKeysRequest(generatedKeysRequest, generatedKeysMode, transfer); session.done(transfer); updateCount = transfer.readRowCount(); autoCommit = transfer.readBoolean(); @@ -261,6 +249,92 @@ public ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest) { } } + @Override + public BatchResult executeBatchUpdate(ArrayList batchParameters, Object generatedKeysRequest) { + int generatedKeysMode = GeneratedKeysMode.valueOf(generatedKeysRequest); + boolean readGeneratedKeys = generatedKeysMode != GeneratedKeysMode.NONE; + int size = batchParameters.size(); + int objectId = readGeneratedKeys ? session.getNextId() : 0; + final SessionRemote session = this.session; + session.lock(); + try { + long[] updateCounts = new long[size]; + MergedResult generatedKeys = null; + ArrayList exceptions = new ArrayList<>(); + boolean autoCommit = false; + for (int i = 0, count = 0; i < transferList.size(); i++) { + prepareIfRequired(); + Transfer transfer = transferList.get(i); + MergedResult oldGeneratedKeys = generatedKeys; + generatedKeys = readGeneratedKeys ? new MergedResult() : null; + ArrayList oldExceptions = exceptions; + exceptions = new ArrayList<>(); + try { + if (transfer.getVersion() >= Constants.TCP_PROTOCOL_VERSION_21) { + session.traceOperation("COMMAND_EXECUTE_BATCH_UPDATE", id); + transfer.writeInt(SessionRemote.COMMAND_EXECUTE_BATCH_UPDATE).writeInt(id); + transfer.writeInt(size); + for (Value[] parameters : batchParameters) { + int len = parameters.length; + transfer.writeInt(len); + sendParameters(transfer, parameters); + } + sendGeneratedKeysRequest(generatedKeysRequest, generatedKeysMode, transfer); + session.done(transfer); + for (int j = 0; j < size; j++) { + updateCounts[j] = transfer.readRowCount(); + } + if (readGeneratedKeys) { + int columnCount = transfer.readInt(); + ResultRemote remoteGeneratedKeys = new ResultRemote(session, transfer, objectId, columnCount, Integer.MAX_VALUE); + generatedKeys.add(remoteGeneratedKeys); + remoteGeneratedKeys.close(); + } + int exceptionCount = transfer.readInt(); + for (int k = 0; k < exceptionCount; k++) { + exceptions.add(SessionRemote.readSQLException(transfer)); + } + autoCommit = transfer.readBoolean(); + } else { + for (int j = 0; j < size; j++) { + session.traceOperation("COMMAND_EXECUTE_UPDATE", id); + transfer.writeInt(SessionRemote.COMMAND_EXECUTE_UPDATE).writeInt(id); + Value[] parameters = batchParameters.get(j); + int len = parameters.length; + transfer.writeInt(len); + sendParameters(transfer, parameters); + sendGeneratedKeysRequest(generatedKeysRequest, generatedKeysMode, transfer); + try { + session.done(transfer); + updateCounts[j] = transfer.readRowCount(); + autoCommit = transfer.readBoolean(); + if (readGeneratedKeys) { + int columnCount = transfer.readInt(); + ResultRemote remoteGeneratedKeys = new ResultRemote(session, transfer, objectId, columnCount, Integer.MAX_VALUE); + generatedKeys.add(remoteGeneratedKeys); + remoteGeneratedKeys.close(); + } + } catch (DbException e) { + updateCounts[j] = Statement.EXECUTE_FAILED; + exceptions.add(DbException.toSQLException(e)); + } + } + } + } catch (IOException e) { + session.removeServer(e, i--, ++count); + generatedKeys = oldGeneratedKeys; + exceptions = oldExceptions; + } + } + session.setAutoCommitFromServer(autoCommit); + session.autoCommitIfCluster(); + session.readSessionState(); + return new BatchResult(updateCounts, generatedKeys != null ? generatedKeys.getResult() : null, exceptions); + } finally { + session.unlock(); + } + } + private void checkParameters() { if (cmdType != EXPLAIN) { for (ParameterInterface p : parameters) { @@ -274,15 +348,45 @@ private void sendParameters(Transfer transfer) throws IOException { transfer.writeInt(len); for (ParameterInterface p : parameters) { Value pVal = p.getParamValue(); - if (pVal == null && cmdType == EXPLAIN) { pVal = ValueNull.INSTANCE; } + transfer.writeValue(pVal); + } + } + private void sendParameters(Transfer transfer, Value[] parameters) throws IOException { + for (Value pVal : parameters) { + if (pVal == null && cmdType == EXPLAIN) { + pVal = ValueNull.INSTANCE; + } transfer.writeValue(pVal); } } + private static void sendGeneratedKeysRequest(Object generatedKeysRequest, int generatedKeysMode, Transfer transfer) + throws IOException { + transfer.writeInt(generatedKeysMode); + switch (generatedKeysMode) { + case GeneratedKeysMode.COLUMN_NUMBERS: { + int[] keys = (int[]) generatedKeysRequest; + transfer.writeInt(keys.length); + for (int key : keys) { + transfer.writeInt(key); + } + break; + } + case GeneratedKeysMode.COLUMN_NAMES: { + String[] keys = (String[]) generatedKeysRequest; + transfer.writeInt(keys.length); + for (String key : keys) { + transfer.writeString(key); + } + break; + } + } + } + @Override public void close() { final SessionRemote session = this.session; diff --git a/h2/src/main/org/h2/engine/Constants.java b/h2/src/main/org/h2/engine/Constants.java index f0d6caf0d9..ed80ba3fc0 100644 --- a/h2/src/main/org/h2/engine/Constants.java +++ b/h2/src/main/org/h2/engine/Constants.java @@ -60,6 +60,12 @@ public class Constants { */ public static final int TCP_PROTOCOL_VERSION_20 = 20; + /** + * The TCP protocol version number 21. + * @since 2.3.230 (TODO) + */ + public static final int TCP_PROTOCOL_VERSION_21 = 21; + /** * Minimum supported version of TCP protocol. */ @@ -68,7 +74,7 @@ public class Constants { /** * Maximum supported version of TCP protocol. */ - public static final int TCP_PROTOCOL_VERSION_MAX_SUPPORTED = TCP_PROTOCOL_VERSION_20; + public static final int TCP_PROTOCOL_VERSION_MAX_SUPPORTED = TCP_PROTOCOL_VERSION_21; /** * The major version of this database. diff --git a/h2/src/main/org/h2/engine/SessionRemote.java b/h2/src/main/org/h2/engine/SessionRemote.java index 4eab160af8..c638046f0c 100644 --- a/h2/src/main/org/h2/engine/SessionRemote.java +++ b/h2/src/main/org/h2/engine/SessionRemote.java @@ -73,6 +73,7 @@ public final class SessionRemote extends Session implements DataHandler { public static final int LOB_READ = 17; public static final int SESSION_PREPARE_READ_PARAMS2 = 18; public static final int GET_JDBC_META = 19; + public static final int COMMAND_EXECUTE_BATCH_UPDATE = 20; public static final int STATUS_ERROR = 0; public static final int STATUS_OK = 1; @@ -642,6 +643,18 @@ public void done(Transfer transfer) throws IOException { * on I/O exception */ public static DbException readException(Transfer transfer) throws IOException { + return DbException.convert(readSQLException(transfer)); + } + + /** + * Reads an exception as SQL exception. + * @param transfer + * the transfer object + * @return the exception + * @throws IOException + * on I/O exception + */ + public static SQLException readSQLException(Transfer transfer) throws IOException { String sqlstate = transfer.readString(); String message = transfer.readString(); String sql = transfer.readString(); @@ -652,7 +665,7 @@ public static DbException readException(Transfer transfer) throws IOException { // allow re-connect throw new IOException(s.toString(), s); } - return DbException.convert(s); + return s; } /** diff --git a/h2/src/main/org/h2/jdbc/JdbcPreparedStatement.java b/h2/src/main/org/h2/jdbc/JdbcPreparedStatement.java index 49e7ab74a5..1a4eccd770 100644 --- a/h2/src/main/org/h2/jdbc/JdbcPreparedStatement.java +++ b/h2/src/main/org/h2/jdbc/JdbcPreparedStatement.java @@ -22,10 +22,11 @@ import java.sql.SQLException; import java.sql.SQLType; import java.sql.SQLXML; -import java.sql.Statement; import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; +import java.util.Iterator; +import java.util.List; import org.h2.api.ErrorCode; import org.h2.command.CommandInterface; @@ -33,7 +34,7 @@ import org.h2.expression.ParameterInterface; import org.h2.message.DbException; import org.h2.message.TraceObject; -import org.h2.result.MergedResult; +import org.h2.result.BatchResult; import org.h2.result.ResultInterface; import org.h2.result.ResultWithGeneratedKeys; import org.h2.util.IOUtils; @@ -81,7 +82,6 @@ public class JdbcPreparedStatement extends JdbcStatement implements PreparedStat protected CommandInterface command; private ArrayList batchParameters; - private MergedResult batchIdentities; private HashMap cachedColumnLabelMap; private final Object generatedKeysRequest; @@ -116,7 +116,6 @@ public ResultSet executeQuery() throws SQLException { try { int id = getNextId(TraceObject.RESULT_SET); debugCodeAssign("ResultSet", TraceObject.RESULT_SET, id, "executeQuery()"); - batchIdentities = null; final Session session = this.session; session.lock(); try { @@ -170,7 +169,6 @@ public int executeUpdate() throws SQLException { try { debugCodeCall("executeUpdate"); checkClosed(); - batchIdentities = null; long updateCount = executeUpdateInternal(); return updateCount <= Integer.MAX_VALUE ? (int) updateCount : SUCCESS_NO_INFO; } catch (Exception e) { @@ -199,7 +197,6 @@ public long executeLargeUpdate() throws SQLException { try { debugCodeCall("executeLargeUpdate"); checkClosed(); - batchIdentities = null; return executeUpdateInternal(); } catch (Exception e) { throw logAndConvert(e); @@ -1242,7 +1239,6 @@ public void close() throws SQLException { try { super.close(); batchParameters = null; - batchIdentities = null; if (command != null) { command.close(); command = null; @@ -1263,25 +1259,24 @@ public void close() throws SQLException { public int[] executeBatch() throws SQLException { try { debugCodeCall("executeBatch"); + checkClosed(); + closeOldResultSet(); if (batchParameters == null) { - // Empty batch is allowed, see JDK-4639504 and other issues - batchParameters = new ArrayList<>(); + return new int[0]; } - batchIdentities = new MergedResult(); - int size = batchParameters.size(); - int[] result = new int[size]; - SQLException exception = new SQLException(); - checkClosed(); + BatchResult batchResult = executeBatchInternal(); + long[] longResult = batchResult.getUpdateCounts(); + int size = longResult.length; + int[] intResult = new int[size]; for (int i = 0; i < size; i++) { - long updateCount = executeBatchElement(batchParameters.get(i), exception); - result[i] = updateCount <= Integer.MAX_VALUE ? (int) updateCount : SUCCESS_NO_INFO; + long updateCount = longResult[i]; + intResult[i] = updateCount <= Integer.MAX_VALUE ? (int) updateCount : SUCCESS_NO_INFO; } - batchParameters = null; - exception = exception.getNextException(); - if (exception != null) { - throw new JdbcBatchUpdateException(exception, result); + List exceptions = batchResult.getExceptions(); + if (!exceptions.isEmpty()) { + throw new JdbcBatchUpdateException(createBatchException(exceptions), intResult); } - return result; + return intResult; } catch (Exception e) { throw logAndConvert(e); } @@ -1297,22 +1292,16 @@ public int[] executeBatch() throws SQLException { public long[] executeLargeBatch() throws SQLException { try { debugCodeCall("executeLargeBatch"); - if (batchParameters == null) { - // Empty batch is allowed, see JDK-4639504 and other issues - batchParameters = new ArrayList<>(); - } - batchIdentities = new MergedResult(); - int size = batchParameters.size(); - long[] result = new long[size]; - SQLException exception = new SQLException(); checkClosed(); - for (int i = 0; i < size; i++) { - result[i] = executeBatchElement(batchParameters.get(i), exception); + closeOldResultSet(); + if (batchParameters == null) { + return new long[0]; } - batchParameters = null; - exception = exception.getNextException(); - if (exception != null) { - throw new JdbcBatchUpdateException(exception, result); + BatchResult batchResult = executeBatchInternal(); + long[] result = batchResult.getUpdateCounts(); + List exceptions = batchResult.getExceptions(); + if (!exceptions.isEmpty()) { + throw new JdbcBatchUpdateException(createBatchException(exceptions), result); } return result; } catch (Exception e) { @@ -1320,38 +1309,37 @@ public long[] executeLargeBatch() throws SQLException { } } - private long executeBatchElement(Value[] set, SQLException exception) { - ArrayList parameters = command.getParameters(); - for (int i = 0, l = set.length; i < l; i++) { - parameters.get(i).setValue(set[i], false); - } - long updateCount; + private BatchResult executeBatchInternal() { + final Session session = this.session; + session.lock(); try { - updateCount = executeUpdateInternal(); - // Cannot use own implementation, it returns batch identities - ResultSet rs = super.getGeneratedKeys(); - batchIdentities.add(((JdbcResultSet) rs).result); - } catch (Exception e) { - exception.setNextException(logAndConvert(e)); - updateCount = Statement.EXECUTE_FAILED; + try { + setExecutingStatement(command); + BatchResult result = command.executeBatchUpdate(batchParameters, generatedKeysRequest); + ResultInterface gk = result.getGeneratedKeys(); + if (gk != null) { + int id = getNextId(TraceObject.RESULT_SET); + generatedKeys = new JdbcResultSet(conn, this, command, gk, id, true, false, false); + } + batchParameters = null; + return result; + } finally { + setExecutingStatement(null); + } + } finally { + session.unlock(); } - return updateCount; } - @Override - public ResultSet getGeneratedKeys() throws SQLException { - if (batchIdentities != null) { - try { - int id = getNextId(TraceObject.RESULT_SET); - debugCodeAssign("ResultSet", TraceObject.RESULT_SET, id, "getGeneratedKeys()"); - checkClosed(); - generatedKeys = new JdbcResultSet(conn, this, null, batchIdentities.getResult(), id, true, false, - false); - } catch (Exception e) { - throw logAndConvert(e); - } + private SQLException createBatchException(List exceptions) { + Iterator i = exceptions.iterator(); + SQLException exception = logAndConvert(i.next()), last = exception; + while (i.hasNext()) { + SQLException next = i.next(); + last.setNextException(next); + last = next; } - return super.getGeneratedKeys(); + return exception; } /** diff --git a/h2/src/main/org/h2/jdbc/JdbcStatement.java b/h2/src/main/org/h2/jdbc/JdbcStatement.java index 37864f0ee8..b395334a51 100644 --- a/h2/src/main/org/h2/jdbc/JdbcStatement.java +++ b/h2/src/main/org/h2/jdbc/JdbcStatement.java @@ -801,17 +801,29 @@ public int[] executeBatch() throws SQLException { debugCodeCall("executeBatch"); checkClosed(); if (batchCommands == null) { - batchCommands = new ArrayList<>(); + closeOldResultSet(); + return new int[0]; } int size = batchCommands.size(); int[] result = new int[size]; - SQLException exception = new SQLException(); + SQLException exception = null, last = null; for (int i = 0; i < size; i++) { - long updateCount = executeBatchElement(batchCommands.get(i), exception); - result[i] = updateCount <= Integer.MAX_VALUE ? (int) updateCount : SUCCESS_NO_INFO; + int updateCount; + try { + long longUpdateCount = executeUpdateInternal(batchCommands.get(i), null); + updateCount = longUpdateCount <= Integer.MAX_VALUE ? (int) longUpdateCount : SUCCESS_NO_INFO; + } catch (Exception e) { + SQLException s = DbException.toSQLException(e); + if (last == null) { + last = exception = s; + } else { + last.setNextException(s); + } + updateCount = Statement.EXECUTE_FAILED; + } + result[i] = updateCount; } batchCommands = null; - exception = exception.getNextException(); if (exception != null) { throw new JdbcBatchUpdateException(exception, result); } @@ -833,16 +845,28 @@ public long[] executeLargeBatch() throws SQLException { debugCodeCall("executeLargeBatch"); checkClosed(); if (batchCommands == null) { - batchCommands = new ArrayList<>(); + closeOldResultSet(); + return new long[0]; } int size = batchCommands.size(); long[] result = new long[size]; - SQLException exception = new SQLException(); + SQLException exception = null, last = null; for (int i = 0; i < size; i++) { - result[i] = executeBatchElement(batchCommands.get(i), exception); + long updateCount; + try { + updateCount = executeUpdateInternal(batchCommands.get(i), null); + } catch (Exception e) { + SQLException s = DbException.toSQLException(e); + if (last == null) { + last = exception = s; + } else { + last.setNextException(s); + } + updateCount = Statement.EXECUTE_FAILED; + } + result[i] = updateCount; } batchCommands = null; - exception = exception.getNextException(); if (exception != null) { throw new JdbcBatchUpdateException(exception, result); } @@ -852,17 +876,6 @@ public long[] executeLargeBatch() throws SQLException { } } - private long executeBatchElement(String sql, SQLException exception) { - long updateCount; - try { - updateCount = executeUpdateInternal(sql, null); - } catch (Exception e) { - exception.setNextException(logAndConvert(e)); - updateCount = Statement.EXECUTE_FAILED; - } - return updateCount; - } - /** * Return a result set with generated keys from the latest executed command * or an empty result set if keys were not generated or were not requested @@ -898,7 +911,7 @@ private long executeBatchElement(String sql, SQLException exception) { * @throws SQLException if this object is closed */ @Override - public ResultSet getGeneratedKeys() throws SQLException { + public final ResultSet getGeneratedKeys() throws SQLException { try { int id = generatedKeys != null ? generatedKeys.getTraceId() : getNextId(TraceObject.RESULT_SET); if (isDebugEnabled()) { @@ -1332,7 +1345,7 @@ void setExecutingStatement(CommandInterface c) { */ void onLazyResultSetClose(CommandInterface command, boolean closeCommand) { setExecutingStatement(null); - command.stop(); + command.stop(true); if (closeCommand) { command.close(); } diff --git a/h2/src/main/org/h2/result/BatchResult.java b/h2/src/main/org/h2/result/BatchResult.java new file mode 100644 index 0000000000..85ff55ad59 --- /dev/null +++ b/h2/src/main/org/h2/result/BatchResult.java @@ -0,0 +1,40 @@ +/* + * Copyright 2004-2024 H2 Group. Multiple-Licensed under the MPL 2.0, + * and the EPL 1.0 (https://h2database.com/html/license.html). + * Initial Developer: H2 Group + */ +package org.h2.result; + +import java.sql.SQLException; +import java.util.List; + +/** + * Result of a batch execution. + */ +public class BatchResult { + + private final long[] updateCounts; + + private final ResultInterface generatedKeys; + + private final List exceptions; + + public BatchResult(long[] updateCounts, ResultInterface generatedKeys, List exceptions) { + this.updateCounts = updateCounts; + this.generatedKeys = generatedKeys; + this.exceptions = exceptions; + } + + public long[] getUpdateCounts() { + return updateCounts; + } + + public ResultInterface getGeneratedKeys() { + return generatedKeys; + } + + public List getExceptions() { + return exceptions; + } + +} diff --git a/h2/src/main/org/h2/server/TcpServerThread.java b/h2/src/main/org/h2/server/TcpServerThread.java index 9c850417ef..929d25c903 100644 --- a/h2/src/main/org/h2/server/TcpServerThread.java +++ b/h2/src/main/org/h2/server/TcpServerThread.java @@ -14,6 +14,7 @@ import java.net.Socket; import java.sql.SQLException; import java.util.ArrayList; +import java.util.List; import java.util.Objects; import org.h2.api.ErrorCode; @@ -32,6 +33,7 @@ import org.h2.jdbc.JdbcException; import org.h2.jdbc.meta.DatabaseMetaServer; import org.h2.message.DbException; +import org.h2.result.BatchResult; import org.h2.result.ResultColumn; import org.h2.result.ResultInterface; import org.h2.result.ResultWithGeneratedKeys; @@ -239,35 +241,38 @@ void close() { private void sendError(Throwable t, boolean withStatus) { try { - SQLException e = DbException.convert(t).getSQLException(); - StringWriter writer = new StringWriter(); - e.printStackTrace(new PrintWriter(writer)); - String trace = writer.toString(); - String message; - String sql; - if (e instanceof JdbcException) { - JdbcException j = (JdbcException) e; - message = j.getOriginalMessage(); - sql = j.getSQL(); - } else { - message = e.getMessage(); - sql = null; - } if (withStatus) { transfer.writeInt(SessionRemote.STATUS_ERROR); } - transfer. - writeString(e.getSQLState()).writeString(message). - writeString(sql).writeInt(e.getErrorCode()).writeString(trace).flush(); - } catch (Exception e2) { + sendSQLException(DbException.convert(t).getSQLException()); + transfer.flush(); + } catch (Exception e) { if (!transfer.isClosed()) { - server.traceError(e2); + server.traceError(e); } // if writing the error does not work, close the connection stop = true; } } + private void sendSQLException(SQLException e) throws IOException { + StringWriter writer = new StringWriter(); + e.printStackTrace(new PrintWriter(writer)); + String trace = writer.toString(); + String message; + String sql; + if (e instanceof JdbcException) { + JdbcException j = (JdbcException) e; + message = j.getOriginalMessage(); + sql = j.getSQL(); + } else { + message = e.getMessage(); + sql = null; + } + transfer.writeString(e.getSQLState()).writeString(message).writeString(sql).writeInt(e.getErrorCode()) + .writeString(trace); + } + private void setParameters(Command command) throws IOException { int len = transfer.readInt(); ArrayList params = command.getParameters(); @@ -373,39 +378,7 @@ private void process() throws IOException { int id = transfer.readInt(); Command command = (Command) cache.getObject(id, false); setParameters(command); - boolean writeGeneratedKeys = true; - Object generatedKeysRequest; - int mode = transfer.readInt(); - switch (mode) { - case GeneratedKeysMode.NONE: - generatedKeysRequest = false; - writeGeneratedKeys = false; - break; - case GeneratedKeysMode.AUTO: - generatedKeysRequest = true; - break; - case GeneratedKeysMode.COLUMN_NUMBERS: { - int len = transfer.readInt(); - int[] keys = new int[len]; - for (int i = 0; i < len; i++) { - keys[i] = transfer.readInt(); - } - generatedKeysRequest = keys; - break; - } - case GeneratedKeysMode.COLUMN_NAMES: { - int len = transfer.readInt(); - String[] keys = new String[len]; - for (int i = 0; i < len; i++) { - keys[i] = transfer.readString(); - } - generatedKeysRequest = keys; - break; - } - default: - throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, - "Unsupported generated keys' mode " + mode); - } + Object generatedKeysRequest = readGeneratedKeysRequest(); int old = session.getModificationId(); ResultWithGeneratedKeys result; session.lock(); @@ -424,17 +397,8 @@ private void process() throws IOException { transfer.writeInt(status); transfer.writeRowCount(result.getUpdateCount()); transfer.writeBoolean(session.getAutoCommit()); - if (writeGeneratedKeys) { - ResultInterface generatedKeys = result.getGeneratedKeys(); - int columnCount = generatedKeys.getVisibleColumnCount(); - transfer.writeInt(columnCount); - long rowCount = generatedKeys.getRowCount(); - transfer.writeRowCount(rowCount); - for (int i = 0; i < columnCount; i++) { - ResultColumn.writeColumn(transfer, generatedKeys, i); - } - sendRows(generatedKeys, rowCount); - generatedKeys.close(); + if (generatedKeysRequest != Boolean.FALSE) { + sendGeneratedKeys(result.getGeneratedKeys()); } transfer.flush(); break; @@ -553,12 +517,98 @@ private void process() throws IOException { transfer.flush(); break; } + case SessionRemote.COMMAND_EXECUTE_BATCH_UPDATE: { + int id = transfer.readInt(); + Command command = (Command) cache.getObject(id, false); + int size = transfer.readInt(); + ArrayList batchParameters = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + int len = transfer.readInt(); + Value[] parameters = new Value[len]; + for (int j = 0; j < len; j++) { + parameters[j] = transfer.readValue(null); + } + batchParameters.add(parameters); + } + Object generatedKeysRequest = readGeneratedKeysRequest(); + int old = session.getModificationId(); + BatchResult result; + session.lock(); + try { + result = command.executeBatchUpdate(batchParameters, generatedKeysRequest); + } finally { + session.unlock(); + } + int status; + if (session.isClosed()) { + status = SessionRemote.STATUS_CLOSED; + stop = true; + } else { + status = getState(old); + } + transfer.writeInt(status); + for (long updateCount : result.getUpdateCounts()) { + transfer.writeLong(updateCount); + } + if (generatedKeysRequest != Boolean.FALSE) { + sendGeneratedKeys(result.getGeneratedKeys()); + } + List exceptions = result.getExceptions(); + transfer.writeInt(exceptions.size()); + for (SQLException exception : exceptions) { + sendSQLException(exception); + } + transfer.writeBoolean(session.getAutoCommit()); + transfer.flush(); + break; + } default: trace("Unknown operation: " + operation); close(); } } + private Object readGeneratedKeysRequest() throws IOException { + int mode = transfer.readInt(); + switch (mode) { + case GeneratedKeysMode.NONE: + return Boolean.FALSE; + case GeneratedKeysMode.AUTO: + return Boolean.TRUE; + case GeneratedKeysMode.COLUMN_NUMBERS: { + int len = transfer.readInt(); + int[] keys = new int[len]; + for (int i = 0; i < len; i++) { + keys[i] = transfer.readInt(); + } + return keys; + } + case GeneratedKeysMode.COLUMN_NAMES: { + int len = transfer.readInt(); + String[] keys = new String[len]; + for (int i = 0; i < len; i++) { + keys[i] = transfer.readString(); + } + return keys; + } + default: + throw DbException.get(ErrorCode.CONNECTION_BROKEN_1, + "Unsupported generated keys' mode " + mode); + } + } + + private void sendGeneratedKeys(ResultInterface generatedKeys) throws IOException { + int columnCount = generatedKeys.getVisibleColumnCount(); + transfer.writeInt(columnCount); + long rowCount = generatedKeys.getRowCount(); + transfer.writeRowCount(rowCount); + for (int i = 0; i < columnCount; i++) { + ResultColumn.writeColumn(transfer, generatedKeys, i); + } + sendRows(generatedKeys, rowCount); + generatedKeys.close(); + } + private int getState(int oldModificationId) { if (session == null) { return SessionRemote.STATUS_CLOSED; From 2582564e7b3fdbb3fd3f7c70af3bfb3a29d32c4e Mon Sep 17 00:00:00 2001 From: Evgenij Ryazanov Date: Sun, 4 Feb 2024 16:27:25 +0800 Subject: [PATCH 2/2] Pass commitIfAutoCommit parameter to the stop() method --- h2/src/main/org/h2/command/Command.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/h2/src/main/org/h2/command/Command.java b/h2/src/main/org/h2/command/Command.java index ba47cdd55c..237d9c63d0 100644 --- a/h2/src/main/org/h2/command/Command.java +++ b/h2/src/main/org/h2/command/Command.java @@ -342,7 +342,7 @@ private ResultWithGeneratedKeys executeUpdate(Object generatedKeysRequest, boole try { session.endStatement(); if (callStop) { - stop(true); + stop(commitIfAutoCommit); } } catch (Throwable nested) { if (ex == null) {