diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java index 3eb2559bc..21cfe6779 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseNode.java @@ -274,6 +274,20 @@ public Builder addOption(String option, String value) { return this; } + /** + * Removes an option from this node. + * + * @param option option to be removed, null value will be ignored + * @return this builder + */ + public Builder removeOption(String option) { + if (!ClickHouseChecker.isNullOrEmpty(option)) { + options.remove(option); + } + + return this; + } + /** * Sets all options for this node. Use null or empty value to clear all existing * options. diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java index 33bc639b6..19790e61a 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseResponseSummary.java @@ -257,4 +257,12 @@ public int getUpdateCount() { public boolean isEmpty() { return progress.get().isEmpty() && stats.get().isEmpty(); } + + @Override + public String toString() { + return new StringBuilder().append("ClickHouseResponseSummary [readBytes=").append(getReadBytes()) + .append(", readRows=").append(getReadRows()).append(", totalRowsToRead=").append(getTotalRowsToRead()) + .append(", writtenBytes=").append(getWrittenBytes()).append(", writtenRows=").append(getWrittenRows()) + .append(", updates=").append(getUpdateCount()).append(']').toString(); + } } diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java index d1b47522b..effc90e3c 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/internal/ClickHouseStatementImpl.java @@ -11,6 +11,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.clickhouse.client.ClickHouseChecker; import com.clickhouse.client.ClickHouseClient; @@ -18,6 +20,7 @@ import com.clickhouse.client.ClickHouseDataStreamFactory; import com.clickhouse.client.ClickHouseDeserializer; import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; @@ -427,14 +430,27 @@ public void cancel() throws SQLException { final String qid; if ((qid = this.queryId) != null) { - ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid)) - .whenComplete((summary, exception) -> { - if (exception != null) { - log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage()); - } else if (summary != null) { - log.debug("Killed query [%s]", qid); - } - }); + String sessionIdKey = ClickHouseClientOption.SESSION_ID.getKey(); + ClickHouseNode server = request.getServer(); + if (server.getOptions().containsKey(sessionIdKey)) { + server = ClickHouseNode.builder(request.getServer()).removeOption(sessionIdKey) + .removeOption(ClickHouseClientOption.SESSION_CHECK.getKey()) + .removeOption(ClickHouseClientOption.SESSION_TIMEOUT.getKey()).build(); + } + try { + List summaries = ClickHouseClient + .send(server, String.format("KILL QUERY WHERE query_id='%s'", qid)) + .get(request.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS); + log.info("Killed query [%s]: %s", qid, summaries.get(0)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.warn("Interrupted for killing query [%s]", qid); + } catch (TimeoutException e) { + log.warn("Timed out after waiting %d ms for killing query [%s]", + request.getConfig().getConnectionTimeout(), qid); + } catch (Exception e) { // unexpected + throw SqlExceptionUtils.handle(e.getCause()); + } } if (request.getTransaction() != null) { request.getTransaction().abort(); diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java index 681bdecaf..9349cf9dc 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseStatementTest.java @@ -52,6 +52,15 @@ private Object[][] getTimeZoneTestOptions() { new Object[] { true }, new Object[] { false } }; } + @DataProvider(name = "connectionProperties") + private Object[][] getConnectionProperties() { + Properties emptyProps = new Properties(); + Properties sessionProps = new Properties(); + sessionProps.setProperty(ClickHouseClientOption.SESSION_ID.getKey(), UUID.randomUUID().toString()); + return new Object[][] { + new Object[] { emptyProps }, new Object[] { sessionProps } }; + } + @Test(groups = "integration") public void testJdbcEscapeSyntax() throws SQLException { try (ClickHouseConnection conn = newConnection(new Properties()); @@ -237,13 +246,15 @@ public void testAsyncInsert() throws SQLException { } } - @Test(groups = "integration") - public void testCancelQuery() throws Exception { - try (ClickHouseConnection conn = newConnection(new Properties()); + @Test(dataProvider = "connectionProperties", groups = "integration") + public void testCancelQuery(Properties props) throws Exception { + try (ClickHouseConnection conn = newConnection(props); ClickHouseStatement stmt = conn.createStatement();) { CountDownLatch c = new CountDownLatch(1); ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete( (rs, e) -> { + Assert.assertNull(e, "Should NOT have any exception"); + int index = 0; try { @@ -252,8 +263,9 @@ public void testCancelQuery() throws Exception { c.countDown(); } } + Assert.fail("Query should have been cancelled"); } catch (SQLException ex) { - // ignore + Assert.assertNotNull(ex, "Should end up with exception"); } }); try { @@ -261,6 +273,12 @@ public void testCancelQuery() throws Exception { } finally { stmt.cancel(); } + + try (ResultSet rs = stmt.executeQuery("select 5")) { + Assert.assertTrue(rs.next(), "Should have at least one record"); + Assert.assertEquals(rs.getInt(1), 5); + Assert.assertFalse(rs.next(), "Should have only one record"); + } } }