Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
172 changes: 65 additions & 107 deletions README.md

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions clickhouse-cli-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [d

```xml
<dependency>
<!-- will stop using ru.yandex.clickhouse starting from 0.4.0 -->
<!-- please stop using ru.yandex.clickhouse as it's been deprecated -->
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-cli-client</artifactId>
<version>0.3.2-patch9</version>
<version>0.3.2-patch10</version>
</dependency>
```

Expand Down
55 changes: 42 additions & 13 deletions clickhouse-client/README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,55 @@
# ClickHouse Java Client

Async Java client for ClickHouse. `clickhouse-client` is an abstract module, so it does not work by itself until being used together with an implementation like `clickhouse-grpc-client` or `clickhouse-http-client`.
Async Java client for ClickHouse. `clickhouse-client` is an abstract module, so it does not work by itself until being used together with an implementation like `clickhouse-http-client`, `clickhouse-grpc-client` or `clickhouse-cli-client`.

## Configuration

You can pass any client option([common](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java), [http](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-http-client/src/main/java/com/clickhouse/client/http/config/ClickHouseHttpOption.java), [grpc](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/config/ClickHouseGrpcOption.java), and [cli](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java)) to `ClickHouseRequest.option()` and [server setting](https://clickhouse.com/docs/en/operations/settings/) to `ClickHouseRequest.set()` before execution, for instance:

```java
client.connect("http://localhost/system")
.query("select 1")
// short version of option(ClickHouseClientOption.FORMAT, ClickHouseFormat.RowBinaryWithNamesAndTypes)
.format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
.option(ClickHouseClientOption.SOCKET_TIMEOUT, 30000 * 2) // 60 seconds
.set("max_rows_to_read", 100)
.set("read_overflow_mode", "throw")
.execute()
.whenComplete((response, throwable) -> {
if (throwable != null) {
log.error("Unexpected error", throwable);
} else {
try {
for (ClickHouseRecord rec : response.records()) {
// ...
}
} finally {
response.close();
}
}
});
```

[Default value](https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseDefaults.java) can be either configured via system property or environment variable.

## Quick Start

```xml
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-http-client</artifactId>
<version>0.3.2-patch9</version>
<version>0.3.2-patch10</version>
</dependency>
```

```java
// declare a server to connect to
ClickHouseNode server = ClickHouseNode.of("server1.domain", ClickHouseProtocol.HTTP, 8123, "my_db");
// declare a list of servers to connect to
ClickHouseNodes servers = ClickHouseNodes.of(
"jdbc:ch:http://server1.domain,server2.domain,server3.domain/my_db"
+ "?load_balancing_policy=random&health_check_interval=5000&failover=2");

// execute multiple queries in a worker thread one after another within same session
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(server,
CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.send(servers.get(),
"create database if not exists test",
"use test", // change current database from my_db to test
"create table if not exists test_table(s String) engine=Memory",
Expand All @@ -30,20 +61,18 @@ CompletableFuture<List<ClickHouseResponseSummary>> future = ClickHouseClient.sen
// block current thread until queries completed, and then retrieve summaries
// List<ClickHouseResponseSummary> results = future.get();

try (ClickHouseClient client = ClickHouseClient.newInstance(server.getProtocol())) {
ClickHouseRequest<?> request = client.connect(server).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP)) {
ClickHouseRequest<?> request = client.connect(servers).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
// load data into a table and wait until it's completed
request.write().query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
request.write()
.query("insert into my_table select c2, c3 from input('c1 UInt8, c2 String, c3 Int32')")
.data(myInputStream).execute().thenAccept(response -> {
response.close();
});

// query with named parameter
try (ClickHouseResponse response = request.query(
ClickHouseParameterizedQuery.of(
request.getConfig(),
"select * from numbers(:limit)")
).params(100000).executeAndWait()) {
try (ClickHouseResponse response = request.query(ClickHouseParameterizedQuery.of(
request.getConfig(), "select * from numbers(:limit)")).params(100000).executeAndWait()) {
for (ClickHouseRecord r : response.records()) {
// Don't cache ClickHouseValue / ClickHouseRecord as they're reused for
// corresponding column / row
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public static ClickHouseNode of(URI uri, ClickHouseNode template) {
}
if (protocol != ClickHouseProtocol.POSTGRESQL && scheme.charAt(scheme.length() - 1) == 's') {
params.put(ClickHouseClientOption.SSL.getKey(), Boolean.TRUE.toString());
params.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.NONE.name());
params.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
}

ClickHouseCredentials credentials = template.credentials;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.clickhouse.client;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -14,8 +16,10 @@
import java.util.Map;
import java.util.Objects;
import java.util.Map.Entry;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Optional;
import java.util.Properties;
Expand All @@ -39,6 +43,8 @@ public class ClickHouseRequest<SelfT extends ClickHouseRequest<SelfT>> implement
* Mutation request.
*/
public static class Mutation extends ClickHouseRequest<Mutation> {
private ClickHouseWriter writer;

protected Mutation(ClickHouseRequest<?> request, boolean sealed) {
super(request.getClient(), request.server, request.serverRef, request.options, sealed);
this.settings.putAll(request.settings);
Expand Down Expand Up @@ -105,6 +111,20 @@ public Mutation format(ClickHouseFormat format) {
return super.format(format);
}

/**
* Sets custom writer for streaming. This will create a piped stream between the
* writer and ClickHouse server.
*
* @param writer writer
* @return mutation request
*/
public Mutation data(ClickHouseWriter writer) {
checkSealed();

this.writer = changeProperty(PROP_WRITER, this.writer, writer);
return this;
}

/**
* Loads data from given file which may or may not be compressed.
*
Expand Down Expand Up @@ -197,6 +217,70 @@ public Mutation data(ClickHouseDeferredValue<ClickHouseInputStream> input) {
return this;
}

@Override
public CompletableFuture<ClickHouseResponse> execute() {
if (writer != null) {
ClickHouseConfig c = getConfig();
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(c, null);
data(stream.getInputStream());
CompletableFuture<ClickHouseResponse> future = null;
if (c.isAsync()) {
future = getClient().execute(isSealed() ? this : seal());
}
try (ClickHouseOutputStream out = stream) {
writer.write(out);
} catch (IOException e) {
throw new CompletionException(e);
}
if (future != null) {
return future;
}
}

return getClient().execute(isSealed() ? this : seal());
}

@Override
public ClickHouseResponse executeAndWait() throws ClickHouseException {
if (writer != null) {
ClickHouseConfig c = getConfig();
ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(c, null);
data(stream.getInputStream());
CompletableFuture<ClickHouseResponse> future = null;
if (c.isAsync()) {
future = getClient().execute(isSealed() ? this : seal());
}
try (ClickHouseOutputStream out = stream) {
writer.write(out);
} catch (IOException e) {
throw ClickHouseException.of(e, getServer());
}
if (future != null) {
try {
return future.get();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw ClickHouseException.forCancellation(e, getServer());
} catch (CancellationException e) {
throw ClickHouseException.forCancellation(e, getServer());
} catch (ExecutionException | UncheckedIOException e) {
Throwable cause = e.getCause();
if (cause == null) {
cause = e;
}
throw cause instanceof ClickHouseException ? (ClickHouseException) cause
: ClickHouseException.of(cause, getServer());
} catch (RuntimeException e) { // unexpected
throw ClickHouseException.of(e, getServer());
}
}
}

return getClient().executeAndWait(isSealed() ? this : seal());
}

/**
* Sends mutation requets for execution. Same as
* {@code client.execute(request.seal())}.
Expand Down Expand Up @@ -256,6 +340,7 @@ public Mutation seal() {
static final String PROP_PREPARED_QUERY = "preparedQuery";
static final String PROP_QUERY = "query";
static final String PROP_QUERY_ID = "queryId";
static final String PROP_WRITER = "writer";

private final boolean sealed;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testInvalidNodes() {
public void testValidNodes() {
Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.SSL.getKey(), "false");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");

Set<String> tags = new HashSet<>();
Expand All @@ -183,7 +183,7 @@ public void testValidNodes() {
public void testSecureNode() {
Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.SSL.getKey(), "true");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");

Assert.assertEquals(ClickHouseNode.of("https://node1:443/db1"),
Expand Down Expand Up @@ -218,7 +218,7 @@ public void testSingleWordNode() {
public void testNodeWithProtocol() {
Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.SSL.getKey(), "true");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());

for (ClickHouseProtocol p : ClickHouseProtocol.values()) {
Assert.assertEquals(ClickHouseNode.of(p.name() + ":///?#"),
Expand Down Expand Up @@ -254,7 +254,7 @@ public void testNodeWithHostAndPort() {
public void testNodeWithDatabase() {
Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.SSL.getKey(), "true");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());

Assert.assertEquals(ClickHouseNode.of("grpcs://node1:19100/"),
new ClickHouseNode("node1", ClickHouseProtocol.GRPC, 19100, null, options, null));
Expand Down Expand Up @@ -324,13 +324,13 @@ public void testNodeWithOptions() {
Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.ASYNC.getKey(), "false");
options.put(ClickHouseClientOption.SSL.getKey(), "true");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
options.put(ClickHouseClientOption.CONNECTION_TIMEOUT.getKey(), "500");

for (String uri : new String[] {
"https://node1?!async&ssl&connect_timeout=500",
"http://node1?async=false&ssl=true&sslmode=NONE&connect_timeout=500",
"http://node1?&&&&async=false&ssl&&&&&sslmode=NONE&connect_timeout=500&&&",
"http://node1?async=false&ssl=true&sslmode=STRICT&connect_timeout=500",
"http://node1?&&&&async=false&ssl&&&&&sslmode=STRICT&connect_timeout=500&&&",
}) {
Assert.assertEquals(ClickHouseNode.of(uri),
new ClickHouseNode("node1", ClickHouseProtocol.HTTP,
Expand Down Expand Up @@ -379,7 +379,7 @@ public void testQueryWithSlash() throws Exception {
Assert.assertEquals(server.toUri(), new URI("http://localhost:1234?/a/b/c=d"));

Assert.assertEquals(ClickHouseNode.of("https://myserver/db/1/2/3?a%20=%201&b=/root/my.crt").toUri(),
new URI("http://myserver:8443/db/1/2/3?ssl=true&sslmode=NONE&a%20=%201&b=/root/my.crt"));
new URI("http://myserver:8443/db/1/2/3?ssl=true&sslmode=STRICT&a%20=%201&b=/root/my.crt"));
}

@Test(groups = { "integration" })
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.clickhouse.client.ClickHouseNode.Status;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.config.ClickHouseSslMode;

import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -213,7 +214,7 @@ public void testSingleNodeList() {

Map<String, String> options = new HashMap<>();
options.put(ClickHouseClientOption.SSL.getKey(), "true");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), "NONE");
options.put(ClickHouseClientOption.SSL_MODE.getKey(), ClickHouseSslMode.STRICT.name());
options.put(ClickHouseClientOption.DATABASE.getKey(), "db1");

Assert.assertEquals(ClickHouseNodes.of("https://node1:443/db1").nodes.get(0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import com.clickhouse.client.ClickHouseClientBuilder.Agent;
Expand Down Expand Up @@ -909,6 +910,46 @@ public void testCustomRead() throws Exception {
Assert.assertEquals(count, 1000L);
}

@Test(groups = { "integration" })
public void testCustomWriter() throws Exception {
ClickHouseNode server = getServer();
ClickHouseClient.send(server, "drop table if exists test_custom_writer",
"create table test_custom_writer(a Int8) engine=Memory")
.get();

try (ClickHouseClient client = getClient()) {
AtomicInteger i = new AtomicInteger(1);
ClickHouseRequest.Mutation req = client.connect(server).write().format(ClickHouseFormat.RowBinary)
.table("test_custom_writer").data(o -> {
o.write(i.getAndIncrement());
});
for (boolean b : new boolean[] { true, false }) {
req.option(ClickHouseClientOption.ASYNC, b);

try (ClickHouseResponse resp = req.send().get()) {
Assert.assertNotNull(resp);
}

try (ClickHouseResponse resp = req.sendAndWait()) {
Assert.assertNotNull(resp);
}

try (ClickHouseResponse resp = req.execute().get()) {
Assert.assertNotNull(resp);
}

try (ClickHouseResponse resp = req.executeAndWait()) {
Assert.assertNotNull(resp);
}
}

try (ClickHouseResponse resp = client.connect(server).query("select count(1) from test_custom_writer")
.executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asInteger(), i.get() - 1);
}
}
}

@Test(groups = { "integration" })
public void testDumpAndLoadFile() throws Exception {
// super.testLoadRawData();
Expand Down
Loading