diff --git a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataType.java b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataType.java index c4fa05f22..9a22fb4b9 100644 --- a/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataType.java +++ b/clickhouse-data/src/main/java/com/clickhouse/data/ClickHouseDataType.java @@ -130,7 +130,7 @@ public enum ClickHouseDataType implements SQLType { Dynamic(Object.class, true, true, false, 0, 0, 0, 0, 0, true, 0x2B), Time(LocalDateTime.class, true, false, false, 4, 9, 0, 0, 9, false, 0x32), // 0x33 for Time(Timezone) Time64(LocalDateTime.class, true, false, false, 8, 9, 0, 0, 0, false, 0x34), // 0x35 for Time64(P, Timezone) - QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, true, 0x36), + QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, false, 0x36), ; public static final List ORDERED_BY_RANGE_INT_TYPES = diff --git a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseColumnTest.java b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseColumnTest.java index 886ce9027..2c08cd48d 100644 --- a/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseColumnTest.java +++ b/clickhouse-data/src/test/java/com/clickhouse/data/ClickHouseColumnTest.java @@ -423,7 +423,7 @@ public boolean isWidenUnsignedTypes() { if (type.isNested() || type == ClickHouseDataType.AggregateFunction || type == ClickHouseDataType.SimpleAggregateFunction || type == ClickHouseDataType.Enum || type == ClickHouseDataType.Nullable || type == ClickHouseDataType.BFloat16 || - type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64) { + type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64 || type == ClickHouseDataType.QBit) { continue; } diff --git a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java index c8d801ce9..06dcfe1fa 100644 --- a/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java +++ b/clickhouse-http-client/src/main/java/com/clickhouse/client/http/ClickHouseHttpConnection.java @@ -275,11 +275,10 @@ protected static Map createDefaultHeaders(ClickHouseConfig confi } // Also, you can use the ‘default_format’ URL parameter map.put("x-clickhouse-format", config.getFormat().name()); - if (config.isResponseCompressed()) { + if (config.isResponseCompressed() && config.getResponseCompressAlgorithm() != ClickHouseCompression.LZ4) { map.put("accept-encoding", config.getResponseCompressAlgorithm().encoding()); } - if (config.isRequestCompressed() - && config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) { + if (config.isRequestCompressed() && config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) { map.put("content-encoding", config.getRequestCompressAlgorithm().encoding()); } return map; diff --git a/client-v2/pom.xml b/client-v2/pom.xml index f178105e3..2908cadfb 100644 --- a/client-v2/pom.xml +++ b/client-v2/pom.xml @@ -141,6 +141,12 @@ 5.19.0 test + + com.github.luben + zstd-jni + 1.5.7-6 + test + diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java new file mode 100644 index 000000000..83b2ac885 --- /dev/null +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/CompressedEntity.java @@ -0,0 +1,110 @@ +package com.clickhouse.client.api.internal; + +import org.apache.commons.compress.compressors.CompressorException; +import org.apache.commons.compress.compressors.CompressorStreamFactory; +import org.apache.hc.core5.function.Supplier; +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.List; +import java.util.Set; + +public class CompressedEntity implements HttpEntity { + + private HttpEntity httpEntity; + private final boolean isResponse; + private final CompressorStreamFactory compressorStreamFactory; + private final String compressionAlgo; + + CompressedEntity(HttpEntity httpEntity, boolean isResponse, CompressorStreamFactory compressorStreamFactory) { + this.httpEntity = httpEntity; + this.isResponse = isResponse; + this.compressorStreamFactory = compressorStreamFactory; + this.compressionAlgo = getCompressionAlgoName(httpEntity.getContentEncoding()); + } + + @Override + public boolean isRepeatable() { + return httpEntity.isRepeatable(); + } + + @Override + public InputStream getContent() throws IOException, UnsupportedOperationException { + if (!isResponse) { + throw new UnsupportedOperationException("Unsupported: getting compressed content of request"); + } + + try { + return compressorStreamFactory.createCompressorInputStream(compressionAlgo, httpEntity.getContent()); + } catch (CompressorException e) { + throw new IOException("Failed to create decompressing input stream", e); + } + } + + @Override + public void writeTo(OutputStream outStream) throws IOException { + if (isResponse) { + // called by us to get compressed response + throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere"); + } + + try { + httpEntity.writeTo(compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream)); + } catch (CompressorException e) { + throw new IOException("Failed to create compressing output stream", e); + } + } + + @Override + public boolean isStreaming() { + return httpEntity.isStreaming(); + } + + @Override + public Supplier> getTrailers() { + return httpEntity.getTrailers(); + } + + @Override + public void close() throws IOException { + httpEntity.close(); + } + + @Override + public long getContentLength() { + return httpEntity.getContentLength(); + } + + @Override + public String getContentType() { + return httpEntity.getContentType(); + } + + @Override + public String getContentEncoding() { + return httpEntity.getContentEncoding(); + } + + @Override + public boolean isChunked() { + return httpEntity.isChunked(); + } + + @Override + public Set getTrailerNames() { + return httpEntity.getTrailerNames(); + } + + private String getCompressionAlgoName(String contentEncoding) { + String algo = contentEncoding; + if (algo.equalsIgnoreCase("gzip")) { + algo = CompressorStreamFactory.GZIP; + } else if (algo.equalsIgnoreCase("lz4")) { + algo = CompressorStreamFactory.LZ4_FRAMED; + } + return algo; + } +} diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java index df449c90e..de3f4de78 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java @@ -16,6 +16,7 @@ import com.clickhouse.client.api.transport.Endpoint; import com.clickhouse.data.ClickHouseFormat; import net.jpountz.lz4.LZ4Factory; +import org.apache.commons.compress.compressors.CompressorStreamFactory; import org.apache.hc.client5.http.ConnectTimeoutException; import org.apache.hc.client5.http.classic.methods.HttpPost; import org.apache.hc.client5.http.config.ConnectionConfig; @@ -105,6 +106,8 @@ public class HttpAPIClientHelper { private static final int ERROR_BODY_BUFFER_SIZE = 1024; // Error messages are usually small + private final String DEFAULT_HTTP_COMPRESSION_ALGO = "lz4"; + private static final Pattern PATTERN_HEADER_VALUE_ASCII = Pattern.compile( "\\p{Graph}+(?:[ ]\\p{Graph}+)*"); @@ -322,6 +325,8 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map TimeValue.ofMilliseconds(keepAliveTimeout)); } + clientBuilder.disableContentCompression(); // will handle ourselves + return clientBuilder.build(); } @@ -427,14 +432,12 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r // req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding addHeaders(req, requestConfig); - boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); - boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); - boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); - // setting entity. wrapping if compression is enabled - req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), - clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig)); + String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null; + req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback), + lz4Factory, + requestConfig)); HttpClientContext context = HttpClientContext.create(); Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig); @@ -448,8 +451,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map r ClassicHttpResponse httpResponse = null; try { httpResponse = httpClient.executeOpen(null, req, context); - boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); - httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression, lz4Factory, requestConfig)); + + httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), + httpResponse.getCode(), + lz4Factory, + requestConfig)); if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) { throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings."); @@ -493,30 +499,30 @@ public static void closeQuietly(ClassicHttpResponse httpResponse) { private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8"); private void addHeaders(HttpPost req, Map requestConfig) { - addHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); + setHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType()); if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) { - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_FORMAT, ((ClickHouseFormat) requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())).name()); } if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) { - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_QUERY_ID, (String) requestConfig.get(ClientConfigProperties.QUERY_ID.getKey())); } - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_DATABASE, ClientConfigProperties.DATABASE.getOrDefault(requestConfig)); if (ClientConfigProperties.SSL_AUTH.getOrDefault(requestConfig).booleanValue()) { - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_DB_USER, ClientConfigProperties.USER.getOrDefault(requestConfig)); - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_SSL_CERT_AUTH, "on"); @@ -529,11 +535,11 @@ private void addHeaders(HttpPost req, Map requestConfig) { "Basic " + Base64.getEncoder().encodeToString( (user + ":" + password).getBytes(StandardCharsets.UTF_8))); } else { - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_DB_USER, ClientConfigProperties.USER.getOrDefault(requestConfig)); - addHeader( + setHeader( req, ClickHouseHttpProto.HEADER_DB_PASSWORD, ClientConfigProperties.PASSWORD.getOrDefault(requestConfig)); @@ -551,10 +557,11 @@ private void addHeaders(HttpPost req, Map requestConfig) { if (useHttpCompression) { if (serverCompression) { - addHeader(req, HttpHeaders.ACCEPT_ENCODING, "lz4"); + setHeader(req, HttpHeaders.ACCEPT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); } + if (clientCompression && !appCompressedData) { - addHeader(req, HttpHeaders.CONTENT_ENCODING, "lz4"); + setHeader(req, HttpHeaders.CONTENT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO); } } @@ -562,7 +569,7 @@ private void addHeaders(HttpPost req, Map requestConfig) { if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) { Object val = requestConfig.get(key); if (val != null) { - addHeader( + setHeader( req, key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()), String.valueOf(val)); @@ -626,11 +633,19 @@ private void addQueryParams(URIBuilder req, Map requestConfig) { } } - private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression, - boolean appControlledCompression, LZ4Factory lz4Factory, Map requestConfig) { - LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}", clientCompression, useHttpCompression); + private HttpEntity wrapRequestEntity(HttpEntity httpEntity, LZ4Factory lz4Factory, Map requestConfig) { - if (clientCompression && !appControlledCompression) { + boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig); + + LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}, content encoding: {}", + clientCompression, useHttpCompression, httpEntity.getContentEncoding()); + + if (httpEntity.getContentEncoding() != null && !appCompressedData) { + // http header is set and data is not compressed + return new CompressedEntity(httpEntity, false, CompressorStreamFactory.getSingleton()); + } else if (clientCompression && !appCompressedData) { int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); return new LZ4Entity(httpEntity, useHttpCompression, false, true, buffSize, false, lz4Factory); @@ -639,25 +654,22 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompre } } - private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression, LZ4Factory lz4Factory, Map requestConfig) { - LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}", serverCompression, useHttpCompression); - - if (serverCompression) { - // Server doesn't compress certain errors like 403 - switch (httpStatus) { - case HttpStatus.SC_OK: - case HttpStatus.SC_CREATED: - case HttpStatus.SC_ACCEPTED: - case HttpStatus.SC_NO_CONTENT: - case HttpStatus.SC_PARTIAL_CONTENT: - case HttpStatus.SC_RESET_CONTENT: - case HttpStatus.SC_NOT_MODIFIED: - case HttpStatus.SC_BAD_REQUEST: - case HttpStatus.SC_INTERNAL_SERVER_ERROR: - case HttpStatus.SC_NOT_FOUND: - int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); - return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory); - } + private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, LZ4Factory lz4Factory, Map requestConfig) { + boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig); + boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig); + + LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}, content encoding: {}", + serverCompression, useHttpCompression, httpEntity.getContentEncoding()); + + if (httpEntity.getContentEncoding() != null) { + // http compressed response + return new CompressedEntity(httpEntity, true, CompressorStreamFactory.getSingleton()); + } + + // data compression + if (serverCompression && !(httpStatus == HttpStatus.SC_FORBIDDEN || httpStatus == HttpStatus.SC_UNAUTHORIZED)) { + int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig); + return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory); } return httpEntity; @@ -803,8 +815,8 @@ public void close() { httpClient.close(CloseMode.IMMEDIATE); } - private static void addHeader(HttpRequest req, String headerName, - String value) + private static void setHeader(HttpRequest req, String headerName, + String value) { if (value == null) { return; @@ -814,10 +826,10 @@ private static void addHeader(HttpRequest req, String headerName, return; } if (PATTERN_HEADER_VALUE_ASCII.matcher(value).matches()) { - req.addHeader(headerName, value); + req.setHeader(headerName, value); } else { try { - req.addHeader( + req.setHeader( headerName + "*", "UTF-8''" + URLEncoder.encode(value, StandardCharsets.UTF_8.name())); } catch (UnsupportedEncodingException e) { diff --git a/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java b/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java index 0d37c6624..c9822b1de 100644 --- a/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java +++ b/client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java @@ -15,7 +15,7 @@ class LZ4Entity implements HttpEntity { - private HttpEntity httpEntity; + private final HttpEntity httpEntity; private final boolean useHttpCompression; diff --git a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java index f46dfc12a..26b404968 100644 --- a/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java @@ -1,6 +1,5 @@ package com.clickhouse.client; -import com.clickhouse.client.api.ClickHouseException; import com.clickhouse.client.api.Client; import com.clickhouse.client.api.ClientConfigProperties; import com.clickhouse.client.api.ClientException; @@ -405,10 +404,12 @@ public static Object[][] testServerErrorHandlingDataProvider() { EnumSet formats = EnumSet.of(ClickHouseFormat.CSV, ClickHouseFormat.TSV, ClickHouseFormat.JSON, ClickHouseFormat.JSONCompact); - Object[][] result = new Object[formats.size() * 3][]; + int permutations = 3; + Object[][] result = new Object[formats.size() * permutations][]; int i = 0; for (ClickHouseFormat format : formats) { + // format, server compression, http compression result[i++] = new Object[]{format, false, false}; result[i++] = new Object[]{format, true, false}; result[i++] = new Object[]{format, true, true}; diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientHttpCompressionTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientHttpCompressionTests.java index 0eef19c97..18a8f5d3f 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientHttpCompressionTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientHttpCompressionTests.java @@ -1,8 +1,70 @@ package com.clickhouse.client.insert; +import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.insert.InsertSettings; +import com.clickhouse.client.api.metrics.OperationMetrics; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.data.ClickHouseFormat; +import org.apache.hc.core5.http.HttpHeaders; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.PrintWriter; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.testng.Assert.assertEquals; + public class InsertClientHttpCompressionTests extends InsertTests { public InsertClientHttpCompressionTests() { super(true, true); } + + + @Test(groups = { "integration" }, dataProvider = "insertRawDataCompressedProvider") + public void insertRawDataCompressed(String compressionAlgo) throws Exception { + final String tableName = "raw_data_table"; + final String createSQL = "CREATE TABLE " + tableName + + " (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()"; + + initTable(tableName, createSQL); + + InsertSettings insertSettings = InsertSettings.merge(settings, new InsertSettings()); + insertSettings.setInputStreamCopyBufferSize(8198 * 2); + insertSettings.httpHeader(HttpHeaders.CONTENT_ENCODING, compressionAlgo); + ByteArrayOutputStream data = new ByteArrayOutputStream(); + PrintWriter writer = new PrintWriter(data); + for (int i = 0; i < 1000; i++) { + writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2"); + } + writer.flush(); + InsertResponse response = client.insert(tableName, new ByteArrayInputStream(data.toByteArray()), + ClickHouseFormat.TSV, insertSettings).get(30, TimeUnit.SECONDS); + OperationMetrics metrics = response.getMetrics(); + assertEquals((int)response.getWrittenRows(), 1000 ); + + List records = client.queryAll("SELECT * FROM " + tableName); + assertEquals(records.size(), 1000); + + for (int i = 0; i < records.size(); i++) { + assertEquals(records.get(i).getInteger(1), i); + assertEquals(records.get(i).getString("event_ts"), "2021-01-01 00:00:00"); + assertEquals(records.get(i).getString("name"), "name" + i); + assertEquals(records.get(i).getInteger("p1"), i); + assertEquals(records.get(i).getString("p2"), "p2"); + } + } + + @DataProvider(name = "insertRawDataCompressedProvider") + public Object[][] insertRawDataCompressedProvider() { + return new Object[][] { + { "lz4" }, + { "zstd" }, + { "deflate" }, + { "gzip" }, + }; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java index a04ae6fc3..ee98c184b 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertTests.java @@ -62,14 +62,14 @@ @Test(groups = {"integration"}) public class InsertTests extends BaseIntegrationTest { - private Client client; - private InsertSettings settings; + protected Client client; + protected InsertSettings settings; private boolean useClientCompression = false; private boolean useHttpCompression = false; - private static final int EXECUTE_CMD_TIMEOUT = 10; // seconds + static final int EXECUTE_CMD_TIMEOUT = 10; // seconds InsertTests() { } @@ -262,6 +262,14 @@ public void insertRawData() throws Exception { List records = client.queryAll("SELECT * FROM " + tableName); assertEquals(records.size(), 1000); + + for (int i = 0; i < records.size(); i++) { + assertEquals(records.get(i).getInteger(1), i); + assertEquals(records.get(i).getString("event_ts"), "2021-01-01 00:00:00"); + assertEquals(records.get(i).getString("name"), "name" + i); + assertEquals(records.get(i).getInteger("p1"), i); + assertEquals(records.get(i).getString("p2"), "p2"); + } } @Test(groups = { "integration" }, dataProvider = "insertRawDataAsyncProvider", dataProviderClass = InsertTests.class) diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryServerHttpCompressionTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryServerHttpCompressionTests.java index cddf6a4b5..57e9cad16 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryServerHttpCompressionTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryServerHttpCompressionTests.java @@ -1,7 +1,59 @@ package com.clickhouse.client.query; +import com.clickhouse.client.api.data_formats.internal.BinaryStreamReader; +import com.clickhouse.client.api.query.GenericRecord; +import com.clickhouse.client.api.query.QuerySettings; +import org.apache.hc.core5.http.HttpHeaders; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + public class QueryServerHttpCompressionTests extends QueryTests { QueryServerHttpCompressionTests() { super(true, true); } + + + @Test(groups = {"integration"}, dataProvider = "testQueryCompressedProvider") + public void testQueryCompressed(String compressAlgo) throws Exception { + List> dataset = prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 10); + QuerySettings settings = new QuerySettings(); + settings.httpHeader(HttpHeaders.ACCEPT_ENCODING, compressAlgo); + List records = client.queryAll("SELECT * FROM " + DATASET_TABLE + " LIMIT " + dataset.size(), settings); + Assert.assertFalse(records.isEmpty()); + + for (String colDefinition : DATASET_COLUMNS) { + // result values + String colName = colDefinition.split(" ")[0]; + List colValues = records.stream().map(r -> { + Object v = r.getObject(colName); + if (v instanceof BinaryStreamReader.ArrayValue) { + v = ((BinaryStreamReader.ArrayValue)v).asList(); + } + + return v; + } + + ).collect(Collectors.toList()); + Assert.assertEquals(colValues.size(), dataset.size()); + + // dataset values + List dataValue = dataset.stream().map(d -> d.get(colName)).collect(Collectors.toList()); + Assert.assertEquals(colValues, dataValue, "Failed for column " + colName); + } + } + + @DataProvider(name = "testQueryCompressedProvider") + public Object[][] testQueryCompressedProvider() { + return new Object[][] { + { "lz4" }, + { "zstd" }, + { "deflate" }, + { "gzip" }, + }; + } } diff --git a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java index ce6d97690..b8b1da74d 100644 --- a/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/query/QueryTests.java @@ -96,7 +96,7 @@ public class QueryTests extends BaseIntegrationTest { private final static Random RANDOM = new Random(); - private Client client; + protected Client client; private boolean useServerCompression = false; @@ -1455,7 +1455,7 @@ public void testQueryMetrics() throws Exception { } } - private final static List DATASET_COLUMNS = Arrays.asList( + protected final static List DATASET_COLUMNS = Arrays.asList( "col1 UInt32", "col2 Int32", "col3 String", @@ -1465,7 +1465,7 @@ public void testQueryMetrics() throws Exception { "col7 Array(Int32)" ); - private final static List> DATASET_VALUE_GENERATORS = Arrays.asList( + protected final static List> DATASET_VALUE_GENERATORS = Arrays.asList( c -> Long.valueOf(RANDOM.nextInt(Integer.MAX_VALUE)), c -> RANDOM.nextInt(Integer.MAX_VALUE), c -> "value_" + RANDOM.nextInt(Integer.MAX_VALUE), @@ -1475,13 +1475,13 @@ public void testQueryMetrics() throws Exception { c -> RANDOM.ints(10, 0, Integer.MAX_VALUE).boxed().collect(Collectors.toList()) ); - private final static String DATASET_TABLE = "query_test_table"; + protected final static String DATASET_TABLE = "query_test_table"; private Map prepareSimpleDataSet() { return prepareDataSet(DATASET_TABLE, DATASET_COLUMNS, DATASET_VALUE_GENERATORS, 1).get(0); } - private List> prepareDataSet(String table, List columns, List> valueGenerators, + protected List> prepareDataSet(String table, List columns, List> valueGenerators, int rows) { List> data = new ArrayList<>(rows);