diff --git a/clickhouse-client/pom.xml b/clickhouse-client/pom.xml index a4da78571..b710da623 100644 --- a/clickhouse-client/pom.xml +++ b/clickhouse-client/pom.xml @@ -1,4 +1,6 @@ - + 4.0.0 @@ -18,7 +20,7 @@ ${project.parent.groupId} org.roaringbitmap - provided + true * @@ -27,50 +29,70 @@ + + com.aayushatharva.brotli4j + brotli4j + true + com.github.ben-manes.caffeine caffeine - provided + true + + + com.github.luben + zstd-jni + true com.google.code.gson gson - provided + true dnsjava dnsjava - provided + true org.apache.avro avro - provided + true + + + org.brotli + dec + true org.jctools jctools-core - provided + true org.lz4 lz4-java - provided + true org.msgpack msgpack-core - provided + true org.slf4j slf4j-api - provided + true + + + org.tukaani + xz + true org.xerial.snappy snappy-java - provided + true diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java index 5a78599d7..b6e72ebeb 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseClient.java @@ -75,11 +75,11 @@ static ClickHouseOutputStream getRequestOutputStream(ClickHouseConfig config, Ou Runnable postCloseAction) { if (config == null) { return ClickHouseOutputStream.of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), - ClickHouseCompression.NONE, postCloseAction); + ClickHouseCompression.NONE, -1, postCloseAction); } return ClickHouseOutputStream.of(output, config.getWriteBufferSize(), config.getRequestCompressAlgorithm(), - postCloseAction); + config.getRequestCompressLevel(), postCloseAction); } /** diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseCompression.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseCompression.java index e3e59df98..f825c8a74 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseCompression.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseCompression.java @@ -11,9 +11,9 @@ public enum ClickHouseCompression { BZ2("application/x-bzip2", "bz2", "bz2"), DEFLATE("application/deflate", "deflate", "zz"), GZIP("application/gzip", "gzip", "gz"), - LZMA("application/x-lzma", "lzma", "xz"), LZ4("application/x-lz4", "lz4", "lz4"), - ZIP("application/zip", "zip", "zip"), + SNAPPY("application/x-snappy", "snappy", "sz"), + XZ("application/x-xz", "xz", "xz"), ZSTD("application/zstd", "zstd", "zst"); private String mimeType; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java index 1714f7eaa..106347526 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseConfig.java @@ -378,12 +378,12 @@ public ClickHouseCompression getResponseCompressAlgorithm() { /** * Gets input compress level. When {@link #isResponseCompressed()} is - * {@code false}, this will return {@code 0}. + * {@code false}, this will return {@code -1}. * * @return compress level */ public int getResponseCompressLevel() { - return decompressResponse ? decompressLevel : 0; + return decompressResponse ? decompressLevel : -1; } /** @@ -407,12 +407,12 @@ public ClickHouseCompression getRequestCompressAlgorithm() { /** * Gets input compress level. When {@link #isRequestCompressed()} is - * {@code false}, this will return {@code 0}. + * {@code false}, this will return {@code -1}. * * @return compress level */ public int getRequestCompressLevel() { - return compressRequest ? compressLevel : 0; + return compressRequest ? compressLevel : -1; } public int getConnectionTimeout() { diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java index fa75c13b8..84b808f91 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseFile.java @@ -127,7 +127,8 @@ public ClickHouseCompression getCompressionAlgorithm() { /** * Gets compression level. * - * @return compression level, which is always greater than or equal to zero + * @return compression level, which in general should be greater than or equal + * to zero */ public int getCompressionLevel() { return compressLevel; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java index 68d7e7a2d..9c8acfe40 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseInputStream.java @@ -24,9 +24,11 @@ import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.zip.GZIPInputStream; +import java.util.zip.InflaterInputStream; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.stream.BlockingInputStream; +import com.clickhouse.client.stream.CompressionUtils; import com.clickhouse.client.stream.DeferredInputStream; import com.clickhouse.client.stream.EmptyInputStream; import com.clickhouse.client.stream.Lz4InputStream; @@ -71,18 +73,42 @@ public static ClickHouseInputStream wrap(ClickHouseFile file, InputStream input, : new WrappedInputStream(file, input, bufferSize, postCloseAction); } else { switch (compression) { + case BROTLI: + chInput = new WrappedInputStream(file, CompressionUtils.createBrotliInputStream(input, bufferSize), + bufferSize, postCloseAction); + break; + case BZ2: + chInput = new WrappedInputStream(file, CompressionUtils.createBz2InputStream(input), bufferSize, + postCloseAction); + break; + case DEFLATE: + chInput = new WrappedInputStream(file, new InflaterInputStream(input), bufferSize, postCloseAction); + break; case GZIP: try { chInput = new WrappedInputStream(file, new GZIPInputStream(input), bufferSize, postCloseAction); } catch (IOException e) { - throw new IllegalArgumentException("Failed to wrap input stream", e); + throw new IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_INPUT, e); } break; case LZ4: chInput = new Lz4InputStream(file, input, postCloseAction); break; + case SNAPPY: + // https://github.com/ClickHouse/ClickHouse/issues/44885 + chInput = new WrappedInputStream(file, CompressionUtils.createSnappyInputStream(input), + bufferSize, postCloseAction); + break; + case ZSTD: + chInput = new WrappedInputStream(file, CompressionUtils.createZstdInputStream(input), bufferSize, + postCloseAction); + break; + case XZ: + chInput = new WrappedInputStream(file, CompressionUtils.createXzInputStream(input), bufferSize, + postCloseAction); + break; default: - throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); + throw new UnsupportedOperationException("Unsupported decompression algorithm: " + compression); } } return chInput; diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java index e999fd73d..736b0f893 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseOutputStream.java @@ -7,9 +7,9 @@ import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.zip.GZIPOutputStream; import com.clickhouse.client.config.ClickHouseClientOption; +import com.clickhouse.client.stream.CompressionUtils; import com.clickhouse.client.stream.EmptyOutputStream; import com.clickhouse.client.stream.Lz4OutputStream; import com.clickhouse.client.stream.WrappedOutputStream; @@ -42,18 +42,54 @@ static ClickHouseOutputStream wrap(ClickHouseFile file, OutputStream output, int if (compression == null || compression == ClickHouseCompression.NONE) { chOutput = new WrappedOutputStream(file, output, bufferSize, postCloseAction); } else { + // never got brotli, bz2, deflate, gzip, and xz working :< switch (compression) { - case GZIP: - try { - chOutput = new WrappedOutputStream(file, new GZIPOutputStream(output), bufferSize, - postCloseAction); - } catch (IOException e) { - throw new IllegalArgumentException("Failed to wrap input stream", e); - } - break; + // case BROTLI: + // chOutput = new WrappedOutputStream(file, + // CompressionUtils.createBrotliOutputStream(output, compressionLevel, + // bufferSize), bufferSize, postCloseAction); + // break; + // case BZ2: + // chOutput = new WrappedOutputStream(file, + // CompressionUtils.createBz2OutputStream(output, compressionLevel), bufferSize, + // postCloseAction); + // break; + // case DEFLATE: + // chOutput = new WrappedOutputStream(file, new InflaterOutputStream(output), + // bufferSize, + // postCloseAction); + // break; + // case GZIP: + // try { + // GzipParameters params = new GzipParameters(); + // params.setBufferSize(bufferSize); + // params.setCompressionLevel(3); + // chOutput = new WrappedOutputStream(file, new + // GzipCompressorOutputStream(output, params), + // bufferSize, postCloseAction); + // } catch (IOException e) { + // throw new + // IllegalArgumentException(CompressionUtils.ERROR_FAILED_TO_WRAP_OUTPUT, e); + // } + // break; case LZ4: - chOutput = new Lz4OutputStream(file, output, bufferSize, postCloseAction); + chOutput = new Lz4OutputStream(file, output, compressionLevel, bufferSize, postCloseAction); + break; + case SNAPPY: + chOutput = new WrappedOutputStream(file, + CompressionUtils.createSnappyOutputStream(output, compressionLevel), bufferSize, + postCloseAction); + break; + case ZSTD: + chOutput = new WrappedOutputStream(file, + CompressionUtils.createZstdOutputStream(output, compressionLevel), bufferSize, + postCloseAction); break; + // case XZ: + // chOutput = new WrappedOutputStream(file, + // CompressionUtils.createXzOutputStream(output, 6), bufferSize, + // postCloseAction); + // break; default: throw new UnsupportedOperationException("Unsupported compression algorithm: " + compression); } @@ -101,7 +137,7 @@ public static ClickHouseOutputStream of(ClickHouseFile file, int bufferSize, Run * {@link ClickHouseOutputStream} */ public static ClickHouseOutputStream of(OutputStream output) { - return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, null); + return of(output, (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), null, -1, null); } /** @@ -114,7 +150,7 @@ public static ClickHouseOutputStream of(OutputStream output) { * {@link ClickHouseOutputStream} */ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) { - return of(output, bufferSize, null, null); + return of(output, bufferSize, null, -1, null); } /** @@ -126,13 +162,14 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize) { * @param compression compression algorithm, null or * {@link ClickHouseCompression#NONE} means no * compression + * @param level compression level * @param postCloseAction custom action will be performed right after closing * the output stream * @return wrapped output, or the same output if it's instance of * {@link ClickHouseOutputStream} */ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, ClickHouseCompression compression, - Runnable postCloseAction) { + int level, Runnable postCloseAction) { final ClickHouseOutputStream chOutput; if (output == null) { chOutput = EmptyOutputStream.INSTANCE; @@ -141,7 +178,7 @@ public static ClickHouseOutputStream of(OutputStream output, int bufferSize, Cli ? (ClickHouseOutputStream) output : new WrappedOutputStream(null, output, bufferSize, postCloseAction); } else { - chOutput = wrap(null, output, bufferSize, postCloseAction, compression, 0); + chOutput = wrap(null, output, bufferSize, postCloseAction, compression, level); } return chOutput; } diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java index 5e1aedb25..45470e5c8 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/ClickHouseRequest.java @@ -819,12 +819,6 @@ public SelfT compressServerResponse(boolean enable, ClickHouseCompression compre : ClickHouseCompression.NONE; } - if (compressLevel < 0) { - compressLevel = 0; - } else if (compressLevel > 9) { - compressLevel = 9; - } - return option(ClickHouseClientOption.COMPRESS, enable) .option(ClickHouseClientOption.COMPRESS_ALGORITHM, compressAlgorithm) .option(ClickHouseClientOption.COMPRESS_LEVEL, compressLevel); @@ -894,12 +888,6 @@ public SelfT decompressClientRequest(boolean enable, ClickHouseCompression compr : ClickHouseCompression.NONE; } - if (compressLevel < 0) { - compressLevel = 0; - } else if (compressLevel > 9) { - compressLevel = 9; - } - return option(ClickHouseClientOption.DECOMPRESS, enable) .option(ClickHouseClientOption.DECOMPRESS_ALGORITHM, compressAlgorithm) .option(ClickHouseClientOption.DECOMPRESS_LEVEL, compressLevel); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java index a54fd2126..57cb64654 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/config/ClickHouseClientOption.java @@ -140,11 +140,11 @@ public enum ClickHouseClientOption implements ClickHouseOption { /** * Compression level for compressing server response. */ - COMPRESS_LEVEL("compress_level", 3, "Compression level for response, from 0 to 9(low to high)"), + COMPRESS_LEVEL("compress_level", -1, "Compression level for response, -1 standards for default"), /** * Compression level for decompress client request. */ - DECOMPRESS_LEVEL("decompress_level", 3, "Compression level for request, from 0 to 9(low to high)"), + DECOMPRESS_LEVEL("decompress_level", -1, "Compression level for request, -1 standards for default"), /** * Connection timeout in milliseconds. diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java index 4b2a2fabe..648750c2b 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/data/ClickHouseExternalTable.java @@ -18,7 +18,9 @@ import com.clickhouse.client.ClickHouseDeferredValue; import com.clickhouse.client.ClickHouseFile; import com.clickhouse.client.ClickHouseFormat; +import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseUtils; +import com.clickhouse.client.config.ClickHouseClientOption; /** * "Attached" temporary table. @@ -182,7 +184,6 @@ protected ClickHouseExternalTable(String name, ClickHouseFile file, ClickHouseDe boolean asTempTable) { this.name = name == null ? "" : name.trim(); this.file = file != null ? file : ClickHouseFile.NULL; - this.content = ClickHouseChecker.nonNull(content, "content"); if (compression == null) { compression = ClickHouseCompression.fromFileName(this.name); this.compression = Optional.ofNullable(compression == ClickHouseCompression.NONE ? null : compression); @@ -191,6 +192,18 @@ protected ClickHouseExternalTable(String name, ClickHouseFile file, ClickHouseDe } this.format = format == null ? ClickHouseFormat.TabSeparated : format; + if (content == null) { + throw new IllegalArgumentException("Non-null content is required"); + } + this.content = compression == ClickHouseCompression.NONE ? content + // unfortunately ClickHouse does not support compressed external data + : ClickHouseDeferredValue + .of(() -> ClickHouseInputStream.of(content.get(), ClickHouseUtils.getBufferSize( + (int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), + (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), + (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue()), this.compression.get(), + null)); + int size = columns == null ? 0 : columns.size(); if (size == 0) { this.columns = Collections.emptyList(); diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/CompressionUtils.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/CompressionUtils.java new file mode 100644 index 000000000..82b511be4 --- /dev/null +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/CompressionUtils.java @@ -0,0 +1,187 @@ +package com.clickhouse.client.stream; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import com.clickhouse.client.ClickHouseCompression; +import com.clickhouse.client.ClickHouseUtils; + +public final class CompressionUtils { + public static final String ERROR_FAILED_TO_WRAP_INPUT = "Failed to wrap input stream"; + public static final String ERROR_FAILED_TO_WRAP_OUTPUT = "Failed to wrap output stream"; + public static final String ERROR_UNSUPPORTED_COMPRESS_ALG = "Compression algorithm [%s] is not supported due to %s"; + public static final String ERROR_UNSUPPORTED_DECOMPRESS_ALG = "Decompression algorithm [%s] is not supported due to %s"; + + private static class Brotli4jUtils { + static { + com.aayushatharva.brotli4j.Brotli4jLoader.ensureAvailability(); + } + + static InputStream createInputStream(InputStream input, int bufferSize) throws IOException { + return new com.aayushatharva.brotli4j.decoder.BrotliInputStream(input, bufferSize); + } + + static OutputStream createOutputStream(OutputStream output, int quality, int bufferSize) + throws IOException { + com.aayushatharva.brotli4j.encoder.Encoder.Parameters params = new com.aayushatharva.brotli4j.encoder.Encoder.Parameters() + .setQuality(quality); + return new com.aayushatharva.brotli4j.encoder.BrotliOutputStream(output, params, bufferSize); + } + + private Brotli4jUtils() { + } + } + + public static InputStream createBrotliInputStream(InputStream input, int bufferSize) { + try { + // Brotli4jUtils.createInputStream(input, bufferSize) + return new org.brotli.dec.BrotliInputStream(input, bufferSize); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_INPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_DECOMPRESS_ALG, ClickHouseCompression.BROTLI, + e.getMessage())); + } + } + + public static OutputStream createBrotliOutputStream(OutputStream output, int quality, int bufferSize) { + try { + if (quality < -1 || quality > 11) { + quality = 4; + } + return Brotli4jUtils.createOutputStream(output, quality, bufferSize); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_OUTPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_COMPRESS_ALG, ClickHouseCompression.BROTLI, + e.getMessage())); + } + } + + public static InputStream createBz2InputStream(InputStream input) { + try { + return new org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream(input); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_INPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_DECOMPRESS_ALG, ClickHouseCompression.BZ2, + e.getMessage())); + } + } + + public static OutputStream createBz2OutputStream(OutputStream output, int blockSize) { + try { + if (blockSize < org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream.MIN_BLOCKSIZE + || blockSize > org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream.MAX_BLOCKSIZE) { + blockSize = org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream.MAX_BLOCKSIZE; + } + return new org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream(output, blockSize); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_OUTPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_COMPRESS_ALG, ClickHouseCompression.BROTLI, + e.getMessage())); + } + } + + public static InputStream createSnappyInputStream(InputStream input) { + try { + return new org.xerial.snappy.SnappyInputStream(input); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_INPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_DECOMPRESS_ALG, ClickHouseCompression.SNAPPY, + e.getMessage())); + } + } + + public static OutputStream createSnappyOutputStream(OutputStream output, int blockSize) { + try { + if (blockSize < 1024) { // MIN_BLOCK_SIZE + blockSize = 32 * 1024; // DEFAULT_BLOCK_SIZE + } + return new org.xerial.snappy.SnappyOutputStream(output, blockSize); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_COMPRESS_ALG, ClickHouseCompression.SNAPPY, + e.getMessage())); + } + } + + public static InputStream createZstdInputStream(InputStream input) { + try { + return new com.github.luben.zstd.ZstdInputStream(input).setContinuous(true); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_INPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_DECOMPRESS_ALG, ClickHouseCompression.ZSTD, + e.getMessage())); + } + } + + /** + * Creates an output stream for {@link ClickHouseCompression#ZSTD} compression. + * + * @param output non-null output stream + * @param level compression level, any number outside of [0, 22] will be + * treated as default + * @return output stream for compression + */ + public static OutputStream createZstdOutputStream(OutputStream output, int level) { + try { + if (level < 0 || level > 22) { + level = com.github.luben.zstd.Zstd.defaultCompressionLevel(); + } + return new com.github.luben.zstd.ZstdOutputStream(output, level); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_OUTPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_COMPRESS_ALG, ClickHouseCompression.ZSTD, e.getMessage())); + } + } + + public static InputStream createXzInputStream(InputStream input) { + try { + return new org.tukaani.xz.XZInputStream(input); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_INPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_DECOMPRESS_ALG, ClickHouseCompression.XZ, e.getMessage())); + } + } + + /** + * Creates an output stream for {@link ClickHouseCompression#XZ} compression. + * + * @param output non-null output stream + * @param preset compression preset level, any number outside of [0,9] will be + * treated as 6, which is the default + * @return output stream for compression + */ + public static OutputStream createXzOutputStream(OutputStream output, int preset) { + try { + if (preset < org.tukaani.xz.LZMA2Options.PRESET_MIN || preset > org.tukaani.xz.LZMA2Options.PRESET_MAX) { + preset = org.tukaani.xz.LZMA2Options.PRESET_DEFAULT; + } + return new org.tukaani.xz.XZOutputStream(output, new org.tukaani.xz.LZMA2Options(preset), + org.tukaani.xz.XZ.CHECK_CRC64); + } catch (IOException e) { + throw new IllegalArgumentException(ERROR_FAILED_TO_WRAP_OUTPUT, e); + } catch (NoClassDefFoundError e) { + throw new UnsupportedOperationException( + ClickHouseUtils.format(ERROR_UNSUPPORTED_COMPRESS_ALG, ClickHouseCompression.XZ, e.getMessage())); + } + } + + private CompressionUtils() { + } +} diff --git a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java index b96ae6fe7..e649da957 100644 --- a/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java +++ b/clickhouse-client/src/main/java/com/clickhouse/client/stream/Lz4OutputStream.java @@ -51,16 +51,24 @@ protected void flushBuffer(byte[] bytes, int offset, int length) throws IOExcept } public Lz4OutputStream(OutputStream stream, int maxCompressBlockSize, Runnable postCloseAction) { - this(null, stream, maxCompressBlockSize, postCloseAction); + this(null, stream, -1, maxCompressBlockSize, postCloseAction); } - public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int maxCompressBlockSize, + public Lz4OutputStream(OutputStream stream, int compressLevel, int maxCompressBlockSize, Runnable postCloseAction) { + this(null, stream, compressLevel, maxCompressBlockSize, postCloseAction); + } + + public Lz4OutputStream(ClickHouseFile file, OutputStream stream, int compressLevel, int maxCompressBlockSize, Runnable postCloseAction) { super(file, maxCompressBlockSize, postCloseAction); output = ClickHouseChecker.nonNull(stream, "OutputStream"); - compressor = factory.fastCompressor(); + if (compressLevel < 0) { + compressor = factory.fastCompressor(); + } else { + compressor = factory.highCompressor(compressLevel); + } // reserve the first 9 bytes for calculating checksum compressedBlock = new byte[compressor.maxCompressedLength(maxCompressBlockSize) + 15]; } diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index bb1f345c1..249ec1393 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -89,6 +89,14 @@ protected void checkRowCount(ClickHouseRequest request, String queryOrTableNa } } + protected boolean checkServerVersion(ClickHouseClient client, ClickHouseNode server, String range) + throws ClickHouseException { + try (ClickHouseResponse response = newRequest(client, server) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes).query("select version()").executeAndWait()) { + return ClickHouseVersion.of(response.firstRecord().getValue(0).asString()).check(range); + } + } + protected List sendAndWait(ClickHouseNode server, String sql, String... more) throws ClickHouseException { try { diff --git a/clickhouse-grpc-client/pom.xml b/clickhouse-grpc-client/pom.xml index 66279ffb2..0813e2c3e 100644 --- a/clickhouse-grpc-client/pom.xml +++ b/clickhouse-grpc-client/pom.xml @@ -46,7 +46,7 @@ io.grpc grpc-protobuf - provided + true diff --git a/clickhouse-http-client/pom.xml b/clickhouse-http-client/pom.xml index 20758bf88..528887e91 100644 --- a/clickhouse-http-client/pom.xml +++ b/clickhouse-http-client/pom.xml @@ -1,4 +1,6 @@ - + 4.0.0 @@ -20,18 +22,60 @@ clickhouse-client ${revision} + + + org.brotli + dec + true + + + com.github.luben + zstd-jni + true + com.google.code.gson gson - provided + true - org.lz4 - lz4-java + org.apache.commons + commons-compress + true org.apache.httpcomponents.client5 httpclient5 + true + + + org.lz4 + lz4-java + + + org.tukaani + xz + true + + + org.xerial.snappy + snappy-java + true diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java index 57323711e..57ad19e3c 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ApacheHttpConnectionImpl.java @@ -22,7 +22,7 @@ import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse; import org.apache.hc.client5.http.impl.classic.HttpClientBuilder; -import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager; +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager; import org.apache.hc.client5.http.socket.ConnectionSocketFactory; import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; @@ -45,18 +45,15 @@ import java.io.OutputStream; import java.io.UncheckedIOException; import java.net.ConnectException; +import java.net.HttpURLConnection; import java.net.Socket; -import java.net.URISyntaxException; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.TimeZone; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLException; @@ -65,33 +62,12 @@ * Created by wujianchao on 2022/12/1. */ public class ApacheHttpConnectionImpl extends ClickHouseHttpConnection { - private static final Logger log = LoggerFactory.getLogger(ApacheHttpConnectionImpl.class); - private static final byte[] HEADER_CONTENT_DISPOSITION = "content-disposition: form-data; name=\"" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_OCTET_STREAM = "content-type: application/octet-stream\r\n" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_BINARY_ENCODING = "content-transfer-encoding: binary\r\n\r\n" - .getBytes(StandardCharsets.US_ASCII); - - private static final byte[] ERROR_MSG_PREFIX = "Code: ".getBytes(StandardCharsets.US_ASCII); - - private static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; - private static final byte[] END_OF_NAME = new byte[] { '"', '\r', '\n' }; - private static final byte[] LINE_PREFIX = new byte[] { '\r', '\n', '-', '-' }; - private static final byte[] LINE_SUFFIX = new byte[] { '\r', '\n' }; - - private static final byte[] SUFFIX_QUERY = "query\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FORMAT = "_format\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_STRUCTURE = "_structure\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FILENAME = "\"; filename=\"".getBytes(StandardCharsets.US_ASCII); - private final CloseableHttpClient client; - private final AtomicBoolean isBusy = new AtomicBoolean(false); protected ApacheHttpConnectionImpl(ClickHouseNode server, ClickHouseRequest request, ExecutorService executor) - throws IOException, URISyntaxException { + throws IOException { super(server, request); client = newConnection(); } @@ -176,17 +152,19 @@ private void setHeaders(HttpRequest request, Map headers) { } private void checkResponse(CloseableHttpResponse response) throws IOException { - if (response.getEntity() == null) { - throw new ConnectException( - ClickHouseUtils.format("HTTP response %d, %s", response.getCode(), response.getReasonPhrase())); - } - - if (response.getCode() == 200) { + if (response.getCode() == HttpURLConnection.HTTP_OK) { return; } - Header errorCode = response.getFirstHeader("X-ClickHouse-Exception-Code"); - Header serverName = response.getFirstHeader("X-ClickHouse-Server-Display-Name"); + final Header errorCode = response.getFirstHeader("X-ClickHouse-Exception-Code"); + final Header serverName = response.getFirstHeader("X-ClickHouse-Server-Display-Name"); + if (response.getEntity() == null) { + throw new ConnectException( + ClickHouseUtils.format("HTTP response %d %s(code %s returned from server %s)", + response.getCode(), response.getReasonPhrase(), + errorCode == null ? null : errorCode.getValue(), + serverName == null ? null : serverName.getValue())); + } String errorMsg; @@ -203,123 +181,42 @@ private void checkResponse(CloseableHttpResponse response) throws IOException { builder.append(errorMsg).append('\n'); } errorMsg = builder.toString(); - } catch (IOException e) { - log.debug("Failed to read error message[code=%s] from server [%s] due to: %s", - errorCode.getValue(), - serverName.getValue(), - e.getMessage()); - int index = ClickHouseUtils.indexOf(bytes, ERROR_MSG_PREFIX); - errorMsg = index > 0 ? new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8) - : new String(bytes, StandardCharsets.UTF_8); + errorMsg = parseErrorFromException(errorCode != null ? errorCode.getValue() : null, + serverName != null ? serverName.getValue() : null, e, bytes); } throw new IOException(errorMsg); } @Override protected boolean isReusable() { - return !isBusy.get(); + return true; } protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List tables, - String url, Map headers, ClickHouseConfig config, - Runnable postCloseAction) throws IOException { - // Connection is reusable, ensure that only one request is on fly. - if (!isBusy.compareAndSet(false, true)) - throw new IOException("Connection is busy"); - + String url, Map headers, ClickHouseConfig config, Runnable postCloseAction) + throws IOException { HttpPost post = new HttpPost(url == null ? this.url : url); setHeaders(post, headers); byte[] boundary = null; String contentType = "text/plain; charset=UTF-8"; - if (tables != null && !tables.isEmpty()) { String uuid = rm.createUniqueId(); contentType = "multipart/form-data; boundary=".concat(uuid); boundary = uuid.getBytes(StandardCharsets.US_ASCII); } - post.setHeader("Content-Type", contentType); - final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); - final boolean hasInput = data != null || boundary != null; - - Charset ascii = StandardCharsets.US_ASCII; - Charset utf8 = StandardCharsets.UTF_8; - - List inputParts = new ArrayList<>(); - - byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(utf8); - if (boundary != null) { - // head - inputParts.add(ClickHouseInputStream.of(LINE_PREFIX, boundary, LINE_SUFFIX, HEADER_CONTENT_DISPOSITION, - SUFFIX_QUERY, sqlBytes)); - - for (ClickHouseExternalTable t : tables) { - byte[] tableName = t.getName().getBytes(utf8); - // table head - List tableHead = new ArrayList<>(); - for (int i = 0; i < 3; i++) { - tableHead.add(LINE_PREFIX); - tableHead.add(boundary); - tableHead.add(LINE_SUFFIX); - tableHead.add(HEADER_CONTENT_DISPOSITION); - tableHead.add(tableName); - if (i == 0) { - tableHead.add(SUFFIX_FORMAT); - tableHead.add(t.getFormat().name().getBytes(ascii)); - } else if (i == 1) { - tableHead.add(SUFFIX_STRUCTURE); - tableHead.add(t.getStructure().getBytes(utf8)); - } else { - tableHead.add(SUFFIX_FILENAME); - tableHead.add(tableName); - tableHead.add(END_OF_NAME); - break; - } - } - tableHead.add(HEADER_OCTET_STREAM); - tableHead.add(HEADER_BINARY_ENCODING); - inputParts.add(ClickHouseInputStream.of(tableHead, byte[].class, null, null)); - - // table content - inputParts.add(t.getContent()); - } - // tail - inputParts.add(ClickHouseInputStream.of(LINE_PREFIX, boundary, DOUBLE_DASH, LINE_SUFFIX)); - } else { - List content = new ArrayList<>(2); - content.add(sqlBytes); - if (data != null && data.available() > 0) { - // append \n - if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { - content.add(new byte[] { '\n' }); - } - inputParts.add(ClickHouseInputStream.of(content, byte[].class, null, null)); - inputParts.add(data); - } else { - inputParts.add(ClickHouseInputStream.of(content, byte[].class, null, null)); - } - } - - ClickHouseInputStream input = ClickHouseInputStream.of(inputParts, InputStream.class, null, null); - String contentEncoding = headers == null ? null : headers.getOrDefault("content-encoding", null); - ClickHouseHttpEntity postBody = new ClickHouseHttpEntity(input, config, contentType, contentEncoding, hasFile, - hasInput); - + ClickHouseHttpEntity postBody = new ClickHouseHttpEntity(config, contentType, contentEncoding, boundary, sql, + data, tables); post.setEntity(postBody); CloseableHttpResponse response = client.execute(post); checkResponse(response); // buildResponse should use the config of current request in case of reusable // connection. - return buildResponse(response, config, () -> { - isBusy.compareAndSet(true, false); - if (postCloseAction != null) { - postCloseAction.run(); - } - }); + return buildResponse(response, config, postCloseAction); } @Override @@ -372,7 +269,7 @@ private SSLSocketFactory(ClickHouseConfig config) throws SSLException { .orElse(SSLContexts.createDefault())), config.getSslMode() == ClickHouseSslMode.STRICT ? HttpsURLConnection.getDefaultHostnameVerifier() - : (hostname, session) -> true); + : (hostname, session) -> true); // NOSONAR this.config = config; } @@ -386,21 +283,41 @@ public static SSLSocketFactory create(ClickHouseConfig config) throws SSLExcepti } } - static class HttpConnectionManager extends BasicHttpClientConnectionManager { - public HttpConnectionManager(Registry socketFactory, ClickHouseConfig config) - throws SSLException { + static class HttpConnectionManager extends PoolingHttpClientConnectionManager { + public HttpConnectionManager(Registry socketFactory, ClickHouseConfig config) { super(socketFactory); ConnectionConfig connConfig = ConnectionConfig.custom() .setConnectTimeout(Timeout.of(config.getConnectionTimeout(), TimeUnit.MILLISECONDS)) .build(); - setConnectionConfig(connConfig); - SocketConfig socketConfig = SocketConfig.custom() + setDefaultConnectionConfig(connConfig); + + SocketConfig.Builder builder = SocketConfig.custom() .setSoTimeout(Timeout.of(config.getSocketTimeout(), TimeUnit.MILLISECONDS)) .setRcvBufSize(config.getReadBufferSize()) - .setSndBufSize(config.getWriteBufferSize()) - .build(); - setSocketConfig(socketConfig); + .setSndBufSize(config.getWriteBufferSize()); + if (config.hasOption(ClickHouseClientOption.SOCKET_KEEPALIVE)) { + builder.setSoKeepAlive(config.getBoolOption(ClickHouseClientOption.SOCKET_KEEPALIVE)); + } + if (config.hasOption(ClickHouseClientOption.SOCKET_LINGER)) { + int solinger = config.getIntOption(ClickHouseClientOption.SOCKET_LINGER); + builder.setSoLinger(solinger, TimeUnit.SECONDS); + } + if (config.hasOption(ClickHouseClientOption.SOCKET_REUSEADDR)) { + builder.setSoReuseAddress(config.getBoolOption(ClickHouseClientOption.SOCKET_REUSEADDR)); + } + if (config.hasOption(ClickHouseClientOption.SOCKET_RCVBUF)) { + int bufferSize = config.getIntOption(ClickHouseClientOption.SOCKET_RCVBUF); + builder.setRcvBufSize(bufferSize > 0 ? bufferSize : config.getReadBufferSize()); + } + if (config.hasOption(ClickHouseClientOption.SOCKET_SNDBUF)) { + int bufferSize = config.getIntOption(ClickHouseClientOption.SOCKET_SNDBUF); + builder.setSndBufSize(bufferSize > 0 ? bufferSize : config.getWriteBufferSize()); + } + if (config.hasOption(ClickHouseClientOption.SOCKET_TCP_NODELAY)) { + builder.setTcpNoDelay(config.getBoolOption(ClickHouseClientOption.SOCKET_TCP_NODELAY)); + } + setDefaultSocketConfig(builder.build()); } } } diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index b1b362dce..edb531ffe 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -1,6 +1,7 @@ package com.clickhouse.client.http; import java.io.IOException; +import java.io.OutputStream; import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; @@ -15,6 +16,7 @@ import java.util.Map.Entry; import com.clickhouse.client.ClickHouseChecker; +import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; @@ -28,8 +30,31 @@ import com.clickhouse.client.config.ClickHouseOption; import com.clickhouse.client.data.ClickHouseExternalTable; import com.clickhouse.client.http.config.ClickHouseHttpOption; +import com.clickhouse.client.logging.Logger; +import com.clickhouse.client.logging.LoggerFactory; public abstract class ClickHouseHttpConnection implements AutoCloseable { + private static final Logger log = LoggerFactory.getLogger(ClickHouseHttpConnection.class); + + private static final byte[] HEADER_CONTENT_DISPOSITION = "content-disposition: form-data; name=\"" + .getBytes(StandardCharsets.US_ASCII); + private static final byte[] HEADER_OCTET_STREAM = "content-type: application/octet-stream\r\n" + .getBytes(StandardCharsets.US_ASCII); + private static final byte[] HEADER_BINARY_ENCODING = "content-transfer-encoding: binary\r\n\r\n" + .getBytes(StandardCharsets.US_ASCII); + + private static final byte[] ERROR_MSG_PREFIX = "Code: ".getBytes(StandardCharsets.US_ASCII); + + private static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; + private static final byte[] END_OF_NAME = new byte[] { '"', '\r', '\n' }; + private static final byte[] LINE_PREFIX = new byte[] { '\r', '\n', '-', '-' }; + private static final byte[] LINE_SUFFIX = new byte[] { '\r', '\n' }; + + private static final byte[] SUFFIX_QUERY = "query\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_FORMAT = "_format\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_STRUCTURE = "_structure\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); + private static final byte[] SUFFIX_FILENAME = "\"; filename=\"".getBytes(StandardCharsets.US_ASCII); + private static StringBuilder appendQueryParameter(StringBuilder builder, String key, String value) { return builder.append(urlEncode(key, StandardCharsets.UTF_8)).append('=') .append(urlEncode(value, StandardCharsets.UTF_8)).append('&'); @@ -212,6 +237,85 @@ protected static Map createDefaultHeaders(ClickHouseConfig confi return map; } + protected static String parseErrorFromException(String errorCode, String serverName, IOException e, byte[] bytes) { + log.debug("Failed to read error message[code=%s] from server [%s] due to: %s", errorCode, serverName, + e.getMessage()); + + int index = ClickHouseUtils.indexOf(bytes, ERROR_MSG_PREFIX); + final String errorMsg; + if (index > 0) { + errorMsg = new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8); + } else if (!ClickHouseChecker.isNullOrBlank(errorCode)) { + errorMsg = new StringBuilder().append("Code: ").append(errorCode).append(", server: ").append(serverName) + .append(", ").append(new String(bytes, StandardCharsets.UTF_8)).toString(); + } else { + errorMsg = new String(bytes, StandardCharsets.UTF_8); + } + return errorMsg; + } + + protected static void postData(ClickHouseConfig config, byte[] boundary, String sql, ClickHouseInputStream data, + List tables, OutputStream requestStream) throws IOException { + final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); + + try (OutputStream rawOut = requestStream; + ClickHouseOutputStream out = hasFile + ? ClickHouseOutputStream.of(rawOut, config.getWriteBufferSize()) + : (data != null || boundary != null // NOSONAR + ? ClickHouseClient.getAsyncRequestOutputStream(config, rawOut, null) // latch::countDown) + : ClickHouseClient.getRequestOutputStream(config, rawOut, null))) { + byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(StandardCharsets.UTF_8); + if (boundary != null) { + rawOut.write(LINE_PREFIX); + rawOut.write(boundary); + rawOut.write(LINE_SUFFIX); + rawOut.write(HEADER_CONTENT_DISPOSITION); + rawOut.write(SUFFIX_QUERY); + rawOut.write(sqlBytes); + + final int writeBufferSize = config.getWriteBufferSize(); + for (ClickHouseExternalTable t : tables) { + byte[] tableName = t.getName().getBytes(StandardCharsets.UTF_8); + for (int i = 0; i < 3; i++) { + rawOut.write(LINE_PREFIX); + rawOut.write(boundary); + rawOut.write(LINE_SUFFIX); + rawOut.write(HEADER_CONTENT_DISPOSITION); + rawOut.write(tableName); + if (i == 0) { + rawOut.write(SUFFIX_FORMAT); + rawOut.write(t.getFormat().name().getBytes(StandardCharsets.US_ASCII)); + } else if (i == 1) { + rawOut.write(SUFFIX_STRUCTURE); + rawOut.write(t.getStructure().getBytes(StandardCharsets.UTF_8)); + } else { + rawOut.write(SUFFIX_FILENAME); + rawOut.write(tableName); + rawOut.write(END_OF_NAME); + break; + } + } + rawOut.write(HEADER_OCTET_STREAM); + rawOut.write(HEADER_BINARY_ENCODING); + ClickHouseInputStream.pipe(t.getContent(), rawOut, writeBufferSize); + } + rawOut.write(LINE_PREFIX); + rawOut.write(boundary); + rawOut.write(DOUBLE_DASH); + rawOut.write(LINE_SUFFIX); + } else { + out.writeBytes(sqlBytes); + if (data != null && data.available() > 0) { + // append \n + if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { + out.write(10); + } + ClickHouseInputStream.pipe(data, out, config.getWriteBufferSize()); + } + } + } + } + protected final ClickHouseConfig config; protected final ClickHouseNode server; protected final ClickHouseOutputStream output; diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java index 4df4c7f07..7fde27b92 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpEntity.java @@ -1,46 +1,40 @@ package com.clickhouse.client.http; -import com.clickhouse.client.ClickHouseClient; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseInputStream; -import com.clickhouse.client.ClickHouseOutputStream; +import com.clickhouse.client.data.ClickHouseExternalTable; + import org.apache.hc.core5.http.io.entity.AbstractHttpEntity; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Objects; +import java.util.List; /** * Used to encapsulate post request. */ public class ClickHouseHttpEntity extends AbstractHttpEntity { - /** - * Data to send - */ - private final ClickHouseInputStream in; private final ClickHouseConfig config; - /** - * Indicate that there is extra data which comes from file. - */ - private final boolean hasFile; - /** - * Indicate that there is extra data which comes from external tables. - */ - private final boolean hasInput; + private final byte[] boundary; + private final String sql; + private final ClickHouseInputStream data; + private final List tables; + + protected ClickHouseHttpEntity(ClickHouseConfig config, String contentType, String contentEncoding, byte[] boundary, + String sql, ClickHouseInputStream data, List tables) { + super(contentType, contentEncoding, data != null || boundary != null); - public ClickHouseHttpEntity(ClickHouseInputStream in, ClickHouseConfig config, String contentType, - String contentEncoding, boolean hasFile, boolean hasInput) { - super(contentType, contentEncoding, hasInput); - this.in = in; this.config = config; - this.hasFile = hasFile; - this.hasInput = hasInput; + this.boundary = boundary; + this.sql = sql; + this.data = data; + this.tables = tables; } @Override - public boolean isRepeatable() { - return false; + public InputStream getContent() throws IOException, UnsupportedOperationException { + return ClickHouseInputStream.empty(); } @Override @@ -49,35 +43,22 @@ public long getContentLength() { } @Override - public InputStream getContent() throws IOException, UnsupportedOperationException { - return in; + public boolean isRepeatable() { + return false; } @Override - public void writeTo(OutputStream outStream) throws IOException { - Objects.requireNonNull(outStream, "outStream"); - try { - ClickHouseOutputStream wrappedOut = hasFile - ? ClickHouseOutputStream.of(outStream, config.getWriteBufferSize()) - : (hasInput - ? ClickHouseClient.getAsyncRequestOutputStream(config, outStream, null) - : ClickHouseClient.getRequestOutputStream(config, outStream, null)); - in.pipe(wrappedOut); - wrappedOut.flush(); - } finally { - in.close(); - } + public boolean isStreaming() { + return false; } @Override - public boolean isStreaming() { - return false; + public void writeTo(OutputStream outStream) throws IOException { + ClickHouseHttpConnection.postData(config, boundary, sql, data, tables, outStream); } @Override public void close() throws IOException { - if (in != null) { - in.close(); - } + // nothing to do } } diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java index 3ff5068fa..99ec2d425 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/HttpUrlConnectionImpl.java @@ -6,7 +6,6 @@ import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseSslContextProvider; import com.clickhouse.client.ClickHouseUtils; @@ -29,7 +28,6 @@ import java.net.HttpURLConnection; import java.net.Proxy; import java.net.URL; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; @@ -44,25 +42,6 @@ public class HttpUrlConnectionImpl extends ClickHouseHttpConnection { private static final Logger log = LoggerFactory.getLogger(HttpUrlConnectionImpl.class); - private static final byte[] HEADER_CONTENT_DISPOSITION = "content-disposition: form-data; name=\"" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_OCTET_STREAM = "content-type: application/octet-stream\r\n" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_BINARY_ENCODING = "content-transfer-encoding: binary\r\n\r\n" - .getBytes(StandardCharsets.US_ASCII); - - private static final byte[] ERROR_MSG_PREFIX = "Code: ".getBytes(StandardCharsets.US_ASCII); - - private static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; - private static final byte[] END_OF_NAME = new byte[] { '"', '\r', '\n' }; - private static final byte[] LINE_PREFIX = new byte[] { '\r', '\n', '-', '-' }; - private static final byte[] LINE_SUFFIX = new byte[] { '\r', '\n' }; - - private static final byte[] SUFFIX_QUERY = "query\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FORMAT = "_format\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_STRUCTURE = "_structure\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FILENAME = "\"; filename=\"".getBytes(StandardCharsets.US_ASCII); - private final HttpURLConnection conn; private ClickHouseHttpResponse buildResponse(Runnable postCloseAction) throws IOException { @@ -175,8 +154,9 @@ private void checkResponse(HttpURLConnection conn) throws IOException { InputStream errorInput = conn.getErrorStream(); if (errorInput == null) { // TODO follow redirects? - throw new ConnectException(ClickHouseUtils.format("HTTP response %d %s", conn.getResponseCode(), - conn.getResponseMessage())); + throw new ConnectException(ClickHouseUtils.format( + "HTTP response %d %s (ClickHouse error %s returned from %s)", conn.getResponseCode(), + conn.getResponseMessage(), errorCode, serverName)); } String errorMsg; @@ -194,13 +174,8 @@ private void checkResponse(HttpURLConnection conn) throws IOException { } errorMsg = builder.toString(); } catch (IOException e) { - log.debug("Failed to read error message[code=%s] from server [%s] due to: %s", errorCode, serverName, - e.getMessage()); - int index = ClickHouseUtils.indexOf(bytes, ERROR_MSG_PREFIX); - errorMsg = index > 0 ? new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8) - : new String(bytes, StandardCharsets.UTF_8); + errorMsg = parseErrorFromException(errorCode, serverName, e, bytes); } - throw new IOException(errorMsg); } } @@ -221,79 +196,23 @@ protected boolean isReusable() { protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, List tables, String url, Map headers, ClickHouseConfig config, Runnable postCloseAction) throws IOException { - Charset ascii = StandardCharsets.US_ASCII; byte[] boundary = null; if (tables != null && !tables.isEmpty()) { String uuid = rm.createUniqueId(); conn.setRequestProperty("content-type", "multipart/form-data; boundary=".concat(uuid)); - boundary = uuid.getBytes(ascii); + boundary = uuid.getBytes(StandardCharsets.US_ASCII); } else { conn.setRequestProperty("content-type", "text/plain; charset=UTF-8"); } setHeaders(conn, headers); - ClickHouseConfig c = config; - final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); - final boolean hasInput = data != null || boundary != null; - if (hasInput) { + if (data != null || boundary != null) { conn.setChunkedStreamingMode(config.getRequestChunkSize()); } else { // TODO conn.setFixedLengthStreamingMode(contentLength); } - try (ClickHouseOutputStream out = hasFile - ? ClickHouseOutputStream.of(conn.getOutputStream(), config.getWriteBufferSize()) - : (hasInput - ? ClickHouseClient.getAsyncRequestOutputStream(config, conn.getOutputStream(), null) // latch::countDown) - : ClickHouseClient.getRequestOutputStream(c, conn.getOutputStream(), null))) { - Charset utf8 = StandardCharsets.UTF_8; - byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(utf8); - if (boundary != null) { - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(LINE_SUFFIX); - out.writeBytes(HEADER_CONTENT_DISPOSITION); - out.writeBytes(SUFFIX_QUERY); - out.writeBytes(sqlBytes); - for (ClickHouseExternalTable t : tables) { - byte[] tableName = t.getName().getBytes(utf8); - for (int i = 0; i < 3; i++) { - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(LINE_SUFFIX); - out.writeBytes(HEADER_CONTENT_DISPOSITION); - out.writeBytes(tableName); - if (i == 0) { - out.writeBytes(SUFFIX_FORMAT); - out.writeBytes(t.getFormat().name().getBytes(ascii)); - } else if (i == 1) { - out.writeBytes(SUFFIX_STRUCTURE); - out.writeBytes(t.getStructure().getBytes(utf8)); - } else { - out.writeBytes(SUFFIX_FILENAME); - out.writeBytes(tableName); - out.writeBytes(END_OF_NAME); - break; - } - } - out.writeBytes(HEADER_OCTET_STREAM); - out.writeBytes(HEADER_BINARY_ENCODING); - ClickHouseInputStream.pipe(t.getContent(), out, c.getWriteBufferSize()); - } - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(DOUBLE_DASH); - out.writeBytes(LINE_SUFFIX); - } else { - out.writeBytes(sqlBytes); - if (data != null && data.available() > 0) { - // append \n - if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { - out.write(10); - } - ClickHouseInputStream.pipe(data, out, c.getWriteBufferSize()); - } - } - } + + postData(config, boundary, sql, data, tables, conn.getOutputStream()); checkResponse(conn); diff --git a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java index 1cd687347..eb084acad 100644 --- a/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java +++ b/clickhouse-http-client/src/main/java11/com/clickhouse/client/http/HttpClientConnectionImpl.java @@ -7,11 +7,9 @@ import com.clickhouse.client.ClickHouseFormat; import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; -import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHousePipedOutputStream; import com.clickhouse.client.ClickHouseRequest; import com.clickhouse.client.ClickHouseSslContextProvider; -import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.data.ClickHouseExternalTable; import com.clickhouse.client.http.config.ClickHouseHttpOption; @@ -19,14 +17,12 @@ import com.clickhouse.client.logging.LoggerFactory; import java.io.BufferedReader; -import java.io.BufferedWriter; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.OutputStream; -import java.io.OutputStreamWriter; import java.io.Reader; import java.io.UncheckedIOException; import java.net.ConnectException; @@ -42,7 +38,6 @@ import java.net.http.HttpResponse; import java.net.http.HttpClient.Redirect; import java.net.http.HttpClient.Version; -import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.List; @@ -53,7 +48,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.function.Function; import javax.net.ssl.SSLContext; @@ -79,25 +73,6 @@ public List select(URI uri) { private static final Logger log = LoggerFactory.getLogger(HttpClientConnectionImpl.class); - private static final byte[] HEADER_CONTENT_DISPOSITION = "content-disposition: form-data; name=\"" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_OCTET_STREAM = "content-type: application/octet-stream\r\n" - .getBytes(StandardCharsets.US_ASCII); - private static final byte[] HEADER_BINARY_ENCODING = "content-transfer-encoding: binary\r\n\r\n" - .getBytes(StandardCharsets.US_ASCII); - - private static final byte[] ERROR_MSG_PREFIX = "Code: ".getBytes(StandardCharsets.US_ASCII); - - private static final byte[] DOUBLE_DASH = new byte[] { '-', '-' }; - private static final byte[] END_OF_NAME = new byte[] { '"', '\r', '\n' }; - private static final byte[] LINE_PREFIX = new byte[] { '\r', '\n', '-', '-' }; - private static final byte[] LINE_SUFFIX = new byte[] { '\r', '\n' }; - - private static final byte[] SUFFIX_QUERY = "query\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FORMAT = "_format\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_STRUCTURE = "_structure\"\r\n\r\n".getBytes(StandardCharsets.US_ASCII); - private static final byte[] SUFFIX_FILENAME = "\"; filename=\"".getBytes(StandardCharsets.US_ASCII); - private final HttpClient httpClient; private final HttpRequest pingRequest; @@ -177,11 +152,7 @@ private HttpResponse checkResponse(ClickHouseConfig config, HttpRes } errorMsg = builder.toString(); } catch (IOException e) { - log.debug("Failed to read error message[code=%s] from server [%s] due to: %s", errorCode, serverName, - e.getMessage()); - int index = ClickHouseUtils.indexOf(bytes, ERROR_MSG_PREFIX); - errorMsg = index > 0 ? new String(bytes, index, bytes.length - index, StandardCharsets.UTF_8) - : new String(bytes, StandardCharsets.UTF_8); + errorMsg = parseErrorFromException(errorCode, serverName, e, bytes); } throw new IOException(errorMsg); @@ -231,68 +202,13 @@ private CompletableFuture> postRequest(HttpRequest req private ClickHouseHttpResponse postStream(ClickHouseConfig config, HttpRequest.Builder reqBuilder, byte[] boundary, String sql, ClickHouseInputStream data, List tables, Runnable postAction) throws IOException { - Charset ascii = StandardCharsets.US_ASCII; - ClickHouseConfig c = config; - final boolean hasFile = data != null && data.getUnderlyingFile().isAvailable(); - - ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(c, + ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config, null); reqBuilder.POST(HttpRequest.BodyPublishers.ofInputStream(stream::getInputStream)); // running in async is necessary to avoid deadlock of the piped stream CompletableFuture> f = postRequest(reqBuilder.build()); - try (ClickHouseOutputStream out = stream) { - Charset utf8 = StandardCharsets.UTF_8; - byte[] sqlBytes = hasFile ? new byte[0] : sql.getBytes(utf8); - - if (boundary != null) { - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(LINE_SUFFIX); - out.writeBytes(HEADER_CONTENT_DISPOSITION); - out.writeBytes(SUFFIX_QUERY); - out.writeBytes(sqlBytes); - for (ClickHouseExternalTable t : tables) { - byte[] tableName = t.getName().getBytes(utf8); - for (int i = 0; i < 3; i++) { - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(LINE_SUFFIX); - out.writeBytes(HEADER_CONTENT_DISPOSITION); - out.writeBytes(tableName); - if (i == 0) { - out.writeBytes(SUFFIX_FORMAT); - out.writeBytes(t.getFormat().name().getBytes(ascii)); - } else if (i == 1) { - out.writeBytes(SUFFIX_STRUCTURE); - out.writeBytes(t.getStructure().getBytes(utf8)); - } else { - out.writeBytes(SUFFIX_FILENAME); - out.writeBytes(tableName); - out.writeBytes(END_OF_NAME); - break; - } - } - out.writeBytes(HEADER_OCTET_STREAM); - out.writeBytes(HEADER_BINARY_ENCODING); - ClickHouseInputStream.pipe(t.getContent(), out, c.getWriteBufferSize()); - } - - out.writeBytes(LINE_PREFIX); - out.writeBytes(boundary); - out.writeBytes(DOUBLE_DASH); - out.writeBytes(LINE_SUFFIX); - } else { - out.writeBytes(sqlBytes); - if (data != null && data.available() > 0) { - // append \n - if (sqlBytes.length > 0 && sqlBytes[sqlBytes.length - 1] != (byte) '\n') { - out.write(10); - } - ClickHouseInputStream.pipe(data, out, c.getWriteBufferSize()); - } - } - } + postData(config, boundary, sql, data, tables, stream); HttpResponse r; try { @@ -355,7 +271,8 @@ protected ClickHouseHttpResponse post(String sql, ClickHouseInputStream data, Li } } - return boundary != null || data != null ? postStream(c, reqBuilder, boundary, sql, data, tables, postAction) + return boundary != null || data != null || c.isRequestCompressed() + ? postStream(c, reqBuilder, boundary, sql, data, tables, postAction) : postString(c, reqBuilder, sql, postAction); } diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java index 6533efd08..08edf014d 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java @@ -1,5 +1,7 @@ package com.clickhouse.client.http; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.Serializable; import java.util.Collections; @@ -7,6 +9,7 @@ import java.util.UUID; import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseCompression; import com.clickhouse.client.ClickHouseConfig; import com.clickhouse.client.ClickHouseCredentials; import com.clickhouse.client.ClickHouseException; @@ -14,6 +17,7 @@ import com.clickhouse.client.ClickHouseInputStream; import com.clickhouse.client.ClickHouseNode; import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseOutputStream; import com.clickhouse.client.ClickHouseParameterizedQuery; import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.ClickHouseRecord; @@ -21,6 +25,7 @@ import com.clickhouse.client.ClickHouseRequestManager; import com.clickhouse.client.ClickHouseResponse; import com.clickhouse.client.ClickHouseResponseSummary; +import com.clickhouse.client.ClickHouseUtils; import com.clickhouse.client.ClickHouseVersion; import com.clickhouse.client.ClientIntegrationTest; import com.clickhouse.client.config.ClickHouseClientOption; @@ -32,9 +37,41 @@ import com.clickhouse.client.http.config.HttpConnectionProvider; import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; public class ClickHouseHttpClientTest extends ClientIntegrationTest { + @DataProvider(name = "requestCompressionMatrix") + protected Object[][] getRequestCompressionMatrix() { + return new Object[][] { + { ClickHouseCompression.NONE, -2, 2, 1 }, + { ClickHouseCompression.LZ4, -2, 19, 1 }, // [0, 18] + { ClickHouseCompression.SNAPPY, -2, 33, 1024 }, // [1 * 1024, 32 * 1024] + { ClickHouseCompression.ZSTD, -2, 23, 1 }, // [0, 22] + }; + } + + @DataProvider(name = "mixedCompressionMatrix") + protected Object[][] getMixedCompressionMatrix() { + // ClickHouse Code: 638. DB::Exception: hadoop snappy decode + // error:INVALID_INPUT. (SNAPPY_UNCOMPRESS_FAILED) + ClickHouseCompression[] supportedRequestCompression = { ClickHouseCompression.NONE, ClickHouseCompression.LZ4, + ClickHouseCompression.ZSTD }; + ClickHouseCompression[] supportedResponseCompression = { ClickHouseCompression.NONE, + ClickHouseCompression.BROTLI, ClickHouseCompression.BZ2, ClickHouseCompression.DEFLATE, + ClickHouseCompression.GZIP, ClickHouseCompression.LZ4, ClickHouseCompression.XZ, + ClickHouseCompression.ZSTD }; + Object[][] matrix = new Object[supportedRequestCompression.length * supportedResponseCompression.length][]; + int i = 0; + for (ClickHouseCompression reqComp : supportedRequestCompression) { + for (ClickHouseCompression respComp : supportedResponseCompression) { + matrix[i++] = new Object[] { reqComp, respComp }; + } + } + return matrix; + } + @Override protected ClickHouseProtocol getProtocol() { return ClickHouseProtocol.HTTP; @@ -51,6 +88,81 @@ protected Map getClientOptions() { HttpConnectionProvider.HTTP_URL_CONNECTION); } + @Test(dataProvider = "requestCompressionMatrix", groups = "integration") + public void testCompressedRequest(ClickHouseCompression compression, int startLevel, int endLevel, int step) + throws ClickHouseException { + final ClickHouseNode server = getServer(); + final int readBufferSize = ClickHouseUtils.getBufferSize( + (int) ClickHouseClientOption.READ_BUFFER_SIZE.getDefaultValue(), + (int) ClickHouseClientOption.BUFFER_SIZE.getDefaultValue(), + (int) ClickHouseClientOption.MAX_BUFFER_SIZE.getDefaultValue()); + + if (compression == ClickHouseCompression.SNAPPY) { + if (!checkServerVersion(getClient(), server, "[22.3,)")) { + throw new SkipException("Snappy decompression was supported since 22.3"); + } + } + + for (int i = startLevel; i <= endLevel; i += step) { + final ByteArrayOutputStream o = new ByteArrayOutputStream(); + try (ClickHouseOutputStream out = ClickHouseOutputStream.of(o, readBufferSize, compression, i, null)) { + out.write("1,23\n4,56".getBytes()); + out.flush(); + } catch (IOException e) { + throw ClickHouseException.of(e, server); + } + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = newRequest(client, server) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .compressServerResponse(false) + .decompressClientRequest(true, compression, i) + .external( + // external table with compressed data + ClickHouseExternalTable.builder().name("x").columns("i Int32, s String") + .compression(compression) + .format(ClickHouseFormat.CSV) + .content(new ByteArrayInputStream(o.toByteArray())).build(), + // external table without compression + ClickHouseExternalTable.builder().name("y").columns("s String, i Int32") + .format(ClickHouseFormat.TSV) + .content(new ByteArrayInputStream("32\t1\n43\t2\n54\t3\n65\t4".getBytes())) + .build()) + .query("select x.* from x inner join y on x.i = y.i").executeAndWait()) { + int j = 0; + for (ClickHouseRecord r : response.records()) { + Assert.assertEquals(r.getValue(0).asInteger(), j == 0 ? 1 : 4); + Assert.assertEquals(r.getValue(1).asInteger(), j == 0 ? 23 : 56); + j++; + } + Assert.assertEquals(j, 2); + } + } + } + + @Test(dataProvider = "mixedCompressionMatrix", groups = "integration") + public void testDecompressResponse(ClickHouseCompression reqComp, ClickHouseCompression respComp) + throws ClickHouseException { + if (reqComp == ClickHouseCompression.SNAPPY || respComp == ClickHouseCompression.BZ2) { + if (!checkServerVersion(getClient(), getServer(), "[22.10,)")) { + throw new SkipException("Snappy and bz2 were all supported since 22.10"); + } + } + + try (ClickHouseClient client = getClient(); + ClickHouseResponse response = newRequest(client, getServer()) + .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) + .decompressClientRequest(true, reqComp) + .compressServerResponse(true, respComp) + .query("select number n, toString(number+1) s from numbers(10)").executeAndWait()) { + int i = 0; + for (ClickHouseRecord r : response.records()) { + Assert.assertEquals(r.getValue(0).asInteger(), i++); + Assert.assertEquals(r.getValue(1).asInteger(), i); + } + Assert.assertEquals(i, 10); + } + } + @Test(groups = "integration") public void testAuthentication() throws ClickHouseException { String sql = "select currentUser()"; diff --git a/clickhouse-r2dbc/pom.xml b/clickhouse-r2dbc/pom.xml index 3f34372f0..b7b8e84ef 100644 --- a/clickhouse-r2dbc/pom.xml +++ b/clickhouse-r2dbc/pom.xml @@ -74,7 +74,7 @@ ${project.parent.groupId} org.roaringbitmap - provided + true * diff --git a/pom.xml b/pom.xml index 59e64d038..b3bec5be8 100644 --- a/pom.xml +++ b/pom.xml @@ -1,5 +1,7 @@ - + 4.0.0 com.clickhouse @@ -81,6 +83,8 @@ 6.0.53 5.2.1 1.11.1 + 0.1.2 + 1.9.0 1.12.13 3.1.2 1.22 @@ -95,6 +99,8 @@ 0.9.36 2.0.6 1.1.8.4 + 1.9 + 1.5.2-5 1.17.6 7.5 @@ -161,6 +167,11 @@ ${repackaged.version} + + com.aayushatharva.brotli4j + brotli4j + ${brotli4j.version} + com.google.code.gson gson @@ -171,6 +182,11 @@ caffeine ${caffeine.version} + + com.github.luben + zstd-jni + ${zstd-jni.version} + dnsjava dnsjava @@ -218,6 +234,11 @@ annotations-api ${annotations-api.version} + + org.brotli + dec + ${brotli.version} + org.jctools jctools-core @@ -248,6 +269,11 @@ slf4j-simple ${slf4j.version} + + org.tukaani + xz + ${xz.version} + org.xerial.snappy snappy-java @@ -545,7 +571,7 @@ true - + @@ -774,7 +800,7 @@ - + release @@ -834,10 +860,10 @@ java11 compile - + @@ -1052,10 +1078,10 @@ java11 compile - +