From aeb5878cb0320a7e36c2142392f0b4ff7efefd7b Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 15 Jul 2022 20:00:05 +0800 Subject: [PATCH 1/2] Use unbounded queue for batch insert --- .../internal/InputBasedPreparedStatement.java | 8 +++-- .../jdbc/ClickHousePreparedStatementTest.java | 30 +++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java index 9ffc0d362..9cbc8f1bd 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/InputBasedPreparedStatement.java @@ -77,7 +77,8 @@ protected InputBasedPreparedStatement(ClickHouseConnectionImpl connection, Click counter = 0; // it's important to make sure the queue has unlimited length - stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null); + stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0, + config.getSocketTimeout(), null); } protected void ensureParams() throws SQLException { @@ -350,7 +351,10 @@ public void clearBatch() throws SQLException { // ignore } counter = 0; - stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(getConfig(), null); + + ClickHouseConfig config = getConfig(); + stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config.getWriteBufferSize(), 0, + config.getSocketTimeout(), null); } @Override diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 25fe593d3..21979d35b 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -1368,6 +1368,36 @@ public void testQueryWithNamedParameter() throws SQLException { } } + @Test(groups = "integration") + public void testInsertBufferSize() throws Exception { + Properties props = new Properties(); + props.setProperty(ClickHouseClientOption.WRITE_BUFFER_SIZE.getKey(), "1"); + props.setProperty(ClickHouseClientOption.MAX_QUEUED_BUFFERS.getKey(), "1"); + try (ClickHouseConnection conn = newConnection(new Properties()); + Statement s = conn.createStatement()) { + s.execute("drop table if exists test_insert_buffer_size; " + + "CREATE TABLE test_insert_buffer_size(value String) ENGINE=Memory"); + try (PreparedStatement ps = conn.prepareStatement( + "INSERT INTO test_insert_buffer_size")) { + ps.setString(1, "1"); + ps.addBatch(); + ps.setString(1, "2"); + ps.addBatch(); + ps.setString(1, "3"); + ps.addBatch(); + ps.executeBatch(); + } + + try (ResultSet rs = s.executeQuery("select * from test_insert_buffer_size order by value")) { + int count = 1; + while (rs.next()) { + Assert.assertEquals(rs.getInt(1), count++); + } + Assert.assertEquals(count, 4); + } + } + } + @Test(groups = "integration") public void testInsertWithAndSelect() throws Exception { try (ClickHouseConnection conn = newConnection(new Properties()); From 6a11320c51cbf7222c416066ee1e469b427c6538 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Fri, 15 Jul 2022 20:16:31 +0800 Subject: [PATCH 2/2] Rename test and consider clearBatch --- .../jdbc/ClickHousePreparedStatementTest.java | 73 +++++++++++-------- 1 file changed, 43 insertions(+), 30 deletions(-) diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java index 21979d35b..c3ec03344 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHousePreparedStatementTest.java @@ -710,6 +710,49 @@ public void testBatchInsert() throws SQLException { } } + @Test(groups = "integration") + public void testBatchInsertWithoutUnboundedQueue() throws Exception { + Properties props = new Properties(); + props.setProperty(ClickHouseClientOption.WRITE_BUFFER_SIZE.getKey(), "1"); + props.setProperty(ClickHouseClientOption.MAX_QUEUED_BUFFERS.getKey(), "1"); + try (ClickHouseConnection conn = newConnection(new Properties()); + Statement s = conn.createStatement()) { + s.execute("drop table if exists test_insert_buffer_size; " + + "CREATE TABLE test_insert_buffer_size(value String) ENGINE=Memory"); + try (PreparedStatement ps = conn.prepareStatement( + "INSERT INTO test_insert_buffer_size")) { + ps.setString(1, "1"); + ps.addBatch(); + ps.setString(1, "2"); + ps.addBatch(); + ps.setString(1, "3"); + ps.addBatch(); + ps.executeBatch(); + + ps.setString(1, "4"); + ps.addBatch(); + ps.executeBatch(); + + ps.setString(1, "4"); + ps.addBatch(); + ps.clearBatch(); + ps.setString(1, "5"); + ps.addBatch(); + ps.setString(1, "6"); + ps.addBatch(); + ps.executeBatch(); + } + + try (ResultSet rs = s.executeQuery("select * from test_insert_buffer_size order by value")) { + int count = 1; + while (rs.next()) { + Assert.assertEquals(rs.getInt(1), count++); + } + Assert.assertEquals(count, 7); + } + } + } + @Test(groups = "integration") public void testQueryWithDateTime() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -1368,36 +1411,6 @@ public void testQueryWithNamedParameter() throws SQLException { } } - @Test(groups = "integration") - public void testInsertBufferSize() throws Exception { - Properties props = new Properties(); - props.setProperty(ClickHouseClientOption.WRITE_BUFFER_SIZE.getKey(), "1"); - props.setProperty(ClickHouseClientOption.MAX_QUEUED_BUFFERS.getKey(), "1"); - try (ClickHouseConnection conn = newConnection(new Properties()); - Statement s = conn.createStatement()) { - s.execute("drop table if exists test_insert_buffer_size; " - + "CREATE TABLE test_insert_buffer_size(value String) ENGINE=Memory"); - try (PreparedStatement ps = conn.prepareStatement( - "INSERT INTO test_insert_buffer_size")) { - ps.setString(1, "1"); - ps.addBatch(); - ps.setString(1, "2"); - ps.addBatch(); - ps.setString(1, "3"); - ps.addBatch(); - ps.executeBatch(); - } - - try (ResultSet rs = s.executeQuery("select * from test_insert_buffer_size order by value")) { - int count = 1; - while (rs.next()) { - Assert.assertEquals(rs.getInt(1), count++); - } - Assert.assertEquals(count, 4); - } - } - } - @Test(groups = "integration") public void testInsertWithAndSelect() throws Exception { try (ClickHouseConnection conn = newConnection(new Properties());