Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,20 @@ public Builder addOption(String option, String value) {
return this;
}

/**
* Removes an option from this node.
*
* @param option option to be removed, null value will be ignored
* @return this builder
*/
public Builder removeOption(String option) {
if (!ClickHouseChecker.isNullOrEmpty(option)) {
options.remove(option);
}

return this;
}

/**
* Sets all options for this node. Use null or empty value to clear all existing
* options.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,4 +257,12 @@ public int getUpdateCount() {
public boolean isEmpty() {
return progress.get().isEmpty() && stats.get().isEmpty();
}

@Override
public String toString() {
return new StringBuilder().append("ClickHouseResponseSummary [readBytes=").append(getReadBytes())
.append(", readRows=").append(getReadRows()).append(", totalRowsToRead=").append(getTotalRowsToRead())
.append(", writtenBytes=").append(getWrittenBytes()).append(", writtenRows=").append(getWrittenRows())
.append(", updates=").append(getUpdateCount()).append(']').toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,16 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.clickhouse.client.ClickHouseChecker;
import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseDataStreamFactory;
import com.clickhouse.client.ClickHouseDeserializer;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.ClickHouseResponseSummary;
Expand Down Expand Up @@ -427,14 +430,27 @@ public void cancel() throws SQLException {

final String qid;
if ((qid = this.queryId) != null) {
ClickHouseClient.send(request.getServer(), String.format("KILL QUERY WHERE query_id='%s'", qid))
.whenComplete((summary, exception) -> {
if (exception != null) {
log.warn("Failed to kill query [%s] due to: %s", qid, exception.getMessage());
} else if (summary != null) {
log.debug("Killed query [%s]", qid);
}
});
String sessionIdKey = ClickHouseClientOption.SESSION_ID.getKey();
ClickHouseNode server = request.getServer();
if (server.getOptions().containsKey(sessionIdKey)) {
server = ClickHouseNode.builder(request.getServer()).removeOption(sessionIdKey)
.removeOption(ClickHouseClientOption.SESSION_CHECK.getKey())
.removeOption(ClickHouseClientOption.SESSION_TIMEOUT.getKey()).build();
}
try {
List<ClickHouseResponseSummary> summaries = ClickHouseClient
.send(server, String.format("KILL QUERY WHERE query_id='%s'", qid))
.get(request.getConfig().getConnectionTimeout(), TimeUnit.MILLISECONDS);
log.info("Killed query [%s]: %s", qid, summaries.get(0));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted for killing query [%s]", qid);
} catch (TimeoutException e) {
log.warn("Timed out after waiting %d ms for killing query [%s]",
request.getConfig().getConnectionTimeout(), qid);
} catch (Exception e) { // unexpected
throw SqlExceptionUtils.handle(e.getCause());
}
}
if (request.getTransaction() != null) {
request.getTransaction().abort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,15 @@ private Object[][] getTimeZoneTestOptions() {
new Object[] { true }, new Object[] { false } };
}

@DataProvider(name = "connectionProperties")
private Object[][] getConnectionProperties() {
Properties emptyProps = new Properties();
Properties sessionProps = new Properties();
sessionProps.setProperty(ClickHouseClientOption.SESSION_ID.getKey(), UUID.randomUUID().toString());
return new Object[][] {
new Object[] { emptyProps }, new Object[] { sessionProps } };
}

@Test(groups = "integration")
public void testJdbcEscapeSyntax() throws SQLException {
try (ClickHouseConnection conn = newConnection(new Properties());
Expand Down Expand Up @@ -237,13 +246,15 @@ public void testAsyncInsert() throws SQLException {
}
}

@Test(groups = "integration")
public void testCancelQuery() throws Exception {
try (ClickHouseConnection conn = newConnection(new Properties());
@Test(dataProvider = "connectionProperties", groups = "integration")
public void testCancelQuery(Properties props) throws Exception {
try (ClickHouseConnection conn = newConnection(props);
ClickHouseStatement stmt = conn.createStatement();) {
CountDownLatch c = new CountDownLatch(1);
ClickHouseClient.submit(() -> stmt.executeQuery("select * from numbers(100000000)")).whenComplete(
(rs, e) -> {
Assert.assertNull(e, "Should NOT have any exception");

int index = 0;

try {
Expand All @@ -252,15 +263,22 @@ public void testCancelQuery() throws Exception {
c.countDown();
}
}
Assert.fail("Query should have been cancelled");
} catch (SQLException ex) {
// ignore
Assert.assertNotNull(ex, "Should end up with exception");
}
});
try {
c.await(5, TimeUnit.SECONDS);
} finally {
stmt.cancel();
}

try (ResultSet rs = stmt.executeQuery("select 5")) {
Assert.assertTrue(rs.next(), "Should have at least one record");
Assert.assertEquals(rs.getInt(1), 5);
Assert.assertFalse(rs.next(), "Should have only one record");
}
}
}

Expand Down