From b16ce07389dbf516cbdb5168ce32462dc2d0ae67 Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Sat, 6 Aug 2022 11:00:06 +0800 Subject: [PATCH 1/2] Remove sessionId before cancelling query --- .../com/clickhouse/client/ClickHouseNode.java | 14 ++++++ .../client/ClickHouseResponseSummary.java | 8 ++++ .../internal/ClickHouseStatementImpl.java | 32 +++++++++---- .../jdbc/ClickHouseStatementTest.java | 46 ++++++++++++++++++- 4 files changed, 91 insertions(+), 9 deletions(-) diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java index 3eb2559bc..21cfe6779 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java @@ -274,6 +274,20 @@ public Builder addOption(String option, String value) { return this; } + /** + * Removes an option from this node. + * + * @param option option to be removed, null value will be ignored + * @return this builder + */ + public Builder removeOption(String option) { + if (!ClickHouseChecker.isNullOrEmpty(option)) { + options.remove(option); + } + + return this; + } + /** * Sets all options for this node. Use null or empty value to clear all existing * options. diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java index 33bc639b6..19790e61a 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java @@ -257,4 +257,12 @@ public int getUpdateCount() { public boolean isEmpty() { return progress.get().isEmpty() && stats.get().isEmpty(); } + + @Override + public String toString() { + return new StringBuilder().append("ClickHouseResponseSummary [readBytes=").append(getReadBytes()) + .append(", readRows=").append(getReadRows()).append(", totalRowsToRead=").append(getTotalRowsToRead()) + .append(", writtenBytes=").append(getWrittenBytes()).append(", writtenRows=").append(getWrittenRows()) + .append(", updates=").append(getUpdateCount()).append(']').toString(); + } } diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index d1b47522b..effc90e3c 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -11,6 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.clickhouse.client.ClickHouseChecker; import com.clickhouse.client.ClickHouseClient; @@ -18,6 +20,7 @@ import com.clickhouse.client.ClickHouseDataStreamFactory; import com.clickhouse.client.ClickHouseDeserializer; import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; @@ -427,14 +430,27 @@ public void cancel() throws SQLException { final String qid; if ((qid = this.queryId) != null) { - ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid)) - .whenComplete((summary, exception) -> { - if (exception != null) { - log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage()); - } else if (summary != null) { - log.debug("Killed query [%s]", qid); - } - }); + String sessionIdKey = ClickHouseClientOption.SESSION_ID.getKey(); + ClickHouseNode server = request.getServer(); + if (server.getOptions().containsKey(sessionIdKey)) { + server = ClickHouseNode.builder(request.getServer()).removeOption(sessionIdKey) + .removeOption(ClickHouseClientOption.SESSION_CHECK.getKey()) + .removeOption(ClickHouseClientOption.SESSION_TIMEOUT.getKey()).build(); + } + try { + List summaries = ClickHouseClient + .send(server, String.format("KILL QUERY WHERE query_id='%s'", qid)) + .get(request.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS); + log.info("Killed query [%s]: %s", qid, summaries.get(0)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted for killing query [%s]", qid); + } catch (TimeoutException e) { + log.warn("Timed out after waiting %d ms for killing query [%s]", + request.getConfig().getConnectionTimeout(), qid); + } catch (Exception e) { // unexpected + throw SqlExceptionUtils.handle(e.getCause()); + } } if (request.getTransaction() != null) { request.getTransaction().abort(); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index 681bdecaf..63b6553ac 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -252,8 +252,9 @@ public void testCancelQuery() throws Exception { c.countDown(); } } + Assert.fail("Query should have been cancelled"); } catch (SQLException ex) { - // ignore + Assert.assertNotNull(ex); } }); try { @@ -261,6 +262,49 @@ public void testCancelQuery() throws Exception { } finally { stmt.cancel(); } + + try (ResultSet rs = stmt.executeQuery("select 5")) { + Assert.assertTrue(rs.next(), "Should have at least one record"); + Assert.assertEquals(rs.getInt(1), 5); + Assert.assertFalse(rs.next(), "Should have only one record"); + } + } + } + + @Test(groups = "integration") + public void testCancelQueryWithSession() throws Exception { + Properties props = new Properties(); + props.setProperty(ClickHouseClientOption.SESSION_ID.getKey(), UUID.randomUUID().toString()); + props.setProperty(ClickHouseClientOption.REPEAT_ON_SESSION_LOCK.getKey(), "false"); + try (ClickHouseConnection conn = newConnection(props); + ClickHouseStatement stmt = conn.createStatement();) { + CountDownLatch c = new CountDownLatch(1); + ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete( + (rs, e) -> { + int index = 0; + + try { + while (rs.next()) { + if (index++ < 1) { + c.countDown(); + } + } + Assert.fail("Query should have been cancelled"); + } catch (SQLException ex) { + Assert.assertNotNull(ex); + } + }); + try { + c.await(5, TimeUnit.SECONDS); + } finally { + stmt.cancel(); + } + + try (ResultSet rs = stmt.executeQuery("select 5")) { + Assert.assertTrue(rs.next(), "Should have at least one record"); + Assert.assertEquals(rs.getInt(1), 5); + Assert.assertFalse(rs.next(), "Should have only one record"); + } } } From 453e4db8dbf01c7f66a8f4024d92bcfcbc0052af Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Sat, 6 Aug 2022 17:07:22 +0800 Subject: [PATCH 2/2] Update tests and consolidate similar ones --- .../jdbc/ClickHouseStatementTest.java | 54 +++++-------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index 63b6553ac..9349cf9dc 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -52,6 +52,15 @@ private Object[][] getTimeZoneTestOptions() { new Object[] { true }, new Object[] { false } }; } + @DataProvider(name = "connectionProperties") + private Object[][] getConnectionProperties() { + Properties emptyProps = new Properties(); + Properties sessionProps = new Properties(); + sessionProps.setProperty(ClickHouseClientOption.SESSION_ID.getKey(), UUID.randomUUID().toString()); + return new Object[][] { + new Object[] { emptyProps }, new Object[] { sessionProps } }; + } + @Test(groups = "integration") public void testJdbcEscapeSyntax() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -237,50 +246,15 @@ public void testAsyncInsert() throws SQLException { } } - @Test(groups = "integration") - public void testCancelQuery() throws Exception { - try (ClickHouseConnection conn = newConnection(new Properties()); - ClickHouseStatement stmt = conn.createStatement();) { - CountDownLatch c = new CountDownLatch(1); - ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete( - (rs, e) -> { - int index = 0; - - try { - while (rs.next()) { - if (index++ < 1) { - c.countDown(); - } - } - Assert.fail("Query should have been cancelled"); - } catch (SQLException ex) { - Assert.assertNotNull(ex); - } - }); - try { - c.await(5, TimeUnit.SECONDS); - } finally { - stmt.cancel(); - } - - try (ResultSet rs = stmt.executeQuery("select 5")) { - Assert.assertTrue(rs.next(), "Should have at least one record"); - Assert.assertEquals(rs.getInt(1), 5); - Assert.assertFalse(rs.next(), "Should have only one record"); - } - } - } - - @Test(groups = "integration") - public void testCancelQueryWithSession() throws Exception { - Properties props = new Properties(); - props.setProperty(ClickHouseClientOption.SESSION_ID.getKey(), UUID.randomUUID().toString()); - props.setProperty(ClickHouseClientOption.REPEAT_ON_SESSION_LOCK.getKey(), "false"); + @Test(dataProvider = "connectionProperties", groups = "integration") + public void testCancelQuery(Properties props) throws Exception { try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement();) { CountDownLatch c = new CountDownLatch(1); ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete( (rs, e) -> { + Assert.assertNull(e, "Should NOT have any exception"); + int index = 0; try { @@ -291,7 +265,7 @@ public void testCancelQueryWithSession() throws Exception { } Assert.fail("Query should have been cancelled"); } catch (SQLException ex) { - Assert.assertNotNull(ex); + Assert.assertNotNull(ex, "Should end up with exception"); } }); try {