Skip to content

Commit

Permalink
Fix deserialization issue in performance mode
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Jun 25, 2022
1 parent d3d8089 commit de865c3
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 45 deletions.
Expand Up @@ -180,8 +180,8 @@ static ClickHouseInputStream getAsyncResponseInputStream(ClickHouseConfig config
.createPipedOutputStream(config, null);
wrappedInput = getResponseInputStream(config, decompressedStream.getInputStream(), postCloseAction);
submit(() -> {
try (ClickHouseInputStream in = ClickHouseInputStream.of(input, config.getReadBufferSize());
ClickHouseOutputStream out = decompressedStream) {
try (ClickHouseInputStream in = ClickHouseInputStream.of(input, config.getReadBufferSize(),
config.getResponseCompressAlgorithm(), null); ClickHouseOutputStream out = decompressedStream) {
in.pipe(out);
}
return null;
Expand Down
Expand Up @@ -264,9 +264,10 @@ public long skip(long n) throws IOException {

// peforms better but this is a bit tricky
if (n == Long.MAX_VALUE) {
int avail = 0;
long counter = (long) limit - position;
while (updateBuffer() > 0) {
counter += limit;
while ((avail = updateBuffer()) > 0) {
counter += avail;
}

return counter;
Expand Down
Expand Up @@ -25,6 +25,7 @@
import java.util.stream.Collectors;

import com.clickhouse.client.ClickHouseClientBuilder.Agent;
import com.clickhouse.client.config.ClickHouseBufferingMode;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseRenameMethod;
import com.clickhouse.client.config.ClickHouseSslMode;
Expand Down Expand Up @@ -93,11 +94,27 @@ protected ClickHouseNode getSecureServer() {

@DataProvider(name = "compressionMatrix")
protected Object[][] getCompressionMatrix() {
return new Object[][] {
new Object[] { false, false },
new Object[] { true, false },
new Object[] { true, true },
new Object[] { false, true } };
ClickHouseFormat[] formats = new ClickHouseFormat[] {
ClickHouseFormat.RowBinaryWithNamesAndTypes,
ClickHouseFormat.TabSeparatedWithNamesAndTypes
};
ClickHouseBufferingMode[] modes = new ClickHouseBufferingMode[] {
ClickHouseBufferingMode.RESOURCE_EFFICIENT,
ClickHouseBufferingMode.PERFORMANCE
};
boolean[] bools = new boolean[] { true, false };
Object[][] array = new Object[formats.length * modes.length * 2 * 2][4];
int i = 0;
for (ClickHouseFormat format : formats) {
for (ClickHouseBufferingMode mode : modes) {
for (boolean compress : bools) {
for (boolean decompress : bools) {
array[i++] = new Object[] { format, mode, compress, decompress };
}
}
}
}
return array;
}

@DataProvider(name = "renameMethods")
Expand Down Expand Up @@ -187,47 +204,44 @@ public void testOpenCloseClient() throws Exception {
}

@Test(dataProvider = "compressionMatrix", groups = { "integration" })
public void testCompression(boolean compressRequest, boolean compressResponse)
throws ClickHouseException {
public void testCompression(ClickHouseFormat format, ClickHouseBufferingMode bufferingMode,
boolean compressRequest, boolean compressResponse) throws ClickHouseException {
ClickHouseNode server = getServer();
String uuid = UUID.randomUUID().toString();
for (ClickHouseFormat format : new ClickHouseFormat[] {
ClickHouseFormat.RowBinaryWithNamesAndTypes,
ClickHouseFormat.TabSeparatedWithNamesAndTypes }) {
try (ClickHouseClient client = getClient()) {
ClickHouseRequest<?> request = client.connect(server)
.format(format)
.compressServerResponse(compressResponse)
.decompressClientRequest(compressRequest);
boolean hasResult = false;
try (ClickHouseResponse resp = request
.query("select :uuid").params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), uuid);
hasResult = true;
}
Assert.assertTrue(hasResult, "Should have at least one result");
try (ClickHouseClient client = getClient()) {
ClickHouseRequest<?> request = client.connect(server)
.format(format)
.option(ClickHouseClientOption.RESPONSE_BUFFERING, bufferingMode)
.compressServerResponse(compressResponse)
.decompressClientRequest(compressRequest);
boolean hasResult = false;
try (ClickHouseResponse resp = request
.query("select :uuid").params(ClickHouseStringValue.of(uuid)).executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), uuid);
hasResult = true;
}
Assert.assertTrue(hasResult, "Should have at least one result");

// empty results
try (ClickHouseResponse resp = request
.query("create database if not exists system")
.executeAndWait()) {
ClickHouseResponseSummary summary = resp.getSummary();
Assert.assertEquals(summary.getReadRows(), 0L);
Assert.assertEquals(summary.getWrittenRows(), 0L);
}
// empty results
try (ClickHouseResponse resp = request
.query("create database if not exists system")
.executeAndWait()) {
ClickHouseResponseSummary summary = resp.getSummary();
Assert.assertEquals(summary.getReadRows(), 0L);
Assert.assertEquals(summary.getWrittenRows(), 0L);
}

// let's also check if failures can be captured successfully as well
ClickHouseException exp = null;
try (ClickHouseResponse resp = request
.use(uuid)
.query("select currentUser(), timezone(), version(), getSetting('readonly') readonly FORMAT RowBinaryWithNamesAndTypes")
.executeAndWait()) {
Assert.fail("Query should fail");
} catch (ClickHouseException e) {
exp = e;
}
Assert.assertEquals(exp.getErrorCode(), 81);
// let's also check if failures can be captured successfully as well
ClickHouseException exp = null;
try (ClickHouseResponse resp = request
.use(uuid)
.query("select currentUser(), timezone(), version(), getSetting('readonly') readonly FORMAT RowBinaryWithNamesAndTypes")
.executeAndWait()) {
Assert.fail("Query should fail");
} catch (ClickHouseException e) {
exp = e;
}
Assert.assertEquals(exp.getErrorCode(), 81);
}
}

Expand Down

0 comments on commit de865c3

Please sign in to comment.