diff --git a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/StreamBenchmark.java b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/StreamBenchmark.java index 9b4cd185f..2c1fe693d 100644 --- a/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/StreamBenchmark.java +++ b/clickhouse-benchmark/src/main/java/com/clickhouse/benchmark/misc/StreamBenchmark.java @@ -140,7 +140,6 @@ public void wrapped(StreamState state, Blackhole consumer) throws IOException { if ((count = in.pipe(out)) != state.samples) { throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count)); } - out.flush(); } if (!Arrays.equals(state.bytes, bao.toByteArray())) { throw new IllegalStateException("Incorrect result"); @@ -157,7 +156,6 @@ public void async(StreamState state, Blackhole consumer) throws IOException { if ((count = in.pipe(out)) != state.samples) { throw new IllegalStateException(String.format("Expect %d bytes but got %d", size, count)); } - out.flush(); } if (!Arrays.equals(state.bytes, bao.toByteArray())) { throw new IllegalStateException("Incorrect result"); diff --git a/clickhouse-cli-client/README.md b/clickhouse-cli-client/README.md new file mode 100644 index 000000000..649444ac3 --- /dev/null +++ b/clickhouse-cli-client/README.md @@ -0,0 +1,71 @@ +# ClickHouse Command-line Client + +This is a thin wrapper of ClickHouse native command-line client. It provides an alternative way to communicate with ClickHouse, which might be of use when you prefer: + +- TCP/native protocol over HTTP or gRPC +- native CLI client instead of pure Java implementation +- an example of implementing SPI defined in `clickhouse-client` module + +Either [clickhouse-client](https://clickhouse.com/docs/en/interfaces/cli/) or [docker](https://docs.docker.com/get-docker/) must be installed prior to use. And it's important to understand that this module uses sub-process(in addition to threads) and file-based streaming, meaning 1) it's not as fast as native CLI client or pure Java implementation, although it's close in the case of dumping and loading data; and 2) it's not suitable for scenarios like dealing with many queries in short period of time. + +## Limitations and Known Issues + +- Only `max_result_rows` and `result_overflow_mode` two settings are currently supported +- ClickHouseResponseSummary is always empty - see ClickHouse/ClickHouse#37241 +- Session is not supported - see ClickHouse/ClickHouse#37308 + +## Maven Dependency + +```xml + + + com.clickhouse + clickhouse-cli-client + 0.3.2-patch9 + +``` + +## Examples + +```java +// make sure 'clickhouse-client' or 'docker' is in PATH before you start the program +// alternatively, configure CLI path in either Java system property or environment variable, for examples: +// CHC_CLICKHOUSE_CLI_PATH=/path/to/clickhouse-client CHC_DOCKER_CLI_PATH=/path/to/docker java MyProgram +// java -Dchc_clickhouse_cli_path=/path/to/clickhouse-client -Dchc_docker_cli_path=/path/to/docker MyProgram + +// clickhouse-cli-client uses TCP protocol +ClickHouseProtocol preferredProtocol = ClickHouseProtocol.TCP; +// connect to my-server, use default port(9000) of TCP/native protocol +ClickHouseNode server = ClickHouseNode.builder().host("my-server").port(preferredProtocol).build(); + +// declares a file +ClickHouseFile file = ClickHouseFile.of("data.csv"); + +// dump query results into the file - format is CSV, according to file extension +ClickHouseClient.dump(server, "select * from some_table", file).get(); + +// now load it into my_table, using CSV format +ClickHouseClient.load(server, "my_table", file).get(); + +// it can be used in the same way as any other client +try (ClickHouseClient client = ClickHouseClient.newInstance(preferredProtocol); + ClickHouseResponse response = client.connect(server) + .query("select * from numbers(:limit)") + .params(1000).executeAndWait()) { + for (ClickHouseRecord r : response.records()) { + int num = r.getValue(0).asInteger(); + String str = r.getValue(0).asString(); + } +} + +// and of course it's part of JDBC driver +try (Connection conn = DriverManager.getConnect("jdbc:ch:tcp://my-server", "default", ""); + PreparedStatement stmt = conn.preparedStatement("select * from numbers(?)")) { + stmt.setInt(1, 1000); + ResultSet rs = stmt.executeQuery(); + while (rs.next()) { + int num = rs.getInt(1); + String str = rs.getString(1); + } +} +``` diff --git a/clickhouse-cli-client/pom.xml b/clickhouse-cli-client/pom.xml new file mode 100644 index 000000000..497d9ac5d --- /dev/null +++ b/clickhouse-cli-client/pom.xml @@ -0,0 +1,121 @@ + + 4.0.0 + + + com.clickhouse + clickhouse-java + ${revision} + + + clickhouse-cli-client + ${revision} + jar + + ${project.artifactId} + Wrapper of ClickHouse native command-line client + https://github.com/ClickHouse/clickhouse-jdbc/tree/master/clickhouse-cli-client + + + ${project.parent.groupId}.client.internal + + + + + ${project.parent.groupId} + clickhouse-client + ${revision} + + + * + * + + + + + + + org.apache.tomcat + annotations-api + provided + + + + ${project.parent.groupId} + clickhouse-client + ${revision} + test-jar + test + + + org.slf4j + slf4j-simple + test + + + org.testcontainers + testcontainers + test + + + org.testng + testng + test + + + + + + + org.apache.maven.plugins + maven-shade-plugin + + + shade + package + + shade + + + true + true + true + shaded + + + + + + + + *:* + + **/darwin/** + **/linux/** + **/win32/** + **/module-info.class + META-INF/MANIFEST.MF + META-INF/maven/** + META-INF/native-image/** + + + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + org.apache.maven.plugins + maven-failsafe-plugin + + + clickhouse-cli-client + + + + + + \ No newline at end of file diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java new file mode 100644 index 000000000..5f470cf8e --- /dev/null +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLine.java @@ -0,0 +1,393 @@ +package com.clickhouse.client.cli; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseCredentials; +import com.clickhouse.client.ClickHouseFile; +import com.clickhouse.client.ClickHouseInputStream; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseOutputStream; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseUtils; +import com.clickhouse.client.cli.config.ClickHouseCommandLineOption; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.data.ClickHouseExternalTable; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; + +public class ClickHouseCommandLine implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(ClickHouseCommandLine.class); + + public static final String DEFAULT_CLI_ARG_VERSION = "--version"; + public static final String DEFAULT_CLICKHOUSE_CLI_PATH = "clickhouse-client"; + public static final String DEFAULT_DOCKER_CLI_PATH = "docker"; + public static final String DEFAULT_DOCKER_IMAGE = "clickhouse/clickhouse-server"; + + static boolean check(int timeout, String command, String... args) { + if (ClickHouseChecker.isNullOrBlank(command) || args == null) { + throw new IllegalArgumentException("Non-blank command and non-null arguments are required"); + } + + List list = new ArrayList<>(args.length + 1); + list.add(command); + Collections.addAll(list, args); + Process process = null; + try { + process = new ProcessBuilder(list).start(); + process.getOutputStream().close(); + if (process.waitFor(timeout, TimeUnit.MILLISECONDS)) { + int exitValue = process.exitValue(); + if (exitValue != 0) { + log.trace("Command %s exited with value %d", list, exitValue); + } + return exitValue == 0; + } else { + log.trace("Timed out waiting for command %s to complete", list); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.trace("Failed to check command %s due to: %s", list, e.getMessage()); + } finally { + if (process != null && process.isAlive()) { + process.destroyForcibly(); + } + process = null; + } + + return false; + } + + static void dockerCommand(ClickHouseConfig config, String hostDir, String containerDir, int timeout, + List commands) { + String cli = (String) config.getOption(ClickHouseCommandLineOption.DOCKER_CLI_PATH); + if (ClickHouseChecker.isNullOrBlank(cli)) { + cli = DEFAULT_DOCKER_CLI_PATH; + } + if (!check(timeout, cli, DEFAULT_CLI_ARG_VERSION)) { + throw new IllegalStateException("Docker command-line is not available: " + cli); + } else { + commands.add(cli); + } + + String img = (String) config.getOption(ClickHouseCommandLineOption.CLICKHOUSE_DOCKER_IMAGE); + if (ClickHouseChecker.isNullOrBlank(img)) { + img = DEFAULT_DOCKER_IMAGE; + } + String str = (String) config.getOption(ClickHouseCommandLineOption.CLI_CONTAINER_ID); + if (!ClickHouseChecker.isNullOrBlank(str)) { + if (!check(timeout, cli, "exec", str, DEFAULT_CLICKHOUSE_CLI_PATH, DEFAULT_CLI_ARG_VERSION)) { + synchronized (ClickHouseCommandLine.class) { + if (!check(timeout, cli, "exec", str, DEFAULT_CLICKHOUSE_CLI_PATH, DEFAULT_CLI_ARG_VERSION) + && !check(timeout, cli, "run", "--rm", "--name", str, "-v", hostDir + ':' + containerDir, + "-d", img, "tail", "-f", "/dev/null")) { + throw new IllegalStateException("Failed to start new container: " + str); + } + } + } + // reuse the existing container + commands.add("exec"); + commands.add("-i"); + commands.add(str); + } else { // create new container for each query + if (!check(timeout, cli, "run", "--rm", img, DEFAULT_CLICKHOUSE_CLI_PATH, DEFAULT_CLI_ARG_VERSION)) { + throw new IllegalStateException("Invalid ClickHouse docker image: " + img); + } + commands.add("run"); + commands.add("--rm"); + commands.add("-i"); + commands.add("-v"); + commands.add(hostDir + ':' + containerDir); + commands.add(img); + } + + commands.add(DEFAULT_CLICKHOUSE_CLI_PATH); + } + + static Process startProcess(ClickHouseNode server, ClickHouseRequest request) { + final ClickHouseConfig config = request.getConfig(); + final int timeout = config.getSocketTimeout(); + + String hostDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_WORK_DIRECTORY); + hostDir = ClickHouseUtils.normalizeDirectory( + ClickHouseChecker.isNullOrBlank(hostDir) ? System.getProperty("java.io.tmpdir") : hostDir); + String containerDir = (String) config.getOption(ClickHouseCommandLineOption.CLI_CONTAINER_DIRECTORY); + if (ClickHouseChecker.isNullOrBlank(containerDir)) { + containerDir = "/data/"; + } else { + containerDir = ClickHouseUtils.normalizeDirectory(containerDir); + } + + List commands = new LinkedList<>(); + String cli = (String) config.getOption(ClickHouseCommandLineOption.CLICKHOUSE_CLI_PATH); + if (ClickHouseChecker.isNullOrBlank(cli)) { + cli = DEFAULT_CLICKHOUSE_CLI_PATH; + } + if (!check(timeout, cli, DEFAULT_CLI_ARG_VERSION)) { + // fallback to docker + dockerCommand(config, hostDir, containerDir, timeout, commands); + } else { + commands.add(cli); + containerDir = hostDir; + } + + commands.add("--compression=".concat(config.isResponseCompressed() ? "1" : "0")); + commands.add("--host=".concat(server.getHost())); + commands.add("--port=".concat(Integer.toString(server.getPort()))); + + String str = server.getDatabase(config); + if (!ClickHouseChecker.isNullOrBlank(str)) { + commands.add("--database=".concat(str)); + } + if ((boolean) config.getOption(ClickHouseCommandLineOption.USE_CLI_CONFIG)) { + str = (String) config.getOption(ClickHouseCommandLineOption.CLI_CONFIG_FILE); + if (Files.exists(Paths.get(str))) { + commands.add("--config-file=".concat(str)); + } + } else { + ClickHouseCredentials credentials = server.getCredentials(config); + str = credentials.getUserName(); + if (!ClickHouseChecker.isNullOrBlank(str)) { + commands.add("--user=".concat(str)); + } + str = credentials.getPassword(); + if (!ClickHouseChecker.isNullOrBlank(str)) { + commands.add("--password=".concat(str)); + } + } + commands.add("--format=".concat(config.getFormat().name())); + + str = request.getStatements(false).get(0); + commands.add("--query=".concat(str)); + + for (ClickHouseExternalTable table : request.getExternalTables()) { + ClickHouseFile tableFile = table.getFile(); + commands.add("--external"); + String filePath; + if (!tableFile.isAvailable() || !tableFile.getFile().getAbsolutePath().startsWith(hostDir)) { + // creating a hard link is faster but it's not platform-independent + File f = ClickHouseInputStream.save( + Paths.get(hostDir, "chc_".concat(UUID.randomUUID().toString())).toFile(), + table.getContent(), config.getWriteBufferSize(), config.getSocketTimeout(), true); + filePath = containerDir.concat(f.getName()); + } else { + filePath = tableFile.getFile().getAbsolutePath(); + if (!hostDir.equals(containerDir)) { + filePath = Paths.get(containerDir, filePath.substring(hostDir.length())).toFile().getAbsolutePath(); + } + } + commands.add("--file=" + filePath); + if (!ClickHouseChecker.isNullOrEmpty(table.getName())) { + commands.add("--name=".concat(table.getName())); + } + if (table.getFormat() != null) { + commands.add("--format=".concat(table.getFormat().name())); + } + commands.add("--structure=".concat(table.getStructure())); + } + + Map settings = request.getSettings(); + Object value = settings.get("max_result_rows"); + if (value instanceof Number) { + long maxRows = ((Number) value).longValue(); + if (maxRows > 0L) { + commands.add("--limit=".concat(Long.toString(maxRows))); + } + } + value = settings.get("result_overflow_mode"); + if (value != null) { + commands.add("--result_overflow_mode=".concat(value.toString())); + } + if ((boolean) config.getOption(ClickHouseCommandLineOption.USE_PROFILE_EVENTS)) { + commands.add("--print-profile-events"); + commands.add("--profile-events-delay-ms=-1"); + } + + log.debug("Query: %s", str); + ProcessBuilder builder = new ProcessBuilder(commands); + String workDirectory = (String) config.getOption( + ClickHouseCommandLineOption.CLI_WORK_DIRECTORY); + if (!ClickHouseChecker.isNullOrBlank(workDirectory)) { + Path p = Paths.get(workDirectory); + if (Files.isDirectory(p)) { + builder.directory(p.toFile()); + } + } + + if (request.hasOutputStream()) { + final ClickHouseOutputStream chOutput = request.getOutputStream().get(); + final ClickHouseFile outputFile = chOutput.getUnderlyingFile(); + + if (outputFile.isAvailable()) { + File f = outputFile.getFile(); + if (hostDir.equals(containerDir)) { + builder.redirectOutput(f); + } else if (f.getAbsolutePath().startsWith(hostDir)) { + String relativePath = f.getAbsolutePath().substring(hostDir.length()); + builder.redirectOutput(new File(containerDir.concat(relativePath))); + } else { + String fileName = f.getName(); + int len = fileName.length(); + int index = fileName.indexOf('.', 1); + String uuid = UUID.randomUUID().toString(); + if (index > 0 && index + 1 < len) { + fileName = new StringBuilder(len + uuid.length() + 1).append(fileName.substring(0, index)) + .append('_').append(uuid).append(fileName.substring(index)).toString(); + } else { + fileName = new StringBuilder(len + uuid.length() + 1).append(fileName).append('_') + .append(UUID.randomUUID().toString()).toString(); + } + Path newPath = Paths.get(hostDir, fileName); + try { + f = Files.createLink(newPath, f.toPath()).toFile(); + } catch (IOException e) { + throw new UncheckedIOException(e); + } catch (UnsupportedOperationException e) { + try { + f = ClickHouseInputStream.save(newPath.toFile(), new FileInputStream(f), + config.getWriteBufferSize(), timeout, true); + } catch (FileNotFoundException exp) { + throw new UncheckedIOException(exp); + } + } + } + builder.redirectOutput(f); + } + } + final Optional in = request.getInputStream(); + try { + final Process process; + if (in.isPresent()) { + final ClickHouseInputStream chInput = in.get(); + final File inputFile; + if (chInput.getUnderlyingFile().isAvailable()) { + inputFile = chInput.getUnderlyingFile().getFile(); + } else { + CompletableFuture data = ClickHouseClient.submit(() -> { + File tmp = File.createTempFile("tmp", "data"); + tmp.deleteOnExit(); + try (ClickHouseOutputStream out = ClickHouseOutputStream.of(new FileOutputStream(tmp))) { + request.getInputStream().get().pipe(out); + } + return tmp; + }); + inputFile = data.get(timeout, TimeUnit.MILLISECONDS); + } + process = builder.redirectInput(inputFile).start(); + } else { + process = builder.start(); + process.getOutputStream().close(); + } + return process; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new CompletionException(e); + } catch (CancellationException | ExecutionException | TimeoutException e) { + throw new CompletionException(e); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + private final ClickHouseNode server; + private final ClickHouseRequest request; + + private final Process process; + + private String error; + + public ClickHouseCommandLine(ClickHouseNode server, ClickHouseRequest request) { + this.server = server; + this.request = request; + + this.process = startProcess(server, request); + this.error = null; + } + + public ClickHouseInputStream getInputStream() throws IOException { + ClickHouseOutputStream out = request.getOutputStream().orElse(null); + if (out != null && !out.getUnderlyingFile().isAvailable()) { + try (OutputStream o = out) { + ClickHouseInputStream.pipe(process.getInputStream(), o, request.getConfig().getWriteBufferSize()); + } + return ClickHouseInputStream.empty(); + } else { + return ClickHouseInputStream.of(process.getInputStream(), request.getConfig().getReadBufferSize()); + } + } + + IOException getError() { + if (error == null) { + int bufferSize = (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(); + try (ByteArrayOutputStream output = new ByteArrayOutputStream(bufferSize)) { + ClickHouseInputStream.pipe(process.getErrorStream(), output, bufferSize); + error = new String(output.toByteArray(), StandardCharsets.UTF_8); + } catch (IOException e) { + error = ""; + } + try { + if (!process.waitFor(request.getConfig().getSocketTimeout(), TimeUnit.MILLISECONDS)) { + return new IOException("Timed out waiting for command to terminate"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + process.destroyForcibly(); + throw new CompletionException(e); + } + if (process.exitValue() != 0) { + if (error.isEmpty()) { + error = ClickHouseUtils.format("Command exited with value %d", process.exitValue()); + } else { + int index = error.trim().indexOf('\n'); + error = index > 0 ? error.substring(index + 1) : error; + } + } else { + if (!error.isEmpty()) { + // TODO update response summary + log.trace(() -> { + for (String line : error.split("\n")) { + log.trace(line); + } + return ""; + }); + } + error = ""; + } + } + return !ClickHouseChecker.isNullOrBlank(error) ? new IOException(error) : null; + } + + @Override + public void close() { + if (process.isAlive()) { + process.destroyForcibly(); + } + } +} diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineClient.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineClient.java new file mode 100644 index 000000000..a7eb84eb9 --- /dev/null +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineClient.java @@ -0,0 +1,114 @@ +package com.clickhouse.client.cli; + +import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; + +import com.clickhouse.client.AbstractClient; +import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseRequest; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.config.ClickHouseOption; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; +import com.clickhouse.client.cli.config.ClickHouseCommandLineOption; + +/** + * Wrapper of ClickHouse native command-line client. + */ +public class ClickHouseCommandLineClient extends AbstractClient { + private static final Logger log = LoggerFactory.getLogger(ClickHouseCommandLineClient.class); + + @Override + protected ClickHouseCommandLine newConnection(ClickHouseCommandLine conn, ClickHouseNode server, + ClickHouseRequest request) { + if (conn != null) { + closeConnection(conn, false); + } + + return new ClickHouseCommandLine(server, request); + } + + @Override + protected boolean checkConnection(ClickHouseCommandLine connection, ClickHouseNode requestServer, + ClickHouseNode currentServer, ClickHouseRequest request) { + return false; + } + + @Override + protected void closeConnection(ClickHouseCommandLine conn, boolean force) { + try { + conn.close(); + } catch (Exception e) { + log.warn("Failed to close http connection due to: %s", e.getMessage()); + } + } + + @Override + public boolean accept(ClickHouseProtocol protocol) { + ClickHouseConfig config = getConfig(); + int timeout = config != null ? config.getConnectionTimeout() + : (int) ClickHouseClientOption.CONNECTION_TIMEOUT.getEffectiveDefaultValue(); + String cli = config != null ? (String) config.getOption(ClickHouseCommandLineOption.CLICKHOUSE_CLI_PATH) + : (String) ClickHouseCommandLineOption.CLICKHOUSE_CLI_PATH.getEffectiveDefaultValue(); + if (ClickHouseChecker.isNullOrBlank(cli)) { + cli = ClickHouseCommandLine.DEFAULT_CLICKHOUSE_CLI_PATH; + } + String docker = config != null ? (String) config.getOption(ClickHouseCommandLineOption.DOCKER_CLI_PATH) + : (String) ClickHouseCommandLineOption.DOCKER_CLI_PATH.getEffectiveDefaultValue(); + if (ClickHouseChecker.isNullOrBlank(docker)) { + docker = ClickHouseCommandLine.DEFAULT_DOCKER_CLI_PATH; + } + return ClickHouseProtocol.TCP == protocol + && (ClickHouseCommandLine.check(timeout, cli, ClickHouseCommandLine.DEFAULT_CLI_ARG_VERSION) + || ClickHouseCommandLine.check(timeout, docker, ClickHouseCommandLine.DEFAULT_CLI_ARG_VERSION)); + } + + @Override + public CompletableFuture execute(ClickHouseRequest request) { + final ClickHouseRequest sealedRequest = request.seal(); + final ClickHouseConfig config = sealedRequest.getConfig(); + final ClickHouseNode server = getServer(); + + if (config.isAsync()) { + return CompletableFuture + .supplyAsync(() -> { + try { + return new ClickHouseCommandLineResponse(config, getConnection(sealedRequest)); + } catch (IOException e) { + throw new CompletionException(e); + } + }); + } else { + try { + return CompletableFuture + .completedFuture(new ClickHouseCommandLineResponse(config, getConnection(sealedRequest))); + } catch (IOException e) { + throw new CompletionException(ClickHouseException.of(e, server)); + } + } + } + + @Override + public final Class getOptionClass() { + return ClickHouseCommandLineOption.class; + } + + @Override + public boolean ping(ClickHouseNode server, int timeout) { + if (server != null) { + try (ClickHouseCommandLine cli = getConnection(connect(server).query("select 1")); + ClickHouseCommandLineResponse response = new ClickHouseCommandLineResponse(getConfig(), cli)) { + return response.firstRecord().getValue(0).asInteger() == 1; + } catch (Exception e) { + // ignore + } + } + return false; + } +} diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java new file mode 100644 index 000000000..2c28d7c8d --- /dev/null +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/ClickHouseCommandLineResponse.java @@ -0,0 +1,44 @@ +package com.clickhouse.client.cli; + +import java.io.IOException; +import java.io.UncheckedIOException; + +import com.clickhouse.client.ClickHouseConfig; +import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.data.ClickHouseStreamResponse; + +public class ClickHouseCommandLineResponse extends ClickHouseStreamResponse { + private final transient ClickHouseCommandLine cli; + + protected ClickHouseCommandLineResponse(ClickHouseConfig config, ClickHouseCommandLine cli) throws IOException { + super(config, cli.getInputStream(), null, null, ClickHouseResponseSummary.EMPTY); + + if (this.input.available() < 1) { + IOException exp = cli.getError(); + if (exp != null) { + throw exp; + } + } + + this.cli = cli; + } + + @Override + public ClickHouseResponseSummary getSummary() { + return summary; + } + + @Override + public void close() { + try { + if (cli != null) { + IOException exp = cli.getError(); + if (exp != null) { + throw new UncheckedIOException(exp); + } + } + } finally { + super.close(); + } + } +} diff --git a/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java new file mode 100644 index 000000000..bb7636e53 --- /dev/null +++ b/clickhouse-cli-client/src/main/java/com/clickhouse/client/cli/config/ClickHouseCommandLineOption.java @@ -0,0 +1,94 @@ +package com.clickhouse.client.cli.config; + +import java.io.Serializable; + +import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.config.ClickHouseOption; + +public enum ClickHouseCommandLineOption implements ClickHouseOption { + /** + * ClickHouse native command-line client path. Empty value is treated as + * 'clickhouse-client'. + */ + CLICKHOUSE_CLI_PATH("clickhouse_cli_path", "", + "ClickHouse native command-line client path, empty value is treated as 'clickhouse-client'"), + /** + * ClickHouse docker image. Empty value is treated as + * 'clickhouse/clickhouse-server'. + */ + CLICKHOUSE_DOCKER_IMAGE("clickhouse_docker_image", "clickhouse/clickhouse-server", + "ClickHouse docker image, empty value is treated as 'clickhouse/clickhouse-server'"), + /** + * Docker command-line path. Empty value is treated as 'docker'. + */ + DOCKER_CLI_PATH("docker_cli_path", "", "Docker command-line path, empty value is treated as 'docker'"), + /** + * ClickHouse native command-line client configuration file. Empty value will + * disable {@link #USE_CLI_CONFIG}. + */ + CLI_CONFIG_FILE("cli_config_file", "~/.clickhouse-client/config.xml", + "ClickHouse native command-line client configuration file, empty value will disable 'use_cli_config'"), + /** + * Docker container ID or name. Empty value will result in new container being + * created for each query. + */ + CLI_CONTAINER_ID("cli_container_id", "clickhouse-cli-client", + "Docker container ID or name, empty value will result in new container being created for each query"), + /** + * Work directory inside container, only works running in docker mode(when + * {@link #CLICKHOUSE_CLI_PATH} is not available). Empty value is treated as + * '/data'. + */ + CLI_CONTAINER_DIRECTORY("cli_container_directory", "", + "Work directory inside container, empty value is treated as '/data'"), + /** + * Command-line work directory. Empty value is treated as system temporary + * directory(e.g. {@code System.getProperty("java.io.tmpdir")}). When running in + * docker mode, it's mounted as {@link #CLI_CONTAINER_DIRECTORY} in container. + */ + CLI_WORK_DIRECTORY("cli_work_directory", "", + "Command-line work directory, empty value is treate as system temporary directory"), + /** + * Whether to use native command-line client configuration file as defined in + * {@link #CLI_CONFIG_FILE}. + */ + USE_CLI_CONFIG("use_cli_config", false, + "Whether to use native command-line client configuration file as defined in 'cli_config_file'"), + /** + * Whether to use profile events or not. This is needed when you want to get + * meaningful response summary. + */ + USE_PROFILE_EVENTS("use_profile_events", false, "Whether to use profile events or not"); + + private final String key; + private final Serializable defaultValue; + private final Class clazz; + private final String description; + + ClickHouseCommandLineOption(String key, T defaultValue, String description) { + this.key = ClickHouseChecker.nonNull(key, "key"); + this.defaultValue = ClickHouseChecker.nonNull(defaultValue, "defaultValue"); + this.clazz = defaultValue.getClass(); + this.description = ClickHouseChecker.nonNull(description, "description"); + } + + @Override + public Serializable getDefaultValue() { + return defaultValue; + } + + @Override + public String getDescription() { + return description; + } + + @Override + public String getKey() { + return key; + } + + @Override + public Class getValueType() { + return clazz; + } +} diff --git a/clickhouse-cli-client/src/main/java9/module-info.java b/clickhouse-cli-client/src/main/java9/module-info.java new file mode 100644 index 000000000..f8cbe3eb9 --- /dev/null +++ b/clickhouse-cli-client/src/main/java9/module-info.java @@ -0,0 +1,8 @@ +module com.clickhouse.client.cli { + exports com.clickhouse.client.cli; + exports com.clickhouse.client.cli.config; + + provides com.clickhouse.client.ClickHouseClient with com.clickhouse.client.cli.ClickHouseCommandLineClient; + + requires transitive com.clickhouse.client; +} diff --git a/clickhouse-cli-client/src/main/resources/META-INF/services/com.clickhouse.client.ClickHouseClient b/clickhouse-cli-client/src/main/resources/META-INF/services/com.clickhouse.client.ClickHouseClient new file mode 100644 index 000000000..97d6b2209 --- /dev/null +++ b/clickhouse-cli-client/src/main/resources/META-INF/services/com.clickhouse.client.ClickHouseClient @@ -0,0 +1 @@ +com.clickhouse.client.cli.ClickHouseCommandLineClient diff --git a/clickhouse-cli-client/src/test/java/com/clickhouse/client/cli/ClickHouseCommandLineClientTest.java b/clickhouse-cli-client/src/test/java/com/clickhouse/client/cli/ClickHouseCommandLineClientTest.java new file mode 100644 index 000000000..ad9876088 --- /dev/null +++ b/clickhouse-cli-client/src/test/java/com/clickhouse/client/cli/ClickHouseCommandLineClientTest.java @@ -0,0 +1,67 @@ +package com.clickhouse.client.cli; + +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseClientBuilder; +import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseServerForTest; +import com.clickhouse.client.ClientIntegrationTest; +import com.clickhouse.client.cli.config.ClickHouseCommandLineOption; + +import org.testcontainers.containers.GenericContainer; +import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class ClickHouseCommandLineClientTest extends ClientIntegrationTest { + @BeforeClass + static void init() { + System.setProperty(ClickHouseCommandLineOption.CLI_CONTAINER_DIRECTORY.getSystemProperty(), + ClickHouseServerForTest.getClickHouseContainerTmpDir()); + } + + @Override + protected ClickHouseProtocol getProtocol() { + return ClickHouseProtocol.TCP; + } + + @Override + protected Class getClientClass() { + return ClickHouseCommandLineClient.class; + } + + @Override + protected ClickHouseClientBuilder initClient(ClickHouseClientBuilder builder) { + return super.initClient(builder).option(ClickHouseCommandLineOption.CLI_CONTAINER_DIRECTORY, + ClickHouseServerForTest.getClickHouseContainerTmpDir()); + } + + @Override + protected ClickHouseNode getServer() { + GenericContainer container = ClickHouseServerForTest.getClickHouseContainer(); + if (container != null) { + return ClickHouseNode.of("localhost", getProtocol(), getProtocol().getDefaultPort(), null); + } + + return super.getServer(); + } + + @Test(groups = { "integration" }) + @Override + public void testLoadRawData() throws Exception { + throw new SkipException("Skip due to response summary is always empty"); + } + + @Test(groups = { "integration" }) + @Override + public void testReadWriteGeoTypes() { + throw new SkipException("Skip due to session is not supported"); + } + + @Test(groups = { "integration" }) + @Override + public void testTempTable() { + throw new SkipException("Skip due to session is not supported"); + } +} diff --git a/clickhouse-cli-client/src/test/resources/simplelogger.properties b/clickhouse-cli-client/src/test/resources/simplelogger.properties new file mode 100644 index 000000000..7589a0e57 --- /dev/null +++ b/clickhouse-cli-client/src/test/resources/simplelogger.properties @@ -0,0 +1,7 @@ +org.slf4j.simpleLogger.defaultLogLevel=info +org.slf4j.simpleLogger.log.com.clickhouse.client=debug +org.slf4j.simpleLogger.showDateTime=true +org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS Z +org.slf4j.simpleLogger.showThreadName=true +org.slf4j.simpleLogger.showLogName=true +org.slf4j.simpleLogger.showShortLogName=true diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java index 51077b20f..472114f98 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/AbstractClient.java @@ -78,9 +78,9 @@ protected final ClickHouseNode getServer() { /** * Checks if the underlying connection can be reused. In general, new connection - * will be created when {@code connection} is null or {@code requestServer} is - * different from {@code currentServer} - the existing connection will be closed - * in the later case. + * will be created when {@code connection} is {@code null} or + * {@code requestServer} is different from {@code currentServer} - the existing + * connection will be closed in the later case. * * @param connection existing connection which may or may not be null * @param requestServer non-null requested server, returned from previous call diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseChecker.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseChecker.java index 945d0592a..e3aa95819 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseChecker.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseChecker.java @@ -189,7 +189,7 @@ public static BigDecimal between(BigDecimal value, String name, BigDecimal minVa } /** - * Checks if the given string is null or empty. + * Checks if the given string is {@code null} or empty. * * @param value the string to check * @return true if the string is null or empty; false otherwise diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 9f00d4503..e018c6310 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -1,7 +1,5 @@ package com.clickhouse.client; -import java.io.FileNotFoundException; -import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.io.UncheckedIOException; @@ -252,6 +250,46 @@ static CompletableFuture submit(Callable task) { } } + /** + * Dumps a table or query result from server into a file. File will be + * created/overwrited as needed. + * + * @param server non-null server to connect to + * @param tableOrQuery table name or a select query + * @param file output file + * @return non-null future object to get result + * @throws IllegalArgumentException if any of server, tableOrQuery, and output + * is null + * @throws CompletionException when error occurred during execution + */ + static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, + ClickHouseFile file) { + if (server == null || tableOrQuery == null || file == null) { + throw new IllegalArgumentException("Non-null server, tableOrQuery, and file are required"); + } + + // in case the protocol is ANY + final ClickHouseNode theServer = ClickHouseCluster.probe(server); + + final String theQuery = tableOrQuery.trim(); + + return submit(() -> { + try (ClickHouseClient client = newInstance(theServer.getProtocol())) { + ClickHouseRequest request = client.connect(theServer).output(file); + // FIXME what if the table name is `try me`? + if (theQuery.indexOf(' ') < 0) { + request.table(theQuery); + } else { + request.query(theQuery); + } + + try (ClickHouseResponse response = request.executeAndWait()) { + return response.getSummary(); + } + } + }); + } + /** * Dumps a table or query result from server into a file. File will be * created/overwrited as needed. @@ -265,12 +303,10 @@ static CompletableFuture submit(Callable task) { * @throws IllegalArgumentException if any of server, tableOrQuery, and output * is null * @throws CompletionException when error occurred during execution - * @throws IOException when failed to create the file or its parent - * directories */ static CompletableFuture dump(ClickHouseNode server, String tableOrQuery, - ClickHouseFormat format, ClickHouseCompression compression, String file) throws IOException { - return dump(server, tableOrQuery, format, compression, ClickHouseUtils.getFileOutputStream(file)); + ClickHouseFormat format, ClickHouseCompression compression, String file) { + return dump(server, tableOrQuery, ClickHouseFile.of(file, compression, 0, format)); } /** @@ -302,8 +338,8 @@ static CompletableFuture dump(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol())) { - ClickHouseRequest request = client.connect(theServer).compressServerResponse( - compression != null && compression != ClickHouseCompression.NONE, compression).format(format); + ClickHouseRequest request = client.connect(theServer).compressServerResponse(compression) + .format(format).output(output); // FIXME what if the table name is `try me`? if (theQuery.indexOf(' ') < 0) { request.table(theQuery); @@ -312,7 +348,6 @@ static CompletableFuture dump(ClickHouseNode server, } try (ClickHouseResponse response = request.executeAndWait()) { - response.pipe(output, request.getConfig().getWriteBufferSize()); return response.getSummary(); } } finally { @@ -325,6 +360,33 @@ static CompletableFuture dump(ClickHouseNode server, }); } + /** + * Loads data from given file into a table. + * + * @param server non-null server to connect to + * @param table non-null target table + * @param file non-null file + * @return future object to get result + * @throws IllegalArgumentException if any of server, table, and input is null + * @throws CompletionException when error occurred during execution + */ + static CompletableFuture load(ClickHouseNode server, String table, ClickHouseFile file) { + if (server == null || table == null || file == null) { + throw new IllegalArgumentException("Non-null server, table, and file are required"); + } + + // in case the protocol is ANY + final ClickHouseNode theServer = ClickHouseCluster.probe(server); + + return submit(() -> { + try (ClickHouseClient client = newInstance(theServer.getProtocol()); + ClickHouseResponse response = client.connect(theServer).write().table(table).data(file) + .executeAndWait()) { + return response.getSummary(); + } + }); + } + /** * Loads data from a file into table using specified format and compression * algorithm. @@ -337,11 +399,10 @@ static CompletableFuture dump(ClickHouseNode server, * @return future object to get result * @throws IllegalArgumentException if any of server, table, and input is null * @throws CompletionException when error occurred during execution - * @throws FileNotFoundException when file not found */ static CompletableFuture load(ClickHouseNode server, String table, - ClickHouseFormat format, ClickHouseCompression compression, String file) throws FileNotFoundException { - return load(server, table, format, compression, ClickHouseUtils.getFileInputStream(file)); + ClickHouseFormat format, ClickHouseCompression compression, String file) { + return load(server, table, ClickHouseFile.of(file, compression, 0, format)); } /** @@ -376,9 +437,8 @@ static CompletableFuture load(ClickHouseNode server, .createPipedOutputStream(client.getConfig(), null); // execute query in a separate thread(because async is explicitly set to true) CompletableFuture future = client.connect(theServer).write().table(table) - .decompressClientRequest(compression != null && compression != ClickHouseCompression.NONE, - compression) - .format(format).data(input = stream.getInputStream()).execute(); + .decompressClientRequest(compression).format(format).data(input = stream.getInputStream()) + .execute(); try { // write data into stream in current thread writer.write(stream); @@ -434,9 +494,7 @@ static CompletableFuture load(ClickHouseNode server, return submit(() -> { try (ClickHouseClient client = newInstance(theServer.getProtocol()); ClickHouseResponse response = client.connect(theServer).write().table(table) - .decompressClientRequest(compression != null && compression != ClickHouseCompression.NONE, - compression) - .format(format).data(input).executeAndWait()) { + .decompressClientRequest(compression).format(format).data(input).executeAndWait()) { return response.getSummary(); } finally { try { diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java index ad2a6f61d..64f1e6752 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClientBuilder.java @@ -90,7 +90,10 @@ public ClickHouseClient build() { boolean noSelector = nodeSelector == null || nodeSelector == ClickHouseNodeSelector.EMPTY; int counter = 0; + ClickHouseConfig conf = getConfig(); for (ClickHouseClient c : ServiceLoader.load(ClickHouseClient.class, getClass().getClassLoader())) { + c.init(conf); + counter++; if (noSelector || nodeSelector.match(c)) { client = c; @@ -101,8 +104,6 @@ public ClickHouseClient build() { if (client == null) { throw new IllegalStateException( ClickHouseUtils.format("No suitable ClickHouse client(out of %d) found in classpath.", counter)); - } else { - client.init(getConfig()); } return client; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java new file mode 100644 index 000000000..8af775f00 --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java @@ -0,0 +1,159 @@ +package com.clickhouse.client; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.nio.file.Path; + +import com.clickhouse.client.config.ClickHouseClientOption; + +/** + * Wrapper of {@link java.io.File} with additional information like compression + * and format. + */ +public class ClickHouseFile { + /** + * Null file which has no compression and format. + */ + public static final ClickHouseFile NULL = new ClickHouseFile(null, ClickHouseCompression.NONE, 0, null); + + public static ClickHouseFile of(File file) { + return of(file, null, 0, null); + } + + public static ClickHouseFile of(Path path) { + return of(ClickHouseChecker.nonNull(path, "Path").toFile(), null, 0, null); + } + + public static ClickHouseFile of(String file) { + return of(new File(ClickHouseChecker.nonEmpty(file, "File")), null, 0, null); + } + + public static ClickHouseFile of(String file, ClickHouseCompression compression, int compressionLevel, + ClickHouseFormat format) { + return of(new File(ClickHouseChecker.nonEmpty(file, "File")), compression, compressionLevel, format); + } + + public static ClickHouseFile of(File file, ClickHouseCompression compression, int compressionLevel, + ClickHouseFormat format) { + return new ClickHouseFile(ClickHouseChecker.nonNull(file, "File"), + compression != null ? compression : ClickHouseCompression.fromFileName(file.getName()), + compressionLevel < 1 ? 0 : compressionLevel, + format != null ? format : ClickHouseFormat.fromFileName(file.getName())); + } + + private final File file; + private final ClickHouseCompression compress; + private final int compressLevel; + private final ClickHouseFormat format; + + protected ClickHouseFile(File file, ClickHouseCompression compress, int compressLevel, ClickHouseFormat format) { + this.file = file; + this.compress = compress; + this.compressLevel = compressLevel; + this.format = format; + } + + /** + * Creates an input stream for reading the file. + * + * @return non-null input stream for reading the file + */ + public ClickHouseInputStream asInputStream() { + if (!isAvailable()) { + return ClickHouseInputStream.empty(); + } + + try { + return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()), + (int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null, + getCompressionAlgorithm(), getCompressionLevel()); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Creates an output stream for writing data into the file. + * + * @return non-null input stream for writing data into the file + */ + public ClickHouseOutputStream asOutputStream() { + if (!isAvailable()) { + return ClickHouseOutputStream.empty(); + } + + try { + return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()), + (int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null, + getCompressionAlgorithm(), getCompressionLevel()); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Gets file, which only works when {@link #isAvailable()} returns {@code true}. + * + * @return non-null file, except {@code null} for {@link #NULL} + */ + public File getFile() { + return file; + } + + /** + * Gets file format, which could be null. Use {@link #hasFormat()} to check + * first. + * + * @return file format, could be null + */ + public ClickHouseFormat getFormat() { + return format; + } + + /** + * Gets compression algorithm. + * + * @return non-null compression algorithm + */ + public ClickHouseCompression getCompressionAlgorithm() { + return compress; + } + + /** + * Gets compression level. + * + * @return compression level, which is always greater than or equal to zero + */ + public int getCompressionLevel() { + return compressLevel; + } + + /** + * Checks if the file format is defined or not. + * + * @return true if the file format is defined; false otherwise + */ + public boolean hasFormat() { + return format != null; + } + + /** + * Checks if the file is available or not. + * + * @return true if the file is available; false otherwise + */ + public boolean isAvailable() { + return file != null && file.exists(); + } + + /** + * Checks if the file is compressed or not. + * + * @return true if the file is compressed; false otherwise + */ + public boolean isCompressed() { + return compress != ClickHouseCompression.NONE; + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFormat.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFormat.java index 4a8eca651..c97abb9be 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFormat.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFormat.java @@ -68,6 +68,61 @@ public enum ClickHouseFormat { Vertical(false, true, false, false, false), // https://clickhouse.com/docs/en/interfaces/formats/#vertical XML(false, true, false, false, false); // https://clickhouse.com/docs/en/interfaces/formats/#xml + /** + * Gets format based on given file name. + * + * @param file file name + * @return format, could be null + */ + public static ClickHouseFormat fromFileName(String file) { + ClickHouseCompression compression = ClickHouseCompression.fromFileName(file); + if (compression != ClickHouseCompression.NONE) { + file = file.substring(0, file.lastIndexOf('.')); + } + ClickHouseFormat format = null; + + int index = 0; + if (file != null && (index = file.lastIndexOf('.')) > 0) { + String ext = file.substring(index + 1).toLowerCase(); + switch (ext) { + case "arrow": + format = Arrow; + break; + case "avro": + format = Avro; + break; + case "capnp": + format = CapnProto; + break; + case "csv": + format = CSV; + break; + case "json": + format = JSONEachRow; + break; + case "msgpack": + format = MsgPack; + break; + case "orc": + format = ORC; + break; + case "parquet": + format = Parquet; + break; + case "tsv": + format = TSV; + break; + case "xml": + format = XML; + break; + default: + break; + } + } + + return format; + } + private final boolean input; private final boolean output; private final boolean binary; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java index ca7c9a23e..bf3656248 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java @@ -3,15 +3,22 @@ import java.io.EOFException; import java.io.File; import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.io.UncheckedIOException; import java.net.URL; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.zip.GZIPInputStream; @@ -56,6 +63,42 @@ public abstract class ClickHouseInputStream extends InputStream { protected static final String ERROR_REUSE_BUFFER = "Please pass a different byte array instead of the same internal buffer for reading"; protected static final String ERROR_STREAM_CLOSED = "Input stream has been closed"; + /** + * Wraps the given input stream. + * + * @param file wrapped file, could be null + * @param input non-null input stream + * @param bufferSize buffer size + * @param postCloseAction custom action will be performed right after closing + * the wrapped input stream + * @param compression compression algorithm + * @param compressionLevel compression level + * @return non-null wrapped input stream + */ + static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input, int bufferSize, Runnable postCloseAction, + ClickHouseCompression compression, int compressionLevel) { + final ClickHouseInputStream chInput; + if (compression == null || compression == ClickHouseCompression.NONE) { + chInput = new WrappedInputStream(file, input, bufferSize, postCloseAction); + } else { + switch (compression) { + case GZIP: + try { + chInput = new WrappedInputStream(file, new GZIPInputStream(input), bufferSize, postCloseAction); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to wrap input stream", e); + } + break; + case LZ4: + chInput = new Lz4InputStream(file, input, postCloseAction); + break; + default: + throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); + } + } + return chInput; + } + /** * Gets an empty input stream that produces nothing and cannot be closed. * @@ -101,7 +144,29 @@ public static ClickHouseInputStream of(BlockingQueue queue, int time */ public static ClickHouseInputStream of(ClickHouseDeferredValue deferredInput, int bufferSize, Runnable postCloseAction) { - return new WrappedInputStream(new DeferredInputStream(deferredInput), bufferSize, postCloseAction); + return new WrappedInputStream(null, new DeferredInputStream(deferredInput), bufferSize, postCloseAction); + } + + /** + * Wraps the given file as input stream. + * + * @param file non-null file + * @param bufferSize buffer size which is always greater than zero(usually + * 8192 or larger) + * @param postCloseAction custom action will be performed right after closing + * the input stream + * @return wrapped input + */ + public static ClickHouseInputStream of(ClickHouseFile file, int bufferSize, Runnable postCloseAction) { + if (file == null || !file.isAvailable()) { + throw new IllegalArgumentException("Non-null file required"); + } + try { + return wrap(file, new FileInputStream(file.getFile()), bufferSize, postCloseAction, + file.getCompressionAlgorithm(), file.getCompressionLevel()); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } } /** @@ -165,30 +230,10 @@ public static ClickHouseInputStream of(InputStream input, int bufferSize, ClickH Runnable postCloseAction) { if (input == null) { return EmptyInputStream.INSTANCE; + } else if (input != EmptyInputStream.INSTANCE && input instanceof ClickHouseInputStream) { + return (ClickHouseInputStream) input; } - - ClickHouseInputStream chInput; - if (compression != null && compression != ClickHouseCompression.NONE) { - switch (compression) { - case GZIP: - try { - chInput = new WrappedInputStream(new GZIPInputStream(input), bufferSize, postCloseAction); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to wrap input stream", e); - } - break; - case LZ4: - chInput = new Lz4InputStream(input, postCloseAction); - break; - default: - throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); - } - } else { - chInput = input instanceof ClickHouseInputStream ? (ClickHouseInputStream) input - : new WrappedInputStream(input, bufferSize, postCloseAction); - } - - return chInput; + return wrap(null, input, bufferSize, postCloseAction, compression, 0); } /** @@ -424,10 +469,60 @@ public static long pipe(InputStream input, OutputStream output, byte[] buffer) t return count; } + public static File save(InputStream in, int bufferSize, int timeout) { + return save(null, in, bufferSize, timeout, true); + } + + public static File save(File file, InputStream in, int bufferSize, int timeout, boolean deleteOnExit) { + final File tmp; + if (file != null) { + tmp = file; + if (deleteOnExit) { + tmp.deleteOnExit(); + } + } else { + try { + tmp = File.createTempFile("chc", "data"); + tmp.deleteOnExit(); + } catch (IOException e) { + throw new IllegalStateException("Failed to create temp file", e); + } + } + CompletableFuture data = CompletableFuture.supplyAsync(() -> { + try { + try (OutputStream out = new FileOutputStream(tmp)) { + pipe(in, out, bufferSize); + } + return tmp; + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + + try { + return data.get(timeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IllegalStateException(e); + } catch (TimeoutException e) { + throw new IllegalStateException(e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof UncheckedIOException) { + cause = ((UncheckedIOException) cause).getCause(); + } + throw new IllegalStateException(cause); + } + } + /** * Non-null reusable byte buffer. */ protected final ClickHouseByteBuffer byteBuffer; + /** + * Underlying file. + */ + protected final ClickHouseFile file; /** * Optional post close action. */ @@ -436,8 +531,9 @@ public static long pipe(InputStream input, OutputStream output, byte[] buffer) t protected boolean closed; protected OutputStream copyTo; - protected ClickHouseInputStream(OutputStream copyTo, Runnable postCloseAction) { + protected ClickHouseInputStream(ClickHouseFile file, OutputStream copyTo, Runnable postCloseAction) { this.byteBuffer = ClickHouseByteBuffer.newInstance(); + this.file = file != null ? file : ClickHouseFile.NULL; this.postCloseAction = postCloseAction; this.closed = false; @@ -467,6 +563,15 @@ protected void ensureOpen() throws IOException { } } + /** + * Gets underlying file. + * + * @return non-null underlying file + */ + public ClickHouseFile getUnderlyingFile() { + return file; + } + /** * Peeks one byte. It's similar as {@link #read()} except it never changes * cursor. diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java index 98208a65c..33bd4c2ac 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java @@ -1,5 +1,7 @@ package com.clickhouse.client; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.ByteBuffer; @@ -8,6 +10,7 @@ import java.util.zip.GZIPOutputStream; import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.stream.EmptyOutputStream; import com.clickhouse.client.stream.Lz4OutputStream; import com.clickhouse.client.stream.WrappedOutputStream; @@ -24,7 +27,76 @@ public abstract class ClickHouseOutputStream extends OutputStream { /** * Wraps the given output stream. * - * @param output non-null output stream + * @param file wrapped file, could be null + * @param output non-null output stream + * @param bufferSize buffer size + * @param postCloseAction custom action will be performed right after closing + * the wrapped output stream + * @param compression compression algorithm + * @param compressionLevel compression level + * @return non-null wrapped output stream + */ + static ClickHouseOutputStream wrap(ClickHouseFile file, OutputStream output, int bufferSize, + Runnable postCloseAction, ClickHouseCompression compression, int compressionLevel) { + final ClickHouseOutputStream chOutput; + if (compression == null || compression == ClickHouseCompression.NONE) { + chOutput = new WrappedOutputStream(file, output, bufferSize, postCloseAction); + } else { + switch (compression) { + case GZIP: + try { + chOutput = new WrappedOutputStream(file, new GZIPOutputStream(output), bufferSize, + postCloseAction); + } catch (IOException e) { + throw new IllegalArgumentException("Failed to wrap input stream", e); + } + break; + case LZ4: + chOutput = new Lz4OutputStream(file, output, bufferSize, postCloseAction); + break; + default: + throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); + } + } + return chOutput; + } + + /** + * Gets an empty output stream that consumes nothing and cannot be closed. + * + * @return empty output stream + */ + public static ClickHouseOutputStream empty() { + return EmptyOutputStream.INSTANCE; + } + + /** + * Wraps the given file as output stream. + * + * @param file non-null file + * @param postCloseAction custom action will be performed right after closing + * the output stream + * @param bufferSize buffer size which is always greater than zero(usually + * 8192 + * or larger) + * @return wrapped output + */ + public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Runnable postCloseAction) { + if (file == null || file == ClickHouseFile.NULL) { + throw new IllegalArgumentException("Non-null file required"); + } + try { + return wrap(file, new FileOutputStream(file.getFile()), bufferSize, postCloseAction, + file.getCompressionAlgorithm(), file.getCompressionLevel()); + } catch (FileNotFoundException e) { + throw new IllegalArgumentException(e); + } + } + + /** + * Wraps the given output stream. + * + * @param output output stream * @return wrapped output, or the same output if it's instance of * {@link ClickHouseOutputStream} */ @@ -35,7 +107,7 @@ public static ClickHouseOutputStream of(OutputStream output) { /** * Wraps the given output stream. * - * @param output non-null output stream + * @param output output stream * @param bufferSize buffer size which is always greater than zero(usually 8192 * or larger) * @return wrapped output, or the same output if it's instance of @@ -48,7 +120,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) { /** * Wraps the given output stream. * - * @param output non-null output stream + * @param output output stream * @param bufferSize buffer size which is always greater than zero(usually * 8192 or larger) * @param compression compression algorithm, null or @@ -61,35 +133,26 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) { */ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, ClickHouseCompression compression, Runnable postCloseAction) { - ClickHouseOutputStream chOutput; - if (compression != null && compression != ClickHouseCompression.NONE) { - switch (compression) { - case GZIP: - try { - chOutput = new WrappedOutputStream(new GZIPOutputStream(output), bufferSize, postCloseAction); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to wrap input stream", e); - } - break; - case LZ4: - chOutput = new Lz4OutputStream(output, bufferSize, postCloseAction); - break; - default: - throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); - } + final ClickHouseOutputStream chOutput; + if (output == null) { + chOutput = EmptyOutputStream.INSTANCE; + } else if (compression == null || compression == ClickHouseCompression.NONE) { + chOutput = output != EmptyOutputStream.INSTANCE && output instanceof ClickHouseOutputStream + ? (ClickHouseOutputStream) output + : new WrappedOutputStream(null, output, bufferSize, postCloseAction); } else { - chOutput = output instanceof ClickHouseOutputStream ? (ClickHouseOutputStream) output - : new WrappedOutputStream(output, bufferSize, postCloseAction); + chOutput = wrap(null, output, bufferSize, postCloseAction, compression, 0); } - return chOutput; } + protected final ClickHouseFile file; protected final Runnable postCloseAction; protected boolean closed; - protected ClickHouseOutputStream(Runnable postCloseAction) { + protected ClickHouseOutputStream(ClickHouseFile file, Runnable postCloseAction) { + this.file = file != null ? file : ClickHouseFile.NULL; this.postCloseAction = postCloseAction; this.closed = false; } @@ -100,6 +163,15 @@ protected void ensureOpen() throws IOException { } } + /** + * Gets underlying file. + * + * @return non-null underlying file + */ + public ClickHouseFile getUnderlyingFile() { + return file; + } + /** * Transfers bytes into output stream without creating a copy. * diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHousePipedOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHousePipedOutputStream.java index e3ba22362..dabc1897e 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHousePipedOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHousePipedOutputStream.java @@ -5,7 +5,7 @@ */ public abstract class ClickHousePipedOutputStream extends ClickHouseOutputStream { protected ClickHousePipedOutputStream(Runnable postCloseAction) { - super(postCloseAction); + super(null, postCloseAction); } /** diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index b00c7d618..389aac6aa 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -1,8 +1,7 @@ package com.clickhouse.client; -import java.io.FileInputStream; -import java.io.FileNotFoundException; import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; @@ -106,6 +105,26 @@ public Mutation format(ClickHouseFormat format) { return super.format(format); } + /** + * Loads data from given file which may or may not be compressed. + * + * @param file absolute or relative path of the file, file extension will be + * used to determine if it's compressed or not + * @return mutation request + */ + public Mutation data(ClickHouseFile file) { + checkSealed(); + + final ClickHouseRequest self = this; + if (ClickHouseChecker.nonNull(file, "File").hasFormat()) { + format(file.getFormat()); + } + decompressClientRequest(file.isCompressed(), file.getCompressionAlgorithm()); + this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue + .of(() -> ClickHouseInputStream.of(file, self.getConfig().getReadBufferSize(), null))); + return this; + } + /** * Loads data from given file which may or may not be compressed. * @@ -130,15 +149,12 @@ public Mutation data(String file, ClickHouseCompression compression) { checkSealed(); final ClickHouseRequest self = this; - final String fileName = ClickHouseChecker.nonEmpty(file, "File"); - this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue.of(() -> { - try { - return ClickHouseInputStream.of(new FileInputStream(fileName), - self.getConfig().getReadBufferSize(), compression); - } catch (FileNotFoundException e) { - throw new IllegalArgumentException(e); - } - })); + final ClickHouseFile wrappedFile = ClickHouseFile.of(file, compression, 0, null); + if (wrappedFile.hasFormat()) { + format(wrappedFile.getFormat()); + } + this.input = changeProperty(PROP_DATA, this.input, ClickHouseDeferredValue + .of(() -> ClickHouseInputStream.of(wrappedFile, self.getConfig().getReadBufferSize(), null))); return this; } @@ -236,6 +252,7 @@ public Mutation seal() { private static final long serialVersionUID = 4990313525960702287L; static final String PROP_DATA = "data"; + static final String PROP_OUTPUT = "output"; static final String PROP_PREPARED_QUERY = "preparedQuery"; static final String PROP_QUERY = "query"; static final String PROP_QUERY_ID = "queryId"; @@ -253,6 +270,7 @@ public Mutation seal() { protected final Map namedParameters; protected transient ClickHouseDeferredValue input; + protected transient ClickHouseDeferredValue output; protected String queryId; protected String sql; protected ClickHouseParameterizedQuery preparedQuery; @@ -334,6 +352,7 @@ public ClickHouseRequest copy() { req.settings.putAll(settings); req.namedParameters.putAll(namedParameters); req.input = input; + req.output = output; req.queryId = queryId; req.sql = sql; req.preparedQuery = preparedQuery; @@ -358,6 +377,17 @@ public boolean hasInputStream() { return this.input != null || !this.externalTables.isEmpty(); } + /** + * Checks if the response should be redirected to an output stream, which was + * defined by one of {@code output(*)} methods. + * + * @return true if response should be redirected to an output stream; false + * otherwise + */ + public boolean hasOutputStream() { + return this.output != null; + } + /** * Depending on the {@link java.util.function.Function} passed to the * constructor, this method may return different node for each call. @@ -410,6 +440,15 @@ public Optional getInputStream() { return input != null ? input.getOptional() : Optional.empty(); } + /** + * Gets output stream. + * + * @return output stream + */ + public Optional getOutputStream() { + return output != null ? output.getOptional() : Optional.empty(); + } + /** * Gets immutable list of external tables. * @@ -543,6 +582,21 @@ public SelfT compressServerResponse(boolean enable) { (int) ClickHouseClientOption.COMPRESS_LEVEL.getEffectiveDefaultValue()); } + /** + * Enable or disable compression of server response. Pay attention that + * {@link ClickHouseClientOption#COMPRESS_ALGORITHM} and + * {@link ClickHouseClientOption#COMPRESS_LEVEL} will be used. + * + * @param compressAlgorithm compression algorihtm, null or + * {@link ClickHouseCompression#NONE} means no + * compression + * @return the request itself + */ + public SelfT compressServerResponse(ClickHouseCompression compressAlgorithm) { + return compressServerResponse(compressAlgorithm != null && compressAlgorithm != ClickHouseCompression.NONE, + compressAlgorithm, (int) ClickHouseClientOption.COMPRESS_LEVEL.getEffectiveDefaultValue()); + } + /** * Enable or disable compression of server response. Pay attention that * {@link ClickHouseClientOption#COMPRESS_LEVEL} will be used. @@ -608,6 +662,21 @@ public SelfT decompressClientRequest(boolean enable) { (int) ClickHouseClientOption.DECOMPRESS_LEVEL.getEffectiveDefaultValue()); } + /** + * Enable or disable compression of client request. Pay attention that + * {@link ClickHouseClientOption#DECOMPRESS_LEVEL} will be used. + * + * @param compressAlgorithm compression algorithm, null is treated as + * {@link ClickHouseCompression#NONE} or + * {@link ClickHouseClientOption#DECOMPRESS_ALGORITHM} + * depending on whether enabled + * @return the request itself + */ + public SelfT decompressClientRequest(ClickHouseCompression compressAlgorithm) { + return decompressClientRequest(compressAlgorithm != null && compressAlgorithm != ClickHouseCompression.NONE, + compressAlgorithm, (int) ClickHouseClientOption.DECOMPRESS_LEVEL.getEffectiveDefaultValue()); + } + /** * Enable or disable compression of client request. Pay attention that * {@link ClickHouseClientOption#DECOMPRESS_LEVEL} will be used. @@ -828,6 +897,100 @@ public SelfT options(Properties options) { return (SelfT) this; } + /** + * Sets output file, to which response will be redirected. + * + * @param file non-null output file + * @return the request itself + */ + @SuppressWarnings("unchecked") + public SelfT output(ClickHouseFile file) { + checkSealed(); + + final int bufferSize = getConfig().getWriteBufferSize(); + if (ClickHouseChecker.nonNull(file, "File").hasFormat()) { + format(file.getFormat()); + } + compressServerResponse(file.isCompressed(), file.getCompressionAlgorithm()); + this.output = changeProperty(PROP_OUTPUT, this.output, ClickHouseDeferredValue + .of(() -> ClickHouseOutputStream.of(file, bufferSize, null))); + + return (SelfT) this; + } + + /** + * Sets output file, to which response will be redirected. + * + * @param file non-empty path to the file + * @return the request itself + */ + public SelfT output(String file) { + return output(file, ClickHouseCompression.fromFileName(file)); + } + + /** + * Sets compressed output file, to which response will be redirected. + * + * @param file non-empty path to the file + * @param compression compression algorithm, {@code null} or + * {@link ClickHouseCompression#NONE} means no compression + * @return the request itself + */ + @SuppressWarnings("unchecked") + public SelfT output(String file, ClickHouseCompression compression) { + checkSealed(); + + final int bufferSize = getConfig().getWriteBufferSize(); + final ClickHouseFile wrappedFile = ClickHouseFile.of(file, compression, 0, null); + if (wrappedFile.hasFormat()) { + format(wrappedFile.getFormat()); + } + this.output = changeProperty(PROP_OUTPUT, this.output, ClickHouseDeferredValue + .of(() -> ClickHouseOutputStream.of(wrappedFile, bufferSize, null))); + return (SelfT) this; + } + + /** + * Sets output stream, to which response will be redirected. + * + * @param output non-null output stream + * @return the request itself + */ + public SelfT output(OutputStream output) { + return output(ClickHouseOutputStream.of(output)); + } + + /** + * Sets output stream, to which response will be redirected. + * + * @param output non-null output stream + * @return the request itself + */ + @SuppressWarnings("unchecked") + public SelfT output(ClickHouseOutputStream output) { + checkSealed(); + + this.output = changeProperty(PROP_DATA, this.output, + ClickHouseDeferredValue.of(output, ClickHouseOutputStream.class)); + + return (SelfT) this; + } + + /** + * Sets output stream, to which response will be redirected. + * + * @param output non-null output stream + * @return the request itself + */ + @SuppressWarnings("unchecked") + public SelfT output(ClickHouseDeferredValue output) { + checkSealed(); + + this.output = changeProperty(PROP_OUTPUT, this.output, output); + + return (SelfT) this; + } + /** * Sets stringified parameters. Be aware of SQL injection risk as mentioned in * {@link #params(String, String...)}. @@ -1410,6 +1573,7 @@ public SelfT reset() { this.namedParameters.clear(); this.input = changeProperty(PROP_DATA, this.input, null); + this.output = changeProperty(PROP_OUTPUT, this.output, null); this.sql = changeProperty(PROP_QUERY, this.sql, null); this.preparedQuery = changeProperty(PROP_PREPARED_QUERY, this.preparedQuery, null); this.queryId = changeProperty(PROP_QUERY_ID, this.queryId, null); @@ -1437,6 +1601,7 @@ public ClickHouseRequest seal() { req.namedParameters.putAll(namedParameters); req.input = input; + req.output = output; req.queryId = queryId; req.sql = sql; req.preparedQuery = preparedQuery; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java index 8f8a2d264..35f075cd8 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseUtils.java @@ -233,6 +233,20 @@ public static String format(String template, Object... args) { return String.format(Locale.ROOT, template, args); } + /** + * Normalizes given directory by appending back slash if it does exist. + * + * @param dir original directory + * @return normalized directory + */ + public static String normalizeDirectory(String dir) { + if (dir == null || dir.isEmpty()) { + return "./"; + } + + return dir.charAt(dir.length() - 1) == '/' ? dir : dir.concat("/"); + } + private static int readJsonArray(String json, List array, int startIndex, int len) { StringBuilder builder = new StringBuilder(); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseOption.java index 5230446d4..905d0e8c4 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseOption.java @@ -1,6 +1,7 @@ package com.clickhouse.client.config; import java.io.Serializable; +import java.util.Locale; import java.util.Optional; /** @@ -67,12 +68,7 @@ static T fromString(String value, Class clazz) { * @return trimmed default value defined in environment variable */ default Optional getDefaultValueFromEnvVar() { - String prefix = getPrefix().toUpperCase(); - String optionName = name(); - int length = optionName.length(); - - String value = System.getenv(new StringBuilder(length + prefix.length() + 1).append(prefix).append('_') - .append(optionName.toUpperCase()).toString()); + String value = System.getenv(getEnvironmentVariable()); if (value != null) { value = value.trim(); } @@ -87,12 +83,7 @@ default Optional getDefaultValueFromEnvVar() { * @return trimmed default value defined in system property */ default Optional getDefaultValueFromSysProp() { - String prefix = getPrefix().toLowerCase(); - String optionName = name(); - int length = optionName.length(); - - String value = System.getProperty(new StringBuilder(length + prefix.length() + 1).append(prefix).append('_') - .append(optionName.toLowerCase()).toString()); + String value = System.getProperty(getSystemProperty()); if (value != null) { value = value.trim(); } @@ -158,6 +149,30 @@ default String getPrefix() { return "CHC"; } + /** + * Gets environment variable for the option. + * + * @return environment variable + */ + default String getEnvironmentVariable() { + String name = name().toUpperCase(Locale.ROOT); + String prefix = getPrefix().toUpperCase(Locale.ROOT); + return new StringBuilder(prefix.length() + name.length() + 1).append(prefix).append('_').append(name) + .toString(); + } + + /** + * Gets system property for the option. + * + * @return system property + */ + default String getSystemProperty() { + String name = name().toLowerCase(Locale.ROOT); + String prefix = getPrefix().toLowerCase(Locale.ROOT); + return new StringBuilder(prefix.length() + name.length() + 1).append(prefix).append('_').append(name) + .toString(); + } + /** * Gets value type of the option. * diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java index 9f3f8afcf..a1f5b94e1 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java @@ -15,6 +15,7 @@ import com.clickhouse.client.ClickHouseColumn; import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseDeferredValue; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseUtils; @@ -24,6 +25,7 @@ public class ClickHouseExternalTable { public static class Builder { private String name; + private ClickHouseFile file; private ClickHouseDeferredValue content; private ClickHouseCompression compression; private ClickHouseFormat format; @@ -44,6 +46,16 @@ public Builder compression(ClickHouseCompression compression) { return this; } + public Builder content(ClickHouseFile file) { + this.file = ClickHouseChecker.nonNull(file, "file"); + this.compression = file.getCompressionAlgorithm(); + this.content = ClickHouseDeferredValue.of(file.asInputStream(), InputStream.class); + if (file.hasFormat()) { + this.format = file.getFormat(); + } + return this; + } + public Builder content(InputStream content) { this.content = ClickHouseDeferredValue.of(ClickHouseChecker.nonNull(content, "content"), InputStream.class); return this; @@ -144,7 +156,7 @@ public Builder asExternalTable() { } public ClickHouseExternalTable build() { - return new ClickHouseExternalTable(name, content, compression, format, columns, asTempTable); + return new ClickHouseExternalTable(name, file, content, compression, format, columns, asTempTable); } } @@ -153,6 +165,7 @@ public static Builder builder() { } private final String name; + private final ClickHouseFile file; private final ClickHouseDeferredValue content; private final Optional compression; private final ClickHouseFormat format; @@ -161,10 +174,11 @@ public static Builder builder() { private final String structure; - protected ClickHouseExternalTable(String name, ClickHouseDeferredValue content, + protected ClickHouseExternalTable(String name, ClickHouseFile file, ClickHouseDeferredValue content, ClickHouseCompression compression, ClickHouseFormat format, Collection columns, boolean asTempTable) { this.name = name == null ? "" : name.trim(); + this.file = file != null ? file : ClickHouseFile.NULL; this.content = ClickHouseChecker.nonNull(content, "content"); if (compression == null) { compression = ClickHouseCompression.fromFileName(this.name); @@ -200,6 +214,10 @@ public String getName() { return name; } + public ClickHouseFile getFile() { + return file; + } + public InputStream getContent() { return content.get(); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java index fd8fe5393..2f6e2a85c 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseStreamResponse.java @@ -105,7 +105,10 @@ public void close() { } try { - log.debug("%d bytes skipped before closing input stream", input.skip(Long.MAX_VALUE)); + long skipped = input.skip(Long.MAX_VALUE); + if (skipped > 0L) { + log.debug("%d bytes skipped before closing input stream", skipped); + } } catch (Exception e) { // ignore log.debug("Failed to skip reading input stream due to: %s", e.getMessage()); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayInputStream.java index 7c2a2b697..571cba19b 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayInputStream.java @@ -7,6 +7,7 @@ import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseUtils; @@ -19,8 +20,8 @@ public abstract class AbstractByteArrayInputStream extends ClickHouseInputStream protected int position; protected int limit; - protected AbstractByteArrayInputStream(OutputStream copyTo, Runnable postCloseAction) { - super(copyTo, postCloseAction); + protected AbstractByteArrayInputStream(ClickHouseFile file, OutputStream copyTo, Runnable postCloseAction) { + super(file, copyTo, postCloseAction); buffer = ClickHouseByteBuffer.EMPTY_BYTES; position = 0; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayOutputStream.java index 3875ebc0e..c2bfb1216 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteArrayOutputStream.java @@ -3,6 +3,7 @@ import java.io.IOException; import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseOutputStream; public abstract class AbstractByteArrayOutputStream extends ClickHouseOutputStream { @@ -17,8 +18,8 @@ protected void flushBuffer() throws IOException { protected abstract void flushBuffer(byte[] bytes, int offset, int length) throws IOException; - protected AbstractByteArrayOutputStream(int bufferSize, Runnable postCloseAction) { - super(postCloseAction); + protected AbstractByteArrayOutputStream(ClickHouseFile file, int bufferSize, Runnable postCloseAction) { + super(file, postCloseAction); buffer = new byte[bufferSize]; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteBufferInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteBufferInputStream.java index 825971169..a62d91dbd 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteBufferInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/AbstractByteBufferInputStream.java @@ -9,6 +9,7 @@ import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseUtils; @@ -19,8 +20,8 @@ public abstract class AbstractByteBufferInputStream extends ClickHouseInputStream { protected ByteBuffer buffer; - protected AbstractByteBufferInputStream(OutputStream copyTo, Runnable postCloseAction) { - super(copyTo, postCloseAction); + protected AbstractByteBufferInputStream(ClickHouseFile file, OutputStream copyTo, Runnable postCloseAction) { + super(file, copyTo, postCloseAction); this.buffer = null; } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/BlockingInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/BlockingInputStream.java index 8ff4b92c9..4d6fa0d9c 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/BlockingInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/BlockingInputStream.java @@ -22,7 +22,7 @@ public class BlockingInputStream extends AbstractByteBufferInputStream { private final int timeout; public BlockingInputStream(BlockingQueue queue, int timeout, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); this.queue = ClickHouseChecker.nonNull(queue, "Queue"); this.timeout = timeout > 0 ? timeout : 0; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyInputStream.java index 5fe211039..ec0624850 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyInputStream.java @@ -15,7 +15,7 @@ public final class EmptyInputStream extends ClickHouseInputStream { public static final EmptyInputStream INSTANCE = new EmptyInputStream(); private EmptyInputStream() { - super(null, null); + super(null, null, null); } @Override diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyOutputStream.java new file mode 100644 index 000000000..4cc8a6c6d --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/EmptyOutputStream.java @@ -0,0 +1,47 @@ +package com.clickhouse.client.stream; + +import java.io.IOException; + +import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseOutputStream; + +/** + * Empty output stream consumes nothing and it can never be closed. + */ +public final class EmptyOutputStream extends ClickHouseOutputStream { + public static final EmptyOutputStream INSTANCE = new EmptyOutputStream(); + + private EmptyOutputStream() { + super(null, null); + } + + @Override + public boolean isClosed() { + return false; + } + + @Override + public void close() throws IOException { + // do nothing + } + + @Override + public ClickHouseOutputStream transferBytes(byte[] bytes, int offset, int length) throws IOException { + return this; + } + + @Override + public ClickHouseOutputStream writeByte(byte b) throws IOException { + return this; + } + + @Override + public ClickHouseOutputStream writeBytes(byte[] bytes, int offset, int length) throws IOException { + return this; + } + + @Override + public ClickHouseOutputStream writeCustom(ClickHouseDataUpdater writer) throws IOException { + return this; + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteArrayInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteArrayInputStream.java index 7dbb7b842..c7d5375cd 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteArrayInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteArrayInputStream.java @@ -10,7 +10,7 @@ public class IterableByteArrayInputStream extends AbstractByteArrayInputStream { private final Iterator it; public IterableByteArrayInputStream(Iterable source, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); it = ClickHouseChecker.nonNull(source, "Source").iterator(); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteBufferInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteBufferInputStream.java index 038ce6b9c..94d370c1b 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteBufferInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableByteBufferInputStream.java @@ -12,7 +12,7 @@ public class IterableByteBufferInputStream extends AbstractByteBufferInputStream private final Iterator it; public IterableByteBufferInputStream(Iterable source, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); it = ClickHouseChecker.nonNull(source, "Source").iterator(); } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableMultipleInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableMultipleInputStream.java index 1d973a97a..bbb9130a8 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableMultipleInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableMultipleInputStream.java @@ -60,7 +60,7 @@ protected int updateBuffer() throws IOException { public IterableMultipleInputStream(Iterable source, Function converter, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); func = ClickHouseChecker.nonNull(converter, "Converter"); it = ClickHouseChecker.nonNull(source, "Source").iterator(); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableObjectInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableObjectInputStream.java index d9bab532f..ef6f80211 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableObjectInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/IterableObjectInputStream.java @@ -12,7 +12,7 @@ public class IterableObjectInputStream extends AbstractByteArrayInputStream { private final Iterator it; public IterableObjectInputStream(Iterable source, Function converter, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); func = ClickHouseChecker.nonNull(converter, "Converter"); it = ClickHouseChecker.nonNull(source, "Source").iterator(); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4InputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4InputStream.java index 353531cdf..6b1354de3 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4InputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4InputStream.java @@ -5,6 +5,7 @@ import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.data.BinaryStreamUtils; import com.clickhouse.client.data.ClickHouseCityHash; @@ -83,11 +84,11 @@ protected int updateBuffer() throws IOException { } public Lz4InputStream(InputStream stream) { - this(stream, null); + this(null, stream, null); } - public Lz4InputStream(InputStream stream, Runnable postCloseAction) { - super(null, postCloseAction); + public Lz4InputStream(ClickHouseFile file, InputStream stream, Runnable postCloseAction) { + super(file, null, postCloseAction); this.decompressor = factory.fastDecompressor(); this.stream = ClickHouseChecker.nonNull(stream, "InputStream"); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java index f70ff7e8d..15d0a3889 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java @@ -4,6 +4,7 @@ import java.io.OutputStream; import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.data.BinaryStreamUtils; import com.clickhouse.client.data.ClickHouseCityHash; @@ -48,7 +49,12 @@ protected void flushBuffer(byte[] bytes, int offset, int length) throws IOExcept } public Lz4OutputStream(OutputStream stream, int maxCompressBlockSize, Runnable postCloseAction) { - super(maxCompressBlockSize, postCloseAction); + this(null, stream, maxCompressBlockSize, postCloseAction); + } + + public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int maxCompressBlockSize, + Runnable postCloseAction) { + super(file, maxCompressBlockSize, postCloseAction); output = ClickHouseChecker.nonNull(stream, "OutputStream"); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingInputStream.java index 42b31eec1..dcfda0384 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/NonBlockingInputStream.java @@ -23,7 +23,7 @@ public class NonBlockingInputStream extends ClickHouseInputStream { private int position; public NonBlockingInputStream(AdaptiveQueue queue, int timeout, Runnable postCloseAction) { - super(null, postCloseAction); + super(null, null, postCloseAction); this.queue = ClickHouseChecker.nonNull(queue, "Queue"); this.timeout = timeout > 0 ? timeout : 0; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedInputStream.java index 7c68f9c57..c581b47c1 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedInputStream.java @@ -7,6 +7,7 @@ import com.clickhouse.client.ClickHouseByteBuffer; import com.clickhouse.client.ClickHouseChecker; import com.clickhouse.client.ClickHouseDataUpdater; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.config.ClickHouseClientOption; @@ -43,8 +44,8 @@ protected int updateBuffer() throws IOException { return limit - position; } - public WrappedInputStream(InputStream input, int bufferSize, Runnable postCloseAction) { - super(null, postCloseAction); + public WrappedInputStream(ClickHouseFile file, InputStream input, int bufferSize, Runnable postCloseAction) { + super(file, null, postCloseAction); in = ClickHouseChecker.nonNull(input, "InputStream"); // fixed buffer diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedOutputStream.java index b391f426f..4a02b8fa6 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/WrappedOutputStream.java @@ -4,6 +4,7 @@ import java.io.OutputStream; import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.config.ClickHouseClientOption; @@ -18,8 +19,8 @@ protected void flushBuffer(byte[] bytes, int offset, int length) throws IOExcept output.write(bytes, offset, length); } - public WrappedOutputStream(OutputStream stream, int bufferSize, Runnable postCloseAction) { - super(ClickHouseUtils.getBufferSize(bufferSize, + public WrappedOutputStream(ClickHouseFile file, OutputStream stream, int bufferSize, Runnable postCloseAction) { + super(file, ClickHouseUtils.getBufferSize(bufferSize, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue()), postCloseAction); diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java index bdfcf9c2b..df8e65033 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseOutputStreamTest.java @@ -42,7 +42,7 @@ public void testWriteString() throws IOException { @Test(groups = { "unit" }) public void testNullOrClosedOutput() throws IOException { - Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseOutputStream.of(null)); + Assert.assertEquals(ClickHouseOutputStream.of(null), ClickHouseOutputStream.empty()); ByteArrayOutputStream inner = new ByteArrayOutputStream(); OutputStream out = new BufferedOutputStream(inner); ClickHouseOutputStream empty = ClickHouseOutputStream.of(out); diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseServerForTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseServerForTest.java index 7fe3518af..e59ee5326 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseServerForTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseServerForTest.java @@ -48,6 +48,8 @@ public class ClickHouseServerForTest { // ignore } + final String containerName = System.getenv("CHC_TEST_CONTAINER_ID"); + String host = ClickHouseUtils.getProperty("clickhouseServer", properties); clickhouseServer = ClickHouseChecker.isNullOrEmpty(host) ? null : host; @@ -102,7 +104,13 @@ public class ClickHouseServerForTest { ? new GenericContainer<>(imageNameWithTag) : new GenericContainer<>(new ImageFromDockerfile().withDockerfileFromBuilder(builder -> builder .from(imageNameWithTag).run("apt-get update && apt-get install -y " + additionalPackages)))) - .withCreateContainerCmdModifier(it -> it.withEntrypoint("/bin/sh")) + .withCreateContainerCmdModifier( + it -> { + it.withEntrypoint("/bin/sh"); + if (!ClickHouseChecker.isNullOrBlank(containerName)) { + it.withName(containerName); + } + }) .withCommand("-c", String.format("chmod +x %1$s/patch && %1$s/patch", customDirectory)) .withEnv("TZ", timezone) .withExposedPorts(ClickHouseProtocol.GRPC.getDefaultPort(), @@ -113,6 +121,8 @@ public class ClickHouseServerForTest { ClickHouseProtocol.TCP.getDefaultSecurePort(), ClickHouseProtocol.POSTGRESQL.getDefaultPort()) .withClasspathResourceMapping("containers/clickhouse-server", customDirectory, BindMode.READ_ONLY) + .withFileSystemBind(System.getProperty("java.io.tmpdir"), getClickHouseContainerTmpDir(), + BindMode.READ_WRITE) .waitingFor(Wait.forHttp("/ping").forPort(ClickHouseProtocol.HTTP.getDefaultPort()) .forStatusCode(200).withStartupTimeout(Duration.of(60, SECONDS))); } @@ -126,6 +136,10 @@ public static GenericContainer getClickHouseContainer() { return clickhouseContainer; } + public static String getClickHouseContainerTmpDir() { + return "/tmp"; + } + public static String getClickHouseAddress() { return getClickHouseAddress(ClickHouseProtocol.ANY, false); } @@ -134,7 +148,7 @@ public static String getClickHouseAddress(ClickHouseProtocol protocol, boolean u StringBuilder builder = new StringBuilder(); if (clickhouseContainer != null) { - builder.append(useIPaddress ? clickhouseContainer.getContainerIpAddress() : clickhouseContainer.getHost()) + builder.append(useIPaddress ? clickhouseContainer.getHost() : clickhouseContainer.getHost()) .append(':').append(clickhouseContainer.getMappedPort(protocol.getDefaultPort())); } else { String port = ClickHouseUtils @@ -154,7 +168,7 @@ public static ClickHouseNode getClickHouseNode(ClickHouseProtocol protocol, bool int port = useSecurePort ? protocol.getDefaultSecurePort() : protocol.getDefaultPort(); if (clickhouseContainer != null) { - host = clickhouseContainer.getContainerIpAddress(); + host = clickhouseContainer.getHost(); port = clickhouseContainer.getMappedPort(port); } else { String config = ClickHouseUtils @@ -171,7 +185,7 @@ public static ClickHouseNode getClickHouseNode(ClickHouseProtocol protocol, int String host = clickhouseServer; if (clickhouseContainer != null) { - host = clickhouseContainer.getContainerIpAddress(); + host = clickhouseContainer.getHost(); port = clickhouseContainer.getMappedPort(port); } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index 5618b7a8b..a29054e90 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -13,6 +13,7 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -60,12 +61,16 @@ protected ClickHouseResponseSummary execute(ClickHouseRequest request, String protected abstract Class getClientClass(); + protected ClickHouseClientBuilder initClient(ClickHouseClientBuilder builder) { + return builder; + } + protected ClickHouseClient getClient() { - return ClickHouseClient.newInstance(getProtocol()); + return initClient(ClickHouseClient.builder()).nodeSelector(ClickHouseNodeSelector.of(getProtocol())).build(); } protected ClickHouseClient getSecureClient() { - return ClickHouseClient.builder() + return initClient(ClickHouseClient.builder()) .nodeSelector(ClickHouseNodeSelector.of(getProtocol())) .option(ClickHouseClientOption.SSL, true) .option(ClickHouseClientOption.SSL_MODE, ClickHouseSslMode.STRICT) @@ -166,7 +171,7 @@ public void testInitialization() throws Exception { @Test(groups = { "integration" }) public void testOpenCloseClient() throws Exception { - int count = 100; + int count = 10; int timeout = 3000; ClickHouseNode server = getServer(); for (int i = 0; i < count; i++) { @@ -902,6 +907,41 @@ public void testCustomRead() throws Exception { Assert.assertEquals(count, 1000L); } + @Test(groups = { "integration" }) + public void testDumpAndLoadFile() throws Exception { + // super.testLoadRawData(); + ClickHouseNode server = getServer(); + ClickHouseClient.send(server, "drop table if exists test_dump_load_file", + "create table test_dump_load_file(a UInt64, b Nullable(String)) engine=MergeTree() order by tuple()") + .get(); + + final int rows = 10000; + final Path tmp = Paths.get(System.getProperty("java.io.tmpdir"), "file.json"); + ClickHouseFile file = ClickHouseFile.of(tmp); + ClickHouseClient.dump(server, + ClickHouseUtils.format( + "select number a, if(modulo(number, 2) = 0, null, toString(number)) b from numbers(%d)", + rows), + file).get(); + Assert.assertTrue(Files.exists(tmp), ClickHouseUtils.format("File [%s] should exist", tmp)); + Assert.assertTrue(Files.size(tmp) > 0, ClickHouseUtils.format("File [%s] should have content", tmp)); + + ClickHouseClient.load(server, "test_dump_load_file", file).get(); + + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = client.connect(server).query("select count(1) from test_dump_load_file") + .executeAndWait()) { + Assert.assertEquals(response.firstRecord().getValue(0).asInteger(), rows); + } + + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = client.connect(server) + .query("select count(1) from test_dump_load_file where b is null") + .executeAndWait()) { + Assert.assertEquals(response.firstRecord().getValue(0).asInteger(), rows / 2); + } + } + @Test(groups = { "integration" }) public void testDump() throws Exception { ClickHouseNode server = getServer(); diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java index cf9238218..119dfe3ea 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/stream/InputStreamImplTest.java @@ -302,7 +302,7 @@ private Object[][] getInputStreamsWithData() { @DataProvider(name = "streamWithData") private Object[][] getInputStreamWithData() { return new Object[][] { - new Object[] { new WrappedInputStream( + new Object[] { new WrappedInputStream(null, new ByteArrayInputStream(new byte[] { -1, 1, 2, 3, 4, 5, 6 }, 1, 5), 1, null) }, new Object[] { new IterableByteArrayInputStream( diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/stream/OutputStreamImplTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/stream/OutputStreamImplTest.java index 0b6bb892c..6aa0c362b 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/stream/OutputStreamImplTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/stream/OutputStreamImplTest.java @@ -35,7 +35,7 @@ private Object[][] getOutputStreamBufferSize() { @DataProvider(name = "streamWithData") private Object[][] getOutputStreamWithData() { return new Object[][] { - new Object[] { new WrappedInputStream( + new Object[] { new WrappedInputStream(null, new ByteArrayInputStream(new byte[] { -1, 1, 2, 3, 4, 5, 6 }, 1, 5), 1, null) }, new Object[] { @@ -65,7 +65,7 @@ private Object[][] getOutputStreamWithData() { @Test(dataProvider = "bufferSizeProvider", groups = { "unit" }) public void testEmptyOrClosedOutput(int bufferSize) throws IOException { try (ByteArrayOutputStream bas = new ByteArrayOutputStream(); - WrappedOutputStream out = new WrappedOutputStream(bas, bufferSize, null)) { + WrappedOutputStream out = new WrappedOutputStream(null, bas, bufferSize, null)) { Assert.assertFalse(out.isClosed()); out.write(12); out.flush(); @@ -76,7 +76,7 @@ public void testEmptyOrClosedOutput(int bufferSize) throws IOException { @Test(dataProvider = "bufferSizeProvider", groups = { "unit" }) public void testWriteCustom(int bufferSize) throws IOException { try (ByteArrayOutputStream bas = new ByteArrayOutputStream(); - ClickHouseOutputStream out = new WrappedOutputStream(bas, bufferSize, null)) { + ClickHouseOutputStream out = new WrappedOutputStream(null, bas, bufferSize, null)) { out.writeCustom((bytes, position, limit) -> 0); Assert.assertEquals(bas.toByteArray(), new byte[0]); out.writeCustom((bytes, position, limit) -> { diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java index 409688e4b..ad49f65e2 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseGrpcClient.java @@ -178,7 +178,7 @@ protected CompletableFuture executeAsync(ClickHouseRequest requestObserver = stub.executeQueryWithStreamIO(responseObserver); if (sealedRequest.hasInputStream()) { diff --git a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java index 6aa8ae85e..f1b19f9dd 100644 --- a/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java +++ b/clickhouse-grpc-client/src/main/java/com/clickhouse/client/grpc/ClickHouseStreamObserver.java @@ -11,6 +11,7 @@ import com.clickhouse.client.ClickHouseException; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHousePipedOutputStream; import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.client.ClickHouseUtils; @@ -30,21 +31,28 @@ public class ClickHouseStreamObserver implements StreamObserver { private final CountDownLatch startLatch; private final CountDownLatch finishLatch; - private final ClickHousePipedOutputStream stream; + private final ClickHouseOutputStream stream; private final ClickHouseInputStream input; private final ClickHouseResponseSummary summary; private Throwable error; - protected ClickHouseStreamObserver(ClickHouseConfig config, ClickHouseNode server) { + protected ClickHouseStreamObserver(ClickHouseConfig config, ClickHouseNode server, ClickHouseOutputStream output) { this.server = server; this.startLatch = new CountDownLatch(1); this.finishLatch = new CountDownLatch(1); - this.stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null); - this.input = ClickHouseGrpcResponse.getInput(config, this.stream.getInputStream()); + if (output != null) { + this.stream = output; + this.input = ClickHouseInputStream.empty(); + } else { + ClickHousePipedOutputStream pipedStream = ClickHouseDataStreamFactory.getInstance() + .createPipedOutputStream(config, null); + this.stream = pipedStream; + this.input = ClickHouseGrpcResponse.getInput(config, pipedStream.getInputStream()); + } this.summary = new ClickHouseResponseSummary(null, null); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index 09f99f1cd..99041f9be 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -18,6 +18,7 @@ import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; import com.clickhouse.client.ClickHouseNode; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.config.ClickHouseClientOption; @@ -153,6 +154,8 @@ static String buildUrl(ClickHouseNode server, ClickHouseRequest request) { protected final ClickHouseNode server; protected final Map defaultHeaders; + protected final ClickHouseOutputStream output; + protected final String url; protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest request) { @@ -163,6 +166,8 @@ protected ClickHouseHttpConnection(ClickHouseNode server, ClickHouseRequest r this.config = request.getConfig(); this.server = server; + this.output = request.getOutputStream().orElse(null); + this.url = buildUrl(server, request); Map map = new LinkedHashMap<>(); diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java index 6c38817c2..dbd0be494 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java @@ -22,6 +22,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.UncheckedIOException; import java.net.HttpURLConnection; import java.net.URL; import java.nio.charset.Charset; @@ -81,9 +83,24 @@ private ClickHouseHttpResponse buildResponse() throws IOException { : timeZone; } + final InputStream source; + final Runnable action; + if (output != null) { + source = ClickHouseInputStream.empty(); + action = () -> { + try (OutputStream o = output) { + ClickHouseInputStream.pipe(conn.getInputStream(), o, c.getWriteBufferSize()); + } catch (IOException e) { + throw new UncheckedIOException("Failed to redirect response to given output stream", e); + } + }; + } else { + source = conn.getInputStream(); + action = null; + } return new ClickHouseHttpResponse(this, - hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, conn.getInputStream(), null) - : ClickHouseClient.getResponseInputStream(c, conn.getInputStream(), null), + hasQueryResult ? ClickHouseClient.getAsyncResponseInputStream(c, source, action) + : ClickHouseClient.getResponseInputStream(c, source, action), displayName, queryId, summary, format, timeZone); } diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java index e4abcc77d..ec018bffb 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java @@ -19,8 +19,10 @@ import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; +import java.io.OutputStream; import java.io.OutputStreamWriter; import java.io.Reader; +import java.io.UncheckedIOException; import java.net.HttpURLConnection; import java.net.URI; import java.net.http.HttpClient; @@ -69,8 +71,24 @@ private ClickHouseHttpResponse buildResponse(HttpResponse r) throws : timeZone; } - return new ClickHouseHttpResponse(this, - ClickHouseClient.getResponseInputStream(config, checkResponse(r).body(), this::closeQuietly), + final InputStream source; + final Runnable action; + if (output != null) { + source = ClickHouseInputStream.empty(); + action = () -> { + try (OutputStream o = output) { + ClickHouseInputStream.pipe(checkResponse(r).body(), o, config.getWriteBufferSize()); + } catch (IOException e) { + throw new UncheckedIOException("Failed to redirect response to given output stream", e); + } finally { + closeQuietly(); + } + }; + } else { + source = checkResponse(r).body(); + action = this::closeQuietly; + } + return new ClickHouseHttpResponse(this, ClickHouseClient.getResponseInputStream(config, source, action), displayName, queryId, summary, format, timeZone); } diff --git a/clickhouse-jdbc/pom.xml b/clickhouse-jdbc/pom.xml index a27f8556c..b6494d3f5 100644 --- a/clickhouse-jdbc/pom.xml +++ b/clickhouse-jdbc/pom.xml @@ -30,6 +30,18 @@ ${revision} provided + + ${project.parent.groupId} + clickhouse-cli-client + shaded + ${revision} + + + * + * + + + ${project.parent.groupId} clickhouse-grpc-client @@ -319,9 +331,6 @@ *:* mozilla/** - **/darwin/** - **/linux/** - **/win32/** **/module-info.class META-INF/DEPENDENCIES META-INF/MANIFEST.MF diff --git a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java index 6a21879e7..6ef117ff3 100644 --- a/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java +++ b/clickhouse-jdbc/src/main/java/com/clickhouse/jdbc/ClickHouseDatabaseMetaData.java @@ -1266,7 +1266,7 @@ public ResultSet getFunctions(String catalog, String schemaPattern, String funct + "1 as FUNCTION_TYPE, name as SPECIFIC_NAME from system.functions\n" + "where alias_to = '' and name like :pattern order by name union all\n" + "select null as FUNCTION_CAT, 'system' as FUNCTION_SCHEM, name as FUNCTION_NAME,\n" - + "'case-sensistive table function' as REMARKS, 2 as FUNCTION_TYPE, name as SPECIFIC_NAME from system.table_functions\n" + + "'case-sensitive table function' as REMARKS, 2 as FUNCTION_TYPE, name as SPECIFIC_NAME from system.table_functions\n" + "order by name) where :filter", params); return query(sql); diff --git a/pom.xml b/pom.xml index e6385c5ad..b7ddae4f3 100644 --- a/pom.xml +++ b/pom.xml @@ -36,9 +36,10 @@ clickhouse-client + clickhouse-cli-client clickhouse-grpc-client clickhouse-http-client - clickhouse-tcp-client + clickhouse-jdbc clickhouse-benchmark @@ -78,11 +79,12 @@ UTF-8 6.0.53 - 9.2 - 3.0.5 + 9.3 + 1.12.10 + 3.1.0 1.21 3.4.4 - 3.5.0 + 3.5.1 8.5.8 1.45.1 2.9.0 @@ -92,36 +94,36 @@ 0.9.25 2.0.0-alpha5 3.12.4 - 2.32.0 - 1.16.3 + 2.33.2 + 1.17.1 7.5 - 3.0.3 - 8.0.28 - 42.3.3 + 3.0.4 + 8.0.29 + 42.3.5 1.2.0 3.3.0 - 3.8.1 - 3.0.0-M1 - 3.0.0-M3 - 3.0.0-M5 + 3.10.1 + 3.0.0-M2 + 3.0.0 + 3.0.0-M6 1.2.7 4.9.9 - 1.6 - 3.2.0 - 0.8.6 - 3.2.0 - 3.2.0 + 3.0.1 + 3.3.0 + 0.8.8 + 3.2.2 + 3.4.0 1.7.0 0.6.1 - 3.2.4 + 3.3.0 3.2.1 - 1.6.8 - 3.0.0-M5 + 1.6.13 + 3.0.0-M6 3.0.0 - 2.8.1 + 2.10.0 1.8 false @@ -525,6 +527,23 @@ true + @@ -655,6 +674,26 @@ + release @@ -677,6 +716,9 @@ 11 + @@ -692,6 +734,10 @@ java11 compile + @@ -877,6 +923,9 @@ 11 + @@ -892,6 +941,10 @@ java11 compile + @@ -929,4 +982,4 @@ - + \ No newline at end of file