Skip to content

Commit

Permalink
Primitive array, serializable option, and shaded jars
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Oct 17, 2021
1 parent 2a61ab3 commit d09d385
Show file tree
Hide file tree
Showing 50 changed files with 4,328 additions and 385 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ static CompletableFuture<ClickHouseResponseSummary> load(ClickHouseNode server,
// must run in async mode so that we won't hold everything in memory
try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, true).build()) {
.option(ClickHouseClientOption.ASYNC, true).build()) {
ClickHousePipedStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedStream(client.getConfig());
// execute query in a separate thread(because async is explicitly set to true)
Expand Down Expand Up @@ -333,7 +333,7 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
// set async to false so that we don't have to create additional thread
try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, false).build()) {
.option(ClickHouseClientOption.ASYNC, false).build()) {
ClickHouseRequest<?> request = client.connect(theServer).format(ClickHouseFormat.RowBinary);
if ((boolean) ClickHouseDefaults.AUTO_SESSION.getEffectiveDefaultValue() && queries.size() > 1) {
request.session(UUID.randomUUID().toString(), false);
Expand Down Expand Up @@ -371,7 +371,7 @@ static CompletableFuture<ClickHouseResponseSummary> send(ClickHouseNode server,
// set async to false so that we don't have to create additional thread
try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, false).build();
.option(ClickHouseClientOption.ASYNC, false).build();
ClickHouseResponse resp = client.connect(theServer).format(ClickHouseFormat.RowBinary).query(sql)
.params(params).execute().get()) {
return resp.getSummary();
Expand Down Expand Up @@ -441,7 +441,7 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
// set async to false so that we don't have to create additional thread
try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, false).build()) {
.option(ClickHouseClientOption.ASYNC, false).build()) {
// format doesn't matter here as we only need a summary
ClickHouseRequest<?> request = client.connect(theServer).format(ClickHouseFormat.RowBinary)
.query(query);
Expand Down Expand Up @@ -499,7 +499,7 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
// set async to false so that we don't have to create additional thread
try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(theServer.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, false).build()) {
.option(ClickHouseClientOption.ASYNC, false).build()) {
// format doesn't matter here as we only need a summary
ClickHouseRequest<?> request = client.connect(theServer).format(ClickHouseFormat.RowBinary);
for (String[] p : params) {
Expand All @@ -513,38 +513,6 @@ static CompletableFuture<List<ClickHouseResponseSummary>> send(ClickHouseNode se
});
}

/**
* Tests if the given server is alive or not. Unlike other methods, it's a
* synchronous call with minimum overhead(e.g. tiny buffer, no compression and
* no deserialization etc).
*
* @param server server to test
* @param timeout timeout in millisecond
* @return true if the server is alive; false otherwise
*/
static boolean test(ClickHouseNode server, int timeout) {
if (server != null) {
server = ClickHouseCluster.probe(server, timeout);

try (ClickHouseClient client = ClickHouseClient.builder()
.nodeSelector(ClickHouseNodeSelector.of(server.getProtocol()))
.addOption(ClickHouseClientOption.ASYNC, false) // use current thread
.addOption(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout)
.addOption(ClickHouseClientOption.SOCKET_TIMEOUT, timeout)
.addOption(ClickHouseClientOption.MAX_BUFFER_SIZE, 8) // actually 4 bytes should be enough
.addOption(ClickHouseClientOption.MAX_QUEUED_BUFFERS, 1).build();
ClickHouseResponse resp = client.connect(server).compression(ClickHouseCompression.NONE)
.format(ClickHouseFormat.TabSeparated).query("SELECT 1").execute()
.get(timeout, TimeUnit.MILLISECONDS)) {
return true;
} catch (Exception e) {
// ignore
}
}

return false;
}

/**
* Tests whether the given protocol is supported or not. An advanced client can
* support as many protocols as needed.
Expand Down Expand Up @@ -618,6 +586,37 @@ default void init(ClickHouseConfig config) {
ClickHouseChecker.nonNull(config, "configuration");
}

/**
* Tests if the given server is alive or not. Pay attention that it's a
* synchronous call with minimum overhead(e.g. tiny buffer, no compression and
* no deserialization etc).
*
* @param server server to test
* @param timeout timeout in millisecond
* @return true if the server is alive; false otherwise
*/
default boolean ping(ClickHouseNode server, int timeout) {
if (server != null) {
server = ClickHouseCluster.probe(server, timeout);

try (ClickHouseResponse resp = connect(server) // create request
.option(ClickHouseClientOption.ASYNC, false) // use current thread
.option(ClickHouseClientOption.CONNECTION_TIMEOUT, timeout)
.option(ClickHouseClientOption.SOCKET_TIMEOUT, timeout)
.option(ClickHouseClientOption.MAX_BUFFER_SIZE, 8) // actually 4 bytes should be enough
.option(ClickHouseClientOption.MAX_QUEUED_BUFFERS, 1) // enough with only one buffer
.compression(ClickHouseCompression.NONE) // no compression required for such a small packet
.format(ClickHouseFormat.TabSeparated).query("SELECT 1").execute()
.get(timeout, TimeUnit.MILLISECONDS)) {
return true;
} catch (Exception e) {
// ignore
}
}

return false;
}

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
package com.clickhouse.client;

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseConfigOption;
import com.clickhouse.client.config.ClickHouseDefaults;

