From 737e148d724ddc607ac2fbdcae2fa4c7e34cf615 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 30 Dec 2022 11:34:38 +0800 Subject: [PATCH 1/3] Add stream-based prepared statemenet --- .../client/ClickHouseInputStream.java | 68 +++- .../client/ClickHouseOutputStream.java | 2 +- .../client/data/ClickHouseSimpleResponse.java | 6 +- .../client/data/ClickHouseStreamResponse.java | 2 +- .../client/stream/RestrictedInputStream.java | 141 ++++++++ .../client/stream/InputStreamImplTest.java | 46 ++- .../jdbc/ClickHousePreparedStatement.java | 7 +- .../internal/ClickHouseConnectionImpl.java | 37 +- .../StreamBasedPreparedStatement.java | 338 ++++++++++++++++++ .../jdbc/ClickHousePreparedStatementTest.java | 76 ++++ 10 files changed, 697 insertions(+), 26 deletions(-) create mode 100644 clickhouse-client/src/main/java/com/clickhouse/client/stream/RestrictedInputStream.java create mode 100644 clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java index b842c6e11..68d7e7a2d 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java @@ -15,6 +15,8 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -28,6 +30,7 @@ import com.clickhouse.client.stream.DeferredInputStream; import com.clickhouse.client.stream.EmptyInputStream; import com.clickhouse.client.stream.Lz4InputStream; +import com.clickhouse.client.stream.RestrictedInputStream; import com.clickhouse.client.stream.IterableByteArrayInputStream; import com.clickhouse.client.stream.IterableByteBufferInputStream; import com.clickhouse.client.stream.IterableMultipleInputStream; @@ -85,6 +88,29 @@ public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input, return chInput; } + /** + * Wraps the given input stream with length limitation. Please pay attention + * that calling close() method of the wrapper will never close the inner input + * stream. + * + * @param input non-null input stream + * @param bufferSize buffer size + * @param length maximum bytes can be read from the input + * @param postCloseAction custom action will be performed right after closing + * the wrapped input stream + * @return non-null wrapped input stream + */ + public static ClickHouseInputStream wrap(InputStream input, int bufferSize, long length, Runnable postCloseAction) { + if (input instanceof RestrictedInputStream) { + RestrictedInputStream ris = (RestrictedInputStream) input; + if (ris.getRemaining() == length) { + return ris; + } + } + + return new RestrictedInputStream(null, input, bufferSize, length, postCloseAction); + } + /** * Gets an empty input stream that produces nothing and cannot be closed. * @@ -505,15 +531,20 @@ public static File save(File file, InputStream in, int bufferSize, int timeout, * Optional post close action. */ protected final Runnable postCloseAction; + /** + * User data shared between multiple calls. + */ + protected final Map userData; + + protected volatile boolean closed; - protected boolean closed; protected OutputStream copyTo; protected ClickHouseInputStream(ClickHouseFile file, OutputStream copyTo, Runnable postCloseAction) { this.byteBuffer = ClickHouseByteBuffer.newInstance(); this.file = file != null ? file : ClickHouseFile.NULL; this.postCloseAction = postCloseAction; - + this.userData = new HashMap<>(); this.closed = false; this.copyTo = copyTo; } @@ -550,6 +581,37 @@ public ClickHouseFile getUnderlyingFile() { return file; } + /** + * Gets user data associated with this input stream. + * + * @param key key + * @return value, could be null + */ + public final Object getUserData(String key) { + return userData.get(key); + } + + /** + * Removes user data. + * + * @param key key + * @return removed user data, could be null + */ + public final Object removeUserData(String key) { + return userData.remove(key); + } + + /** + * Sets user data. + * + * @param key key + * @param value value + * @return overidded value, could be null + */ + public final Object setUserData(String key, Object value) { + return userData.put(key, value); + } + /** * Peeks one byte. It's similar as {@link #read()} except it never changes * cursor. @@ -849,6 +911,8 @@ public boolean isClosed() { public void close() throws IOException { if (!closed) { closed = true; + // clear user data if any + userData.clear(); // don't want to hold the last byte array reference for too long byteBuffer.reset(); if (postCloseAction != null) { diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java index d3974a32e..e999fd73d 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java @@ -149,7 +149,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Cli protected final ClickHouseFile file; protected final Runnable postCloseAction; - protected boolean closed; + protected volatile boolean closed; protected ClickHouseOutputStream(ClickHouseFile file, Runnable postCloseAction) { this.file = file != null ? file : ClickHouseFile.NULL; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java index ff260a20e..515b4eb7f 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseSimpleResponse.java @@ -122,7 +122,7 @@ public static ClickHouseResponse of(ClickHouseResponse response, ClickHouseRecor private final List records; private final ClickHouseResponseSummary summary; - private boolean isClosed; + private volatile boolean closed; protected ClickHouseSimpleResponse(List columns, List records, ClickHouseResponseSummary summary) { @@ -169,11 +169,11 @@ public Iterable records() { @Override public void close() { // nothing to close - isClosed = true; + closed = true; } @Override public boolean isClosed() { - return isClosed; + return closed; } } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java index c0d96a83a..0b5cfa42d 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java @@ -60,7 +60,7 @@ public static ClickHouseResponse of(ClickHouseConfig config, ClickHouseInputStre protected final List columns; protected final ClickHouseResponseSummary summary; - private boolean closed; + private volatile boolean closed; protected ClickHouseStreamResponse(ClickHouseConfig config, ClickHouseInputStream input, Map settings, List columns, ClickHouseResponseSummary summary) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/RestrictedInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/RestrictedInputStream.java new file mode 100644 index 000000000..24f61a92b --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/RestrictedInputStream.java @@ -0,0 +1,141 @@ +package com.clickhouse.client.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.util.LinkedList; + +import com.clickhouse.client.ClickHouseByteBuffer; +import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseFile; +import com.clickhouse.client.ClickHouseOutputStream; +import com.clickhouse.client.ClickHouseUtils; +import com.clickhouse.client.config.ClickHouseClientOption; + +/** + * Wrapper of {@link java.io.InputStream} with length limitation. Unlike + * {@link WrappedInputStream}, the inner input stream remains open after calling + * close(). + */ +public class RestrictedInputStream extends AbstractByteArrayInputStream { + private final InputStream in; + + private long length; + + @Override + protected int updateBuffer() throws IOException { + position = 0; + + if (closed) { + return limit = 0; + } + + int len = buffer.length; + if (this.length < len) { + len = (int) this.length; + } + + int off = 0; + while (len > 0) { + int read = in.read(buffer, off, len); + if (read == -1) { + break; + } else { + off += read; + len -= read; + } + } + if (copyTo != null) { + copyTo.write(buffer, 0, off); + } + this.length -= off; + return limit = off; + } + + public RestrictedInputStream(ClickHouseFile file, InputStream input, int bufferSize, long length, + Runnable postCloseAction) { + super(file, null, postCloseAction); + + this.in = ClickHouseChecker.nonNull(input, "InputStream"); + // fixed buffer + this.buffer = new byte[ClickHouseUtils.getBufferSize(bufferSize, + (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), + (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue())]; + + this.length = ClickHouseChecker.notLessThan(length, "Length", 0L); + + this.position = 0; + this.limit = 0; + } + + @Override + public ClickHouseByteBuffer readCustom(ClickHouseDataUpdater reader) throws IOException { + if (reader == null) { + return byteBuffer.reset(); + } + ensureOpen(); + + LinkedList list = new LinkedList<>(); + int offset = position; + int len = 0; + boolean more = true; + while (more) { + int remain = limit - position; + if (remain < 1) { + closeQuietly(); + more = false; + } else { + int read = reader.update(buffer, position, limit); + if (read == -1) { + byte[] bytes = new byte[limit]; + System.arraycopy(buffer, position, bytes, position, remain); + len += remain; + position = limit; + list.add(bytes); + if (updateBuffer() < 1) { + closeQuietly(); + more = false; + } + } else { + len += read; + position += read; + list.add(buffer); + more = false; + } + } + } + return byteBuffer.update(list, offset, len); + } + + @Override + public long pipe(ClickHouseOutputStream output) throws IOException { + long count = 0L; + if (output == null || output.isClosed()) { + return count; + } + ensureOpen(); + + try { + int l = limit; + int p = position; + int remain = l - p; + if (remain > 0) { + output.writeBytes(buffer, p, remain); + count += remain; + position = l; + } + + while ((remain = updateBuffer()) > 0) { + output.writeBytes(buffer, 0, remain); + count += remain; + } + } finally { + close(); + } + return count; + } + + public final long getRemaining() { + return length; + } +} diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java index bb5d96b84..46e98e64d 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java @@ -170,6 +170,7 @@ private Object[][] getEmptyInputStreams() { { ClickHouseInputStream.of(new ByteArrayInputStream(new byte[0]), new ByteArrayInputStream(new byte[0])) }, { ClickHouseInputStream.of((ByteArrayInputStream) null, (ByteArrayInputStream) null) }, + { ClickHouseInputStream.wrap(new ByteArrayInputStream(new byte[] { 1, 2, 3 }), 2048, 0L, null) }, // strings { ClickHouseInputStream.of(new String[0]) }, { ClickHouseInputStream.of("") }, @@ -252,6 +253,9 @@ private Object[][] getInputStreamsWithData() { .of(new InputStream[] { new ByteArrayInputStream(new byte[] { 0x65 }), null, new ByteArrayInputStream(new byte[] { 0x66, 0x67, 0x68 }), null, new ByteArrayInputStream(new byte[] { 0x69, 0x70 }) }) }, + { ClickHouseInputStream.wrap( + new ByteArrayInputStream(new byte[] { 1, 2, 0x65, 0x66, 0x67, 0x68, 0x69, 0x70, 1, 2 }, 2, 6), + 2048, 6L, null) }, // strings { ClickHouseInputStream.of("efghip") }, { ClickHouseInputStream.of("e", "fg", "hip") }, @@ -322,7 +326,10 @@ private Object[][] getInputStreamWithData() { ByteBuffer.wrap(new byte[] { -1, 1, 2, 3, -4 }, 1, 3), ByteBuffer.allocate(0), null, ByteBuffer.wrap(new byte[] { 4, 5 }), null, ByteBuffer.allocate(0), null), - null) } + null) }, + new Object[] { + new RestrictedInputStream(null, new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5 }), 2048, + 5L, null) }, }; } @@ -721,4 +728,41 @@ public void testReadCustomBuffer() throws IOException { Assert.assertThrows(IOException.class, () -> in.readCustom(new CustomReader((byte) 1, (byte) 2)::read)); } } + + @Test(groups = { "unit" }) + public void testRestrictedInputStream() throws IOException { + ByteArrayInputStream bytes = new ByteArrayInputStream(new byte[] { 1, 2, 3, 4, 5, 6 }); + Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseInputStream.wrap(bytes, 0, -1, null)); + + ClickHouseInputStream in = ClickHouseInputStream.wrap(bytes, 0, 0, null); + Assert.assertTrue(in instanceof RestrictedInputStream); + Assert.assertFalse(in.isClosed()); + Assert.assertEquals(in.available(), 0); + Assert.assertEquals(((RestrictedInputStream) in).getRemaining(), 0); + + in = ClickHouseInputStream.wrap(bytes, 0, 1, null); + Assert.assertEquals(in.available(), 1); + Assert.assertEquals(in.read(), 1); + Assert.assertEquals(in.available(), 0); + Assert.assertEquals(in.read(), -1); + + in = ClickHouseInputStream.wrap(bytes, 0, 3, null); + Assert.assertEquals(in.available(), 3); + Assert.assertEquals(in.read(), 2); + Assert.assertEquals(in.available(), 2); + Assert.assertEquals(in.read(), 3); + Assert.assertEquals(in.available(), 1); + Assert.assertEquals(in.read(), 4); + Assert.assertEquals(in.available(), 0); + Assert.assertEquals(in.read(), -1); + + in = ClickHouseInputStream.wrap(bytes, 0, 3, null); + Assert.assertEquals(in.available(), 2); + Assert.assertEquals(in.read(), 5); + Assert.assertEquals(in.available(), 1); + Assert.assertEquals(in.read(), 6); + Assert.assertEquals(in.available(), 0); + Assert.assertEquals(in.read(), -1); + Assert.assertEquals(((RestrictedInputStream) in).getRemaining(), 1); + } } diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java index 2cb2d9c73..110e34e98 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHousePreparedStatement.java @@ -20,6 +20,7 @@ import java.sql.Timestamp; import java.sql.Types; +import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.data.BinaryStreamUtils; public interface ClickHousePreparedStatement extends PreparedStatement { @@ -167,7 +168,7 @@ default void setAsciiStream(int parameterIndex, InputStream x, long length) thro @Override default void setBinaryStream(int parameterIndex, InputStream x, long length) throws SQLException { - throw SqlExceptionUtils.unsupportedError("setBinaryStream not implemented"); + setBinaryStream(parameterIndex, length < 0L ? x : ClickHouseInputStream.wrap(x, 0, length, null)); } @Override @@ -182,12 +183,12 @@ default void setAsciiStream(int parameterIndex, InputStream x) throws SQLExcepti @Override default void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { - setBinaryStream(parameterIndex, x, 0L); + throw SqlExceptionUtils.unsupportedError("setBinaryStream not implemented"); } @Override default void setCharacterStream(int parameterIndex, Reader reader) throws SQLException { - setCharacterStream(parameterIndex, reader, 0L); + setCharacterStream(parameterIndex, reader, -1L); } @Override diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java index 76d7baf13..1e8c7f7e7 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseConnectionImpl.java @@ -43,6 +43,7 @@ import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.ClickHouseValues; import com.clickhouse.client.ClickHouseVersion; +import com.clickhouse.client.ClickHouseRequest.Mutation; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseOption; import com.clickhouse.client.config.ClickHouseRenameMethod; @@ -778,23 +779,29 @@ public PreparedStatement prepareStatement(String sql, int resultSetType, int res resultSetType, resultSetConcurrency, resultSetHoldability); } else if (parsedStmt.getStatementType() == StatementType.INSERT) { if (!ClickHouseChecker.isNullOrBlank(parsedStmt.getInput())) { - // ugly workaround for https://github.com/ClickHouse/ClickHouse/issues/39866 - // TODO replace JSON and Object('json') types in input function to String + // an ugly workaround of https://github.com/ClickHouse/ClickHouse/issues/39866 + // would be replace JSON and Object('json') types in the query to String + Mutation m = clientRequest.write(); + if (parsedStmt.hasFormat()) { + m.format(ClickHouseFormat.valueOf(parsedStmt.getFormat())); + } // insert query using input function - ps = new InputBasedPreparedStatement(this, - clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), - ClickHouseColumn.parse(parsedStmt.getInput()), resultSetType, - resultSetConcurrency, resultSetHoldability); - } else if (!parsedStmt.containsKeyword("SELECT") && !parsedStmt.hasValues() && - (!parsedStmt.hasFormat() || clientRequest.getFormat().name().equals(parsedStmt.getFormat()))) { - ps = new InputBasedPreparedStatement(this, - clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), - getTableColumns(parsedStmt.getDatabase(), parsedStmt.getTable(), - parsedStmt.getContentBetweenKeywords( - ClickHouseSqlStatement.KEYWORD_TABLE_COLUMNS_START, - ClickHouseSqlStatement.KEYWORD_TABLE_COLUMNS_END)), - resultSetType, resultSetConcurrency, resultSetHoldability); + ps = new InputBasedPreparedStatement(this, m.query(parsedStmt.getSQL(), newQueryId()), + ClickHouseColumn.parse(parsedStmt.getInput()), resultSetType, resultSetConcurrency, + resultSetHoldability); + } else if (!parsedStmt.containsKeyword("SELECT") && !parsedStmt.hasValues()) { + ps = parsedStmt.hasFormat() + ? new StreamBasedPreparedStatement(this, + clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), parsedStmt, + resultSetType, resultSetConcurrency, resultSetHoldability) + : new InputBasedPreparedStatement(this, + clientRequest.write().query(parsedStmt.getSQL(), newQueryId()), + getTableColumns(parsedStmt.getDatabase(), parsedStmt.getTable(), + parsedStmt.getContentBetweenKeywords( + ClickHouseSqlStatement.KEYWORD_TABLE_COLUMNS_START, + ClickHouseSqlStatement.KEYWORD_TABLE_COLUMNS_END)), + resultSetType, resultSetConcurrency, resultSetHoldability); } } } diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java new file mode 100644 index 000000000..22f77c7e5 --- /dev/null +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/StreamBasedPreparedStatement.java @@ -0,0 +1,338 @@ +package com.clickhouse.jdbc.internal; + +import java.io.File; +import java.io.InputStream; +import java.math.BigDecimal; +import java.sql.Array; +import java.sql.Date; +import java.sql.ParameterMetaData; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseColumn; +import com.clickhouse.client.ClickHouseDataStreamFactory; +import com.clickhouse.client.ClickHouseDataType; +import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseOutputStream; +import com.clickhouse.client.ClickHousePipedOutputStream; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseValues; +import com.clickhouse.client.ClickHouseWriter; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import com.clickhouse.jdbc.ClickHousePreparedStatement; +import com.clickhouse.jdbc.SqlExceptionUtils; +import com.clickhouse.jdbc.parser.ClickHouseSqlStatement; + +public class StreamBasedPreparedStatement extends AbstractPreparedStatement implements ClickHousePreparedStatement { + private static final Logger log = LoggerFactory.getLogger(StreamBasedPreparedStatement.class); + + private static final String ERROR_SET_PARAM = "Please use setString()/setBytes()/setInputStream() or pass String/InputStream/ClickHouseInputStream to setObject() method instead"; + private static final String DEFAULT_KEY = "pipe"; + private static final List DEFAULT_PARAMS = Collections + .singletonList(ClickHouseColumn.of("data", ClickHouseDataType.String, false)); + + private final ClickHouseSqlStatement parsedStmt; + private final ClickHouseParameterMetaData paramMetaData; + + private final List batch; + + private ClickHouseInputStream value; + + protected StreamBasedPreparedStatement(ClickHouseConnectionImpl connection, ClickHouseRequest request, + ClickHouseSqlStatement parsedStmt, int resultSetType, int resultSetConcurrency, int resultSetHoldability) + throws SQLException { + super(connection, request, resultSetType, resultSetConcurrency, resultSetHoldability); + + this.parsedStmt = parsedStmt; + this.value = null; + paramMetaData = new ClickHouseParameterMetaData(DEFAULT_PARAMS, mapper, connection.getTypeMap()); + batch = new LinkedList<>(); + } + + protected void ensureParams() throws SQLException { + if (value == null) { + throw SqlExceptionUtils.clientError("Missing input stream"); + } + } + + @Override + protected long[] executeAny(boolean asBatch) throws SQLException { + ensureOpen(); + boolean continueOnError = false; + if (asBatch) { + if (batch.isEmpty()) { + return ClickHouseValues.EMPTY_LONG_ARRAY; + } + continueOnError = getConnection().getJdbcConfig().isContinueBatchOnError(); + } else { + try { + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + addBatch(); + } catch (SQLException e) { + clearBatch(); + throw e; + } + } + + long[] results = new long[batch.size()]; + int count = 0; + String sql = getRequest().getStatements(false).get(0); + try { + for (ClickHouseInputStream in : batch) { + @SuppressWarnings("unchecked") + final CompletableFuture future = (CompletableFuture) in.removeUserData(DEFAULT_KEY); + results[count++] = executeInsert(sql, in); + if (future != null) { + future.get(); + } + } + } catch (Exception e) { + if (!asBatch) { + throw SqlExceptionUtils.handle(e); + } + + if (!continueOnError) { + throw SqlExceptionUtils.batchUpdateError(e, results); + } + log.error("Failed to execute batch insert of %d records", count + 1, e); + } finally { + clearBatch(); + } + + return results; + } + + @Override + protected int getMaxParameterIndex() { + return 1; + } + + protected String getSql() { + // why? because request can be modified so it might not always same as + // parsedStmt.getSQL() + return getRequest().getStatements(false).get(0); + } + + @Override + public ResultSet executeQuery() throws SQLException { + ensureParams(); + try { + executeAny(false); + } catch (SQLException e) { + if (e.getSQLState() != null) { + throw e; + } else { + throw new SQLException("Query failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR, e.getCause()); + } + } + + ResultSet rs = getResultSet(); + if (rs != null) { // should not happen + try { + rs.close(); + } catch (Exception e) { + // ignore + } + } + return newEmptyResultSet(); + } + + @Override + public long executeLargeUpdate() throws SQLException { + ensureParams(); + try { + executeAny(false); + } catch (SQLException e) { + if (e.getSQLState() != null) { + throw e; + } else { + throw new SQLException("Update failed", SqlExceptionUtils.SQL_STATE_SQL_ERROR, e.getCause()); + } + } + long row = getLargeUpdateCount(); + return row > 0L ? row : 0L; + } + + @Override + public void setByte(int parameterIndex, byte x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setShort(int parameterIndex, short x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setInt(int parameterIndex, int x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setLong(int parameterIndex, long x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setFloat(int parameterIndex, float x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setDouble(int parameterIndex, double x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setBigDecimal(int parameterIndex, BigDecimal x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setString(int parameterIndex, String x) throws SQLException { + ensureOpen(); + + value = ClickHouseInputStream.of(x); + } + + @Override + public void setBytes(int parameterIndex, byte[] x) throws SQLException { + ensureOpen(); + + value = ClickHouseInputStream.of(x); + } + + @Override + public void setBinaryStream(int parameterIndex, InputStream x) throws SQLException { + ensureOpen(); + + value = ClickHouseInputStream.of(x); + } + + @Override + public void clearParameters() throws SQLException { + ensureOpen(); + + value = null; + } + + @Override + public void setObject(int parameterIndex, Object x) throws SQLException { + ensureOpen(); + + if (x instanceof ClickHouseWriter) { + final ClickHouseWriter writer = (ClickHouseWriter) x; + final ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(getConfig(), null); + value = stream.getInputStream(); + + // always run in async mode or it will not work + value.setUserData(DEFAULT_KEY, ClickHouseClient.submit(() -> { + try (ClickHouseOutputStream out = stream) { + writer.write(out); + } + return true; + })); + } else if (x instanceof InputStream) { + value = ClickHouseInputStream.of((InputStream) x); + } else if (x instanceof String) { + value = ClickHouseInputStream.of((String) x); + } else if (x instanceof byte[]) { + value = ClickHouseInputStream.of((byte[]) x); + } else if (x instanceof File) { + value = ClickHouseInputStream.of((File) x); + } else { + throw SqlExceptionUtils + .clientError("Only byte[], String, File, InputStream and ClickHouseWriter are supported"); + } + } + + @Override + public boolean execute() throws SQLException { + ensureParams(); + if (!batch.isEmpty()) { + throw SqlExceptionUtils.undeterminedExecutionError(); + } + + final String sql = getSql(); + @SuppressWarnings("unchecked") + final CompletableFuture future = (CompletableFuture) value.removeUserData(DEFAULT_KEY); + executeInsert(sql, value); + if (future != null) { + try { + future.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Execution of query was interrupted: %s", sql); + } catch (ExecutionException e) { + throw SqlExceptionUtils.handle(e.getCause()); + } + } + return false; + } + + @Override + public void addBatch() throws SQLException { + ensureOpen(); + + ensureParams(); + batch.add(value); + clearParameters(); + } + + @Override + public void clearBatch() throws SQLException { + ensureOpen(); + + this.batch.clear(); + } + + @Override + public void setArray(int parameterIndex, Array x) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setDate(int parameterIndex, Date x, Calendar cal) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setTime(int parameterIndex, Time x, Calendar cal) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setTimestamp(int parameterIndex, Timestamp x, Calendar cal) throws SQLException { + throw SqlExceptionUtils.clientError(ERROR_SET_PARAM); + } + + @Override + public void setNull(int parameterIndex, int sqlType, String typeName) throws SQLException { + ensureOpen(); + + value = ClickHouseInputStream.empty(); + } + + @Override + public ParameterMetaData getParameterMetaData() throws SQLException { + return paramMetaData; + } + + @Override + public void setObject(int parameterIndex, Object x, int targetSqlType, int scaleOrLength) throws SQLException { + setObject(parameterIndex, x); + } +} diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index f798ba500..ee0a81384 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -37,8 +37,10 @@ import com.clickhouse.client.ClickHouseDataType; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHousePipedOutputStream; import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseWriter; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseBitmap; import com.clickhouse.client.data.ClickHouseExternalTable; @@ -46,6 +48,7 @@ import com.clickhouse.client.data.UnsignedLong; import com.clickhouse.jdbc.internal.InputBasedPreparedStatement; import com.clickhouse.jdbc.internal.SqlBasedPreparedStatement; +import com.clickhouse.jdbc.internal.StreamBasedPreparedStatement; import org.testng.Assert; import org.testng.SkipException; @@ -1579,6 +1582,79 @@ public void testInsertWithNullDateTime() throws SQLException { } } + @Test(groups = "integration") + public void testInsertWithFormat() throws SQLException { + Properties props = new Properties(); + try (ClickHouseConnection conn = newConnection(props); Statement s = conn.createStatement()) { + s.execute("drop table if exists test_insert_with_format; " + + "CREATE TABLE test_insert_with_format(i Int32, s String) ENGINE=Memory"); + try (PreparedStatement ps = conn.prepareStatement("INSERT INTO test_insert_with_format format CSV")) { + Assert.assertTrue(ps instanceof StreamBasedPreparedStatement); + Assert.assertEquals(ps.getParameterMetaData().getParameterCount(), 1); + Assert.assertEquals(ps.getParameterMetaData().getParameterClassName(1), String.class.getName()); + ps.setObject(1, ClickHouseInputStream.of("1,\\N\n2,two")); + Assert.assertEquals(ps.executeUpdate(), 2); + } + + try (ResultSet rs = s.executeQuery("select * from test_insert_with_format order by i")) { + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertEquals(rs.getString(2), ""); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertEquals(rs.getString(2), "two"); + Assert.assertFalse(rs.next()); + } + + s.execute("truncate table test_insert_with_format"); + + try (PreparedStatement ps = conn + .prepareStatement( + "INSERT INTO test_insert_with_format(s,i)SETTINGS insert_null_as_default=1 format JSONEachRow")) { + Assert.assertTrue(ps instanceof StreamBasedPreparedStatement); + ps.setString(1, "{\"i\":null,\"s\":null}"); + ps.addBatch(); + ps.setObject(1, "{\"i\":1,\"s\":\"one\"}"); + ps.addBatch(); + ps.setObject(1, new ClickHouseWriter() { + @Override + public void write(ClickHouseOutputStream out) throws IOException { + out.write("{\"i\":2,\"s\":\"22\"}".getBytes()); + } + }); + ps.addBatch(); + Assert.assertEquals(ps.executeBatch(), new int[] { 1, 1, 1 }); + } + + try (PreparedStatement ps = conn + .prepareStatement( + "INSERT INTO test_insert_with_format(s,i) select * from input('s String, i Int32') format CSV")) { + Assert.assertFalse(ps instanceof StreamBasedPreparedStatement); + ps.setInt(2, 3); + ps.setString(1, "three"); + Assert.assertEquals(ps.executeUpdate(), 1); + } + + try (PreparedStatement ps = conn.prepareStatement( + "select i,s from test_insert_with_format order by i format RowBinaryWithNamesAndTypes"); + ResultSet rs = ps.executeQuery()) { + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 0); + Assert.assertEquals(rs.getString(2), ""); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 1); + Assert.assertEquals(rs.getString(2), "one"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 2); + Assert.assertEquals(rs.getString(2), "22"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt(1), 3); + Assert.assertEquals(rs.getString(2), "three"); + Assert.assertFalse(rs.next()); + } + } + } + @Test(groups = "integration") public void testInsertWithSettings() throws SQLException { Properties props = new Properties(); From 1b483c667c17fbb9f8e294123166380093e20a44 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 30 Dec 2022 11:46:36 +0800 Subject: [PATCH 2/3] Skip a few cases before 22.5 due to breaking changes on server side --- .../com/clickhouse/jdbc/ClickHousePreparedStatementTest.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index ee0a81384..34335d0e2 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -1606,6 +1606,11 @@ public void testInsertWithFormat() throws SQLException { Assert.assertFalse(rs.next()); } + if (!conn.getServerVersion().check("[22.5,)")) { + throw new SkipException( + "Skip due to breaking change introduced by https://github.com/ClickHouse/ClickHouse/pull/35883"); + } + s.execute("truncate table test_insert_with_format"); try (PreparedStatement ps = conn From 513b271c02f3f868281d5d46f286b57abe77f349 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 30 Dec 2022 11:57:57 +0800 Subject: [PATCH 3/3] Skip a few more cases --- .../jdbc/ClickHousePreparedStatementTest.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 34335d0e2..c51906304 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -1586,6 +1586,11 @@ public void testInsertWithNullDateTime() throws SQLException { public void testInsertWithFormat() throws SQLException { Properties props = new Properties(); try (ClickHouseConnection conn = newConnection(props); Statement s = conn.createStatement()) { + if (!conn.getServerVersion().check("[22.5,)")) { + throw new SkipException( + "Skip due to breaking change introduced by https://github.com/ClickHouse/ClickHouse/pull/35883"); + } + s.execute("drop table if exists test_insert_with_format; " + "CREATE TABLE test_insert_with_format(i Int32, s String) ENGINE=Memory"); try (PreparedStatement ps = conn.prepareStatement("INSERT INTO test_insert_with_format format CSV")) { @@ -1606,11 +1611,6 @@ public void testInsertWithFormat() throws SQLException { Assert.assertFalse(rs.next()); } - if (!conn.getServerVersion().check("[22.5,)")) { - throw new SkipException( - "Skip due to breaking change introduced by https://github.com/ClickHouse/ClickHouse/pull/35883"); - } - s.execute("truncate table test_insert_with_format"); try (PreparedStatement ps = conn