Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public enum ClickHouseDataType implements SQLType {
Dynamic(Object.class, true, true, false, 0, 0, 0, 0, 0, true, 0x2B),
Time(LocalDateTime.class, true, false, false, 4, 9, 0, 0, 9, false, 0x32), // 0x33 for Time(Timezone)
Time64(LocalDateTime.class, true, false, false, 8, 9, 0, 0, 0, false, 0x34), // 0x35 for Time64(P, Timezone)
QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, true, 0x36),
QBit(Double.class, true, true, false, 0, 0, 0, 0, 0, false, 0x36),
;

public static final List<ClickHouseDataType> ORDERED_BY_RANGE_INT_TYPES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,7 +423,7 @@ public boolean isWidenUnsignedTypes() {
if (type.isNested() || type == ClickHouseDataType.AggregateFunction
|| type == ClickHouseDataType.SimpleAggregateFunction || type == ClickHouseDataType.Enum
|| type == ClickHouseDataType.Nullable || type == ClickHouseDataType.BFloat16 ||
type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64) {
type == ClickHouseDataType.Time || type == ClickHouseDataType.Time64 || type == ClickHouseDataType.QBit) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,10 @@ protected static Map<String, String> createDefaultHeaders(ClickHouseConfig confi
}
// Also, you can use the ‘default_format’ URL parameter
map.put("x-clickhouse-format", config.getFormat().name());
if (config.isResponseCompressed()) {
if (config.isResponseCompressed() && config.getResponseCompressAlgorithm() != ClickHouseCompression.LZ4) {
map.put("accept-encoding", config.getResponseCompressAlgorithm().encoding());
}
if (config.isRequestCompressed()
&& config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) {
if (config.isRequestCompressed() && config.getRequestCompressAlgorithm() != ClickHouseCompression.LZ4) {
map.put("content-encoding", config.getRequestCompressAlgorithm().encoding());
}
return map;
Expand Down
6 changes: 6 additions & 0 deletions client-v2/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,12 @@
<version>5.19.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
<version>1.5.7-6</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package com.clickhouse.client.api.internal;

import org.apache.commons.compress.compressors.CompressorException;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hc.core5.function.Supplier;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;
import java.util.Set;

public class CompressedEntity implements HttpEntity {

private HttpEntity httpEntity;
Copy link

Copilot AI Nov 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The httpEntity field should be declared as final for consistency with the LZ4Entity class (line 18 of LZ4Entity.java) and to prevent accidental reassignment since it's set only in the constructor.

Suggested change
private HttpEntity httpEntity;
private final HttpEntity httpEntity;

Copilot uses AI. Check for mistakes.
private final boolean isResponse;
private final CompressorStreamFactory compressorStreamFactory;
private final String compressionAlgo;

CompressedEntity(HttpEntity httpEntity, boolean isResponse, CompressorStreamFactory compressorStreamFactory) {
this.httpEntity = httpEntity;
this.isResponse = isResponse;
this.compressorStreamFactory = compressorStreamFactory;
this.compressionAlgo = getCompressionAlgoName(httpEntity.getContentEncoding());
}

@Override
public boolean isRepeatable() {
return httpEntity.isRepeatable();
}

@Override
public InputStream getContent() throws IOException, UnsupportedOperationException {
if (!isResponse) {
throw new UnsupportedOperationException("Unsupported: getting compressed content of request");
}

try {
return compressorStreamFactory.createCompressorInputStream(compressionAlgo, httpEntity.getContent());
} catch (CompressorException e) {
throw new IOException("Failed to create decompressing input stream", e);
}
}

@Override
public void writeTo(OutputStream outStream) throws IOException {
if (isResponse) {
// called by us to get compressed response
throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere");
}

try {
httpEntity.writeTo(compressorStreamFactory.createCompressorOutputStream(compressionAlgo, outStream));
} catch (CompressorException e) {
throw new IOException("Failed to create compressing output stream", e);
}
}

@Override
public boolean isStreaming() {
return httpEntity.isStreaming();
}

@Override
public Supplier<List<? extends Header>> getTrailers() {
return httpEntity.getTrailers();
}

@Override
public void close() throws IOException {
httpEntity.close();
}

@Override
public long getContentLength() {
return httpEntity.getContentLength();
}

@Override
public String getContentType() {
return httpEntity.getContentType();
}

@Override
public String getContentEncoding() {
return httpEntity.getContentEncoding();
Comment on lines +87 to +88
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The getContentEncoding() method returns the encoding from the wrapped entity, but this might not match the contentEncoding field used for compression/decompression. Consider returning the actual compression encoding used or documenting this behavior.

}

@Override
public boolean isChunked() {
return httpEntity.isChunked();
}

@Override
public Set<String> getTrailerNames() {
return httpEntity.getTrailerNames();
}

private String getCompressionAlgoName(String contentEncoding) {
String algo = contentEncoding;
if (algo.equalsIgnoreCase("gzip")) {
algo = CompressorStreamFactory.GZIP;
} else if (algo.equalsIgnoreCase("lz4")) {
algo = CompressorStreamFactory.LZ4_FRAMED;
}
return algo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.clickhouse.client.api.transport.Endpoint;
import com.clickhouse.data.ClickHouseFormat;
import net.jpountz.lz4.LZ4Factory;
import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class HttpAPIClientHelper {

private static final int ERROR_BODY_BUFFER_SIZE = 1024; // Error messages are usually small

private final String DEFAULT_HTTP_COMPRESSION_ALGO = "lz4";

private static final Pattern PATTERN_HEADER_VALUE_ASCII = Pattern.compile(
"\\p{Graph}+(?:[ ]\\p{Graph}+)*");

Expand Down Expand Up @@ -322,6 +325,8 @@ public CloseableHttpClient createHttpClient(boolean initSslContext, Map<String,
clientBuilder.setKeepAliveStrategy((response, context) -> TimeValue.ofMilliseconds(keepAliveTimeout));
}

clientBuilder.disableContentCompression(); // will handle ourselves

return clientBuilder.build();
}

Expand Down Expand Up @@ -427,14 +432,12 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
// req.setVersion(new ProtocolVersion("HTTP", 1, 0)); // to disable chunk transfer encoding
addHeaders(req, requestConfig);

boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);


// setting entity. wrapping if compression is enabled
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback),
clientCompression, useHttpCompression, appCompressedData, lz4Factory, requestConfig));
String contentEncoding = req.containsHeader(HttpHeaders.CONTENT_ENCODING) ? req.getHeader(HttpHeaders.CONTENT_ENCODING).getValue() : null;
req.setEntity(wrapRequestEntity(new EntityTemplate(-1, CONTENT_TYPE, contentEncoding , writeCallback),
lz4Factory,
requestConfig));

HttpClientContext context = HttpClientContext.create();
Number responseTimeout = ClientConfigProperties.SOCKET_OPERATION_TIMEOUT.getOrDefault(requestConfig);
Expand All @@ -448,8 +451,11 @@ public ClassicHttpResponse executeRequest(Endpoint server, Map<String, Object> r
ClassicHttpResponse httpResponse = null;
try {
httpResponse = httpClient.executeOpen(null, req, context);
boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig);
httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(), httpResponse.getCode(), serverCompression, useHttpCompression, lz4Factory, requestConfig));