Expand Down Expand Up @@ -47,7 +51,7 @@ public class ClickHouseClientBuilder {
protected Object metricRegistry;
protected ClickHouseNodeSelector nodeSelector;

protected final Map<ClickHouseConfigOption, Object> options;
protected final Map<ClickHouseConfigOption, Serializable> options;

/**
* Default constructor.
Expand Down Expand Up @@ -106,7 +110,7 @@ public ClickHouseClient build() {

if (client == null) {
throw new IllegalStateException(
ClickHouseUtils.format("No suitable client(out of %d) found in classpath.", counter));
ClickHouseUtils.format("No suitable ClickHouse client(out of %d) found in classpath.", counter));
} else {
client.init(getConfig());
}
Expand All @@ -122,9 +126,11 @@ public ClickHouseClient build() {
* @param value value
* @return this builder
*/
public ClickHouseClientBuilder addOption(ClickHouseConfigOption option, Object value) {
Object oldValue = options.put(ClickHouseChecker.nonNull(option, "option"),
ClickHouseChecker.nonNull(value, "value"));
public ClickHouseClientBuilder option(ClickHouseConfigOption option, Serializable value) {
if (option == null || value == null) {
throw new IllegalArgumentException("Non-null option and value are required");
}
Object oldValue = options.put(option, value);
if (oldValue == null || !value.equals(oldValue)) {
resetConfig();
}
Expand All @@ -150,12 +156,39 @@ public ClickHouseClientBuilder removeOption(ClickHouseConfigOption option) {
/**
* Sets options.
*
* @param options non-null map containing all options
* @param options map containing all options
* @return this builder
*/
public ClickHouseClientBuilder options(Map<ClickHouseConfigOption, Serializable> options) {
if (options != null && !options.isEmpty()) {
this.options.putAll(options);
resetConfig();
}

return this;
}

/**
* Sets options.
*
* @param options options
* @return this builder
*/
public ClickHouseClientBuilder options(Map<ClickHouseConfigOption, Object> options) {
if (ClickHouseChecker.nonNull(options, "options").size() > 0) {
options.putAll(options);
public ClickHouseClientBuilder options(Properties options) {
if (options != null && !options.isEmpty()) {
for (Entry<Object, Object> e : options.entrySet()) {
Object key = e.getKey();
Object value = e.getValue();
if (key == null || value == null) {
continue;
}

ClickHouseClientOption o = ClickHouseClientOption.fromKey(key.toString());
if (o != null) {
this.options.put(o, ClickHouseConfigOption.fromString(value.toString(), o.getValueType()));
}
}

resetConfig();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public static ClickHouseNode probe(ClickHouseNode node, int timeout) {
} else if (buf[3] == 0) {
p = ClickHouseProtocol.MYSQL;
} else if (buf[0] == 72 && buf[9] == 52) {
p = ClickHouseProtocol.NATIVE;
p = ClickHouseProtocol.TCP;
}
}
} catch (IOException e) {
Expand Down Expand Up @@ -313,9 +313,18 @@ protected void check() {
// detect flaky node and check it in a different way(less frequency)
try {
boolean passed = true;
int timeout = 5000;
for (int i = 0; i < unhealthyNodes.size(); i++) {
ClickHouseNode node = unhealthyNodes.get(i);
if (ClickHouseClient.test(node, 5000)) { // another configuration?
ClickHouseNode node = probe(unhealthyNodes.get(i), timeout);

// probe is faster than ping but it cannot tell if the server works or not
boolean isAlive = false;
try (ClickHouseClient client = ClickHouseClient.newInstance(node.getProtocol())) {
isAlive = client.ping(node, timeout);
} catch (Exception e) {
// ignore
}
if (isAlive) { // another configuration?
update(node, Status.HEALTHY);
} else {
passed = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ public final class ClickHouseColumn implements Serializable {
private List<ClickHouseColumn> nested;
private List<String> parameters;

private int arrayLevel;
private ClickHouseColumn arrayBaseColumn;

private static ClickHouseColumn update(ClickHouseColumn column) {
int size = column.parameters.size();
switch (column.dataType) {
Expand All @@ -45,6 +48,18 @@ private static ClickHouseColumn update(ClickHouseColumn column) {
column.baseType = ClickHouseDataType.of(column.parameters.get(1));
}
break;
case Array:
column.arrayLevel = 1;
column.arrayBaseColumn = column.nested.get(0);
while (column.arrayLevel < 255) {
if (column.arrayBaseColumn.dataType == ClickHouseDataType.Array) {
column.arrayLevel++;
column.arrayBaseColumn = column.arrayBaseColumn.nested.get(0);
} else {
break;
}
}
break;
case DateTime:
if (size >= 2) { // same as DateTime64
column.scale = Integer.parseInt(column.parameters.get(0));
Expand Down Expand Up @@ -140,6 +155,10 @@ protected static int readColumn(String args, int startIndex, int len, String nam
int endIndex = ClickHouseUtils.skipBrackets(args, index, len, '(');
List<ClickHouseColumn> nestedColumns = new LinkedList<>();
readColumn(args, index + 1, endIndex - 1, "", nestedColumns);
if (nestedColumns.size() != 1) {
throw new IllegalArgumentException(
"Array can have one and only one nested column, but we got: " + nestedColumns.size());
}
column = new ClickHouseColumn(ClickHouseDataType.Array, name, args.substring(startIndex, endIndex),
nullable, lowCardinality, null, nestedColumns);
i = endIndex;
Expand All @@ -158,6 +177,10 @@ protected static int readColumn(String args, int startIndex, int len, String nam
i = readColumn(args, i, endIndex, "", nestedColumns) - 1;
}
}
if (nestedColumns.size() != 2) {
throw new IllegalArgumentException(
"Map should have two nested columns(key and value), but we got: " + nestedColumns.size());
}
column = new ClickHouseColumn(ClickHouseDataType.Map, name, args.substring(startIndex, endIndex), nullable,
lowCardinality, null, nestedColumns);
i = endIndex;
Expand All @@ -168,8 +191,12 @@ protected static int readColumn(String args, int startIndex, int len, String nam
}
i = ClickHouseUtils.skipBrackets(args, index, len, '(');
String originalTypeName = args.substring(startIndex, i);
List<ClickHouseColumn> nestedColumns = parse(args.substring(index + 1, i - 1));
if (nestedColumns.isEmpty()) {
throw new IllegalArgumentException("Nested should have at least one nested column");
}
column = new ClickHouseColumn(ClickHouseDataType.Nested, name, originalTypeName, nullable, lowCardinality,
null, parse(args.substring(index + 1, i - 1)));
null, nestedColumns);
} else if (args.startsWith(KEYWORD_TUPLE, i)) {
int index = args.indexOf('(', i + KEYWORD_TUPLE.length());
if (index < i) {
Expand All @@ -185,7 +212,9 @@ protected static int readColumn(String args, int startIndex, int len, String nam
i = readColumn(args, i, endIndex, "", nestedColumns) - 1;
}
}

if (nestedColumns.isEmpty()) {
throw new IllegalArgumentException("Tuple should have at least one nested column");
}
column = new ClickHouseColumn(ClickHouseDataType.Tuple, name, args.substring(startIndex, endIndex),
nullable, lowCardinality, null, nestedColumns);
}
Expand Down Expand Up @@ -324,15 +353,15 @@ private ClickHouseColumn(ClickHouseDataType dataType, String columnName, String
this.nullable = nullable;
this.lowCardinality = lowCardinality;

if (parameters == null || parameters.size() == 0) {
if (parameters == null || parameters.isEmpty()) {
this.parameters = Collections.emptyList();
} else {
List<String> list = new ArrayList<>(parameters.size());
list.addAll(parameters);
this.parameters = Collections.unmodifiableList(list);
}

if (nestedColumns == null || nestedColumns.size() == 0) {
if (nestedColumns == null || nestedColumns.isEmpty()) {
this.nested = Collections.emptyList();
} else {
List<ClickHouseColumn> list = new ArrayList<>(nestedColumns.size());
Expand All @@ -341,6 +370,34 @@ private ClickHouseColumn(ClickHouseDataType dataType, String columnName, String
}
}

public boolean isAggregateFunction() {
return dataType == ClickHouseDataType.AggregateFunction;
}

public boolean isArray() {
return dataType == ClickHouseDataType.Array;
}

public boolean isMap() {
return dataType == ClickHouseDataType.Map;
}

public boolean isNested() {
return dataType == ClickHouseDataType.Nested;
}

public boolean isTuple() {
return dataType == ClickHouseDataType.Tuple;
}

public int getArrayNestedLevel() {
return arrayLevel;
}

public ClickHouseColumn getArrayBaseColumn() {
return arrayBaseColumn;
}

public ClickHouseDataType getDataType() {
return dataType;
}
Expand Down
Loading

0 comments on commit d09d385

Please sign in to comment.