Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.clickhouse.client.ClickHouseInputStream;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseOutputStream;
import com.clickhouse.client.ClickHousePassThruStream;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseUtils;
import com.clickhouse.client.cli.config.ClickHouseCommandLineOption;
Expand Down Expand Up @@ -265,7 +266,7 @@ static Process startProcess(ClickHouseRequest<?> request) {
ClickHouseFile tableFile = table.getFile();
commands.add("--external");
String filePath;
if (!tableFile.isAvailable() || !tableFile.getFile().getAbsolutePath().startsWith(hostDir)) {
if (!tableFile.hasOutput() || !tableFile.getFile().getAbsolutePath().startsWith(hostDir)) {
// creating a hard link is faster but it's not platform-independent
File f = ClickHouseInputStream.save(
Paths.get(hostDir, "chc_".concat(request.getManager().createUniqueId())).toFile(),
Expand Down Expand Up @@ -321,10 +322,13 @@ static Process startProcess(ClickHouseRequest<?> request) {

if (request.hasOutputStream()) {
final ClickHouseOutputStream chOutput = request.getOutputStream().get(); // NOSONAR
final ClickHouseFile outputFile = chOutput.getUnderlyingFile();
final ClickHousePassThruStream customStream = chOutput.getUnderlyingStream();

if (outputFile.isAvailable()) {
File f = outputFile.getFile();
if (customStream.hasOutput()) {
File f = customStream instanceof ClickHouseFile ? ((ClickHouseFile) customStream).getFile() : null;
if (f == null) {
throw new UncheckedIOException(new IOException("Output file not found in " + customStream));
}
if (hostDir.equals(containerDir)) {
builder.redirectOutput(f);
} else if (f.getAbsolutePath().startsWith(hostDir)) {
Expand Down Expand Up @@ -364,15 +368,15 @@ static Process startProcess(ClickHouseRequest<?> request) {
final Process process;
if (in.isPresent()) {
final ClickHouseInputStream chInput = in.get();
final ClickHousePassThruStream customStream = chInput.getUnderlyingStream();
final File inputFile;
if (chInput.getUnderlyingFile().isAvailable()) {
inputFile = chInput.getUnderlyingFile().getFile();
if (customStream.hasInput() && customStream instanceof ClickHouseFile) {
inputFile = ((ClickHouseFile) customStream).getFile();
} else {
CompletableFuture<File> data = ClickHouseClient.submit(() -> {
File tmp = Files.createTempFile("tmp", "data").toFile();
tmp.deleteOnExit();
File tmp = ClickHouseUtils.createTempFile("tmp", "data", true);
try (ClickHouseOutputStream out = ClickHouseOutputStream.of(new FileOutputStream(tmp))) {
request.getInputStream().get().pipe(out);
chInput.pipe(out);
}
return tmp;
});
Expand Down Expand Up @@ -415,7 +419,7 @@ public ClickHouseInputStream getInputStream() throws IOException {
throw new UncheckedIOException(exp);
}
};
if (out != null && !out.getUnderlyingFile().isAvailable()) {
if (out != null && !out.getUnderlyingStream().hasOutput()) {
try (OutputStream o = out) {
ClickHouseInputStream.pipe(process.getInputStream(), o, request.getConfig().getWriteBufferSize());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ boolean isDone() {
* @param config non-null configuration
* @param socket non-null socket
* @return the given socket
* @throws SocketException when there's error setting socket options
*/
public static Socket setSocketOptions(ClickHouseConfig config, Socket socket) throws SocketException {
if (socket == null || socket.isClosed()) {
Expand Down Expand Up @@ -108,6 +109,7 @@ public static Socket setSocketOptions(ClickHouseConfig config, Socket socket) th
* @param config non-null configuration
* @param socket non-null socket channel
* @return the given socket channel
* @throws IOException when there's error setting socket options
*/
public static SocketChannel setSocketOptions(ClickHouseConfig config, SocketChannel socket) throws IOException {
if (socket == null || socket.socket().isClosed()) {
Expand Down Expand Up @@ -230,7 +232,7 @@ protected boolean onRead(ClickHouseConfig config, SocketChannel sc, ClickHouseOu
final long socketTimeout = config.getSocketTimeout();
final long startTime = socketTimeout > 0L ? System.currentTimeMillis() : 0L;

// final ClickHouseFile f = out.getUnderlyingFile();
// final ClickHousePassThruStream s = out.getUnderlyingStream();
ByteBuffer buffer = ByteBuffer.allocate(config.getWriteBufferSize());
byte[] bytes = buffer.array();
int len = 0;
Expand All @@ -251,9 +253,9 @@ protected long onWrite(ClickHouseConfig config, SocketChannel sc, ClickHouseInpu
final long socketTimeout = config.getSocketTimeout();
final long startTime = socketTimeout > 0L ? System.currentTimeMillis() : 0L;

final ClickHouseFile f = in.getUnderlyingFile();
if (f.isAvailable()) {
try (FileChannel fc = FileChannel.open(f.getFile().toPath())) {
final ClickHousePassThruStream s = in.getUnderlyingStream();
if (s.hasInput() && s instanceof ClickHouseFile) {
try (FileChannel fc = FileChannel.open(((ClickHouseFile) s).getFile().toPath())) {
long size = fc.size();
long chunkSize = config.getRequestChunkSize();
long offset = startPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,22 +227,65 @@ static <T> T run(Callable<T> task) {
}
}

/**
* Runs the given task immediately in current thread. Exception will be wrapped
* as {@link CompletionException}.
*
* @param task non-null task to run
* @throws CompletionException when failed to execute the task
*/
static void run(Runnable task) {
try {
task.run();
} catch (CompletionException e) {
throw e;
} catch (Exception e) {
Throwable cause = e instanceof ClickHouseException ? e : e.getCause();
if (cause instanceof CompletionException) {
throw (CompletionException) cause;
} else if (cause == null) {
cause = e;
}
throw new CompletionException(cause);
}
}

/**
* Submits task for execution. Depending on {@link ClickHouseDefaults#ASYNC}, it
* may or may not use {@link #getExecutorService()} to run the task in a
* separate thread.
*
*
* @param <T> return type of the task
* @param task non-null task
* @return non-null future object to get result
* @throws CompletionException when failed to complete the task
* @throws CompletionException when failed to complete the task in synchronous
* mode
*/
static <T> CompletableFuture<T> submit(Callable<T> task) {
return (boolean) ClickHouseDefaults.ASYNC.getEffectiveDefaultValue()
? CompletableFuture.supplyAsync(() -> run(task), getExecutorService())
: CompletableFuture.completedFuture(run(task));
}

/**
* Submits task for execution. Depending on {@link ClickHouseDefaults#ASYNC}, it
* may or may not use {@link #getExecutorService()} to run the task in a
* separate thread.
*
* @param task non-null task
* @return null
* @throws CompletionException when failed to complete the task in synchronous
* mode
*/
static CompletableFuture<Void> submit(Runnable task) {
if ((boolean) ClickHouseDefaults.ASYNC.getEffectiveDefaultValue()) {
return CompletableFuture.runAsync(() -> run(task), getExecutorService());
}

run(task);
return CompletableFuture.completedFuture(null);
}

/**
* Dumps a table or query result from server into a file. File will be
* created/overwrited as needed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ public void close() {
}

String prefix = "ClickHouseClientWorker";
defaultExecutor = ClickHouseUtils.newThreadPool(prefix, maxThreads, maxThreads * 2 + 1, maxRequests,
defaultExecutor = ClickHouseUtils.newThreadPool(prefix, maxThreads, maxThreads * 3 + 1, maxRequests,
keepAliveTimeoutMs, false);
prefix = "ClickHouseClientScheduler";
defaultScheduler = maxSchedulers == 1 ? Executors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.Serializable;
import java.nio.file.Path;

import com.clickhouse.client.config.ClickHouseClientOption;
Expand All @@ -13,24 +12,24 @@
* Wrapper of {@link java.io.File} with additional information like compression
* and format.
*/
public class ClickHouseFile implements Serializable {
public class ClickHouseFile extends ClickHousePassThruStream {
private static final long serialVersionUID = -2641191818870839568L;

/**
* Null file which has no compression and format.
*/
public static final ClickHouseFile NULL = new ClickHouseFile(null, ClickHouseCompression.NONE, 0, null);
public static final ClickHouseFile NULL = new ClickHouseFile(null, ClickHouseCompression.NONE, -1, null);

public static ClickHouseFile of(File file) {
return of(file, null, 0, null);
}

public static ClickHouseFile of(Path path) {
return of(ClickHouseChecker.nonNull(path, "Path").toFile(), null, 0, null);
return of(ClickHouseChecker.nonNull(path, "Path").toFile(), null, -1, null);
}

public static ClickHouseFile of(String file) {
return of(new File(ClickHouseChecker.nonEmpty(file, "File")), null, 0, null);
return of(new File(ClickHouseChecker.nonEmpty(file, "File")), null, -1, null);
}

public static ClickHouseFile of(String file, ClickHouseCompression compression, int compressionLevel,
Expand All @@ -42,122 +41,72 @@ public static ClickHouseFile of(File file, ClickHouseCompression compression, in
ClickHouseFormat format) {
return new ClickHouseFile(ClickHouseChecker.nonNull(file, "File"),
compression != null ? compression : ClickHouseCompression.fromFileName(file.getName()),
compressionLevel < 1 ? 0 : compressionLevel,
format != null ? format : ClickHouseFormat.fromFileName(file.getName()));
compressionLevel, format != null ? format : ClickHouseFormat.fromFileName(file.getName()));
}

private final File file;
private final ClickHouseCompression compress;
private final int compressLevel;
private final ClickHouseFormat format;

protected ClickHouseFile(File file, ClickHouseCompression compress, int compressLevel, ClickHouseFormat format) {
super(null, null, compress, compressLevel, format);

this.file = file;
this.compress = compress;
this.compressLevel = compressLevel;
this.format = format;
}

/**
* Creates an input stream for reading the file.
*
* @return non-null input stream for reading the file
*/
@Override
public ClickHouseInputStream asInputStream() {
if (!isAvailable()) {
return asInputStream((int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null);
}

@Override
public ClickHouseInputStream asInputStream(int bufferSize, Runnable postCloseAction) {
if (!hasInput()) {
return ClickHouseInputStream.empty();
}

try {
return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()),
(int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), null,
return ClickHouseInputStream.wrap(this, new FileInputStream(getFile()), bufferSize, postCloseAction,
ClickHouseCompression.NONE, getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Creates an output stream for writing data into the file.
*
* @return non-null input stream for writing data into the file
*/
@Override
public ClickHouseOutputStream asOutputStream() {
if (!isAvailable()) {
return asOutputStream((int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null);
}

@Override
public ClickHouseOutputStream asOutputStream(int bufferSize, Runnable postCloseAction) {
if (!hasOutput()) {
return ClickHouseOutputStream.empty();
}

try {
return ClickHouseOutputStream.wrap(this, new FileOutputStream(getFile()),
(int) ClickHouseClientOption.WRITE_BUFFER_SIZE.getDefaultValue(), null,
ClickHouseCompression.NONE, getCompressionLevel());
bufferSize, postCloseAction, ClickHouseCompression.NONE, getCompressionLevel());
} catch (FileNotFoundException e) {
throw new IllegalArgumentException(e);
}
}

/**
* Gets file, which only works when {@link #isAvailable()} returns {@code true}.
* Gets file. Use {@code #hasInput()} or {@code #hasOutput()} to check file
* availability first.
*
* @return non-null file, except {@code null} for {@link #NULL}
* @return file, could be null
*/
public File getFile() {
return file;
}

/**
* Gets file format, which could be null. Use {@link #hasFormat()} to check
* first.
*
* @return file format, could be null
*/
public ClickHouseFormat getFormat() {
return format;
}

/**
* Gets compression algorithm.
*
* @return non-null compression algorithm
*/
public ClickHouseCompression getCompressionAlgorithm() {
return compress;
}

/**
* Gets compression level.
*
* @return compression level, which in general should be greater than or equal
* to zero
*/
public int getCompressionLevel() {
return compressLevel;
}

/**
* Checks if the file format is defined or not.
*
* @return true if the file format is defined; false otherwise
*/
public boolean hasFormat() {
return format != null;
}

/**
* Checks if the file is available or not.
*
* @return true if the file is available; false otherwise
*/
public boolean isAvailable() {
@Override
public boolean hasInput() {
return file != null && file.exists();
}

/**
* Checks if the file is compressed or not.
*
* @return true if the file is compressed; false otherwise
*/
public boolean isCompressed() {
return compress != ClickHouseCompression.NONE;
@Override
public boolean hasOutput() {
return file != null;
}
}
Loading