httpResponse.setEntity(wrapResponseEntity(httpResponse.getEntity(),
httpResponse.getCode(),
lz4Factory,
requestConfig));

if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
Expand Down Expand Up @@ -493,30 +499,30 @@ public static void closeQuietly(ClassicHttpResponse httpResponse) {
private static final ContentType CONTENT_TYPE = ContentType.create(ContentType.TEXT_PLAIN.getMimeType(), "UTF-8");

private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
addHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
setHeader(req, HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
if (requestConfig.containsKey(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())) {
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_FORMAT,
((ClickHouseFormat) requestConfig.get(ClientConfigProperties.INPUT_OUTPUT_FORMAT.getKey())).name());
}
if (requestConfig.containsKey(ClientConfigProperties.QUERY_ID.getKey())) {
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_QUERY_ID,
(String) requestConfig.get(ClientConfigProperties.QUERY_ID.getKey()));
}
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_DATABASE,
ClientConfigProperties.DATABASE.getOrDefault(requestConfig));

if (ClientConfigProperties.SSL_AUTH.<Boolean>getOrDefault(requestConfig).booleanValue()) {
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_DB_USER,
ClientConfigProperties.USER.getOrDefault(requestConfig));
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_SSL_CERT_AUTH,
"on");
Expand All @@ -529,11 +535,11 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {
"Basic " + Base64.getEncoder().encodeToString(
(user + ":" + password).getBytes(StandardCharsets.UTF_8)));
} else {
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_DB_USER,
ClientConfigProperties.USER.getOrDefault(requestConfig));
addHeader(
setHeader(
req,
ClickHouseHttpProto.HEADER_DB_PASSWORD,
ClientConfigProperties.PASSWORD.getOrDefault(requestConfig));
Expand All @@ -551,18 +557,19 @@ private void addHeaders(HttpPost req, Map<String, Object> requestConfig) {

if (useHttpCompression) {
if (serverCompression) {
addHeader(req, HttpHeaders.ACCEPT_ENCODING, "lz4");
setHeader(req, HttpHeaders.ACCEPT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO);
}

if (clientCompression && !appCompressedData) {
addHeader(req, HttpHeaders.CONTENT_ENCODING, "lz4");
setHeader(req, HttpHeaders.CONTENT_ENCODING, DEFAULT_HTTP_COMPRESSION_ALGO);
}
}

for (String key : requestConfig.keySet()) {
if (key.startsWith(ClientConfigProperties.HTTP_HEADER_PREFIX)) {
Object val = requestConfig.get(key);
if (val != null) {
addHeader(
setHeader(
req,
key.substring(ClientConfigProperties.HTTP_HEADER_PREFIX.length()),
String.valueOf(val));
Expand Down Expand Up @@ -626,11 +633,19 @@ private void addQueryParams(URIBuilder req, Map<String, Object> requestConfig) {
}
}

private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompression, boolean useHttpCompression,
boolean appControlledCompression, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}", clientCompression, useHttpCompression);
private HttpEntity wrapRequestEntity(HttpEntity httpEntity, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {

if (clientCompression && !appControlledCompression) {
boolean clientCompression = ClientConfigProperties.COMPRESS_CLIENT_REQUEST.getOrDefault(requestConfig);
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);
boolean appCompressedData = ClientConfigProperties.APP_COMPRESSED_DATA.getOrDefault(requestConfig);

LOG.debug("wrapRequestEntity: client compression: {}, http compression: {}, content encoding: {}",
clientCompression, useHttpCompression, httpEntity.getContentEncoding());

if (httpEntity.getContentEncoding() != null && !appCompressedData) {
// http header is set and data is not compressed
return new CompressedEntity(httpEntity, false, CompressorStreamFactory.getSingleton());
} else if (clientCompression && !appCompressedData) {
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
return new LZ4Entity(httpEntity, useHttpCompression, false, true,
buffSize, false, lz4Factory);
Expand All @@ -639,25 +654,22 @@ private HttpEntity wrapRequestEntity(HttpEntity httpEntity, boolean clientCompre
}
}

private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, boolean serverCompression, boolean useHttpCompression, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}", serverCompression, useHttpCompression);

if (serverCompression) {
// Server doesn't compress certain errors like 403
switch (httpStatus) {
case HttpStatus.SC_OK:
case HttpStatus.SC_CREATED:
case HttpStatus.SC_ACCEPTED:
case HttpStatus.SC_NO_CONTENT:
case HttpStatus.SC_PARTIAL_CONTENT:
case HttpStatus.SC_RESET_CONTENT:
case HttpStatus.SC_NOT_MODIFIED:
case HttpStatus.SC_BAD_REQUEST:
case HttpStatus.SC_INTERNAL_SERVER_ERROR:
case HttpStatus.SC_NOT_FOUND:
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory);
}
private HttpEntity wrapResponseEntity(HttpEntity httpEntity, int httpStatus, LZ4Factory lz4Factory, Map<String, Object> requestConfig) {
boolean serverCompression = ClientConfigProperties.COMPRESS_SERVER_RESPONSE.getOrDefault(requestConfig);
boolean useHttpCompression = ClientConfigProperties.USE_HTTP_COMPRESSION.getOrDefault(requestConfig);

LOG.debug("wrapResponseEntity: server compression: {}, http compression: {}, content encoding: {}",
serverCompression, useHttpCompression, httpEntity.getContentEncoding());

if (httpEntity.getContentEncoding() != null) {
// http compressed response
return new CompressedEntity(httpEntity, true, CompressorStreamFactory.getSingleton());
}

// data compression
if (serverCompression && !(httpStatus == HttpStatus.SC_FORBIDDEN || httpStatus == HttpStatus.SC_UNAUTHORIZED)) {
int buffSize = ClientConfigProperties.COMPRESSION_LZ4_UNCOMPRESSED_BUF_SIZE.getOrDefault(requestConfig);
return new LZ4Entity(httpEntity, useHttpCompression, true, false, buffSize, true, lz4Factory);
}

return httpEntity;
Expand Down Expand Up @@ -803,8 +815,8 @@ public void close() {
httpClient.close(CloseMode.IMMEDIATE);
}

private static <T> void addHeader(HttpRequest req, String headerName,
String value)
private static <T> void setHeader(HttpRequest req, String headerName,
String value)
{
if (value == null) {
return;
Expand All @@ -814,10 +826,10 @@ private static <T> void addHeader(HttpRequest req, String headerName,
return;
}
if (PATTERN_HEADER_VALUE_ASCII.matcher(value).matches()) {
req.addHeader(headerName, value);
req.setHeader(headerName, value);
} else {
try {
req.addHeader(
req.setHeader(
headerName + "*",
"UTF-8''" + URLEncoder.encode(value, StandardCharsets.UTF_8.name()));
} catch (UnsupportedEncodingException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

class LZ4Entity implements HttpEntity {

private HttpEntity httpEntity;
private final HttpEntity httpEntity;

private final boolean useHttpCompression;

Expand Down
Loading
Loading