From c3fb54716b0b96bf3613e6787ba7f0ef5545589b Mon Sep 17 00:00:00 2001 From: Zhichun Wu Date: Tue, 23 Mar 2021 15:01:00 +0800 Subject: [PATCH] Explicitly add "compress" query parameter according to given option, and avoid "Content-Encoding" header when compression option is "none" --- .../clickhouse/ClickHouseStatementImpl.java | 5 +- .../java/ru/yandex/clickhouse/Writer.java | 49 ++++++++-------- .../clickhouse/integration/StreamSQLTest.java | 56 ++++++++++++++----- 3 files changed, 70 insertions(+), 40 deletions(-) diff --git a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java index ea12eba6d..906e2f166 100644 --- a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java +++ b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseStatementImpl.java @@ -18,7 +18,6 @@ import java.util.Map; import java.util.TimeZone; import java.util.UUID; - import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -35,7 +34,7 @@ import org.apache.http.util.EntityUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - +import ru.yandex.clickhouse.domain.ClickHouseCompression; import ru.yandex.clickhouse.domain.ClickHouseFormat; import ru.yandex.clickhouse.except.ClickHouseException; import ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier; @@ -1014,7 +1013,7 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException { HttpPost httpPost = new HttpPost(uri); - if (writer.getCompression() != null) { + if (writer.getCompression() != ClickHouseCompression.none) { httpPost.addHeader("Content-Encoding", writer.getCompression().name()); } httpPost.setEntity(content); diff --git a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/Writer.java b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/Writer.java index 360731f6a..dd24fccec 100644 --- a/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/Writer.java +++ b/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/Writer.java @@ -1,21 +1,23 @@ package ru.yandex.clickhouse; -import org.apache.http.HttpEntity; -import org.apache.http.entity.InputStreamEntity; -import ru.yandex.clickhouse.domain.ClickHouseCompression; -import ru.yandex.clickhouse.domain.ClickHouseFormat; -import ru.yandex.clickhouse.util.ClickHouseStreamCallback; -import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity; +import static ru.yandex.clickhouse.domain.ClickHouseFormat.Native; +import static ru.yandex.clickhouse.domain.ClickHouseFormat.RowBinary; +import static ru.yandex.clickhouse.domain.ClickHouseFormat.TabSeparated; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.sql.SQLException; +import java.util.Objects; -import static ru.yandex.clickhouse.domain.ClickHouseFormat.Native; -import static ru.yandex.clickhouse.domain.ClickHouseFormat.RowBinary; -import static ru.yandex.clickhouse.domain.ClickHouseFormat.TabSeparated; +import org.apache.http.HttpEntity; +import org.apache.http.entity.InputStreamEntity; +import ru.yandex.clickhouse.domain.ClickHouseCompression; +import ru.yandex.clickhouse.domain.ClickHouseFormat; +import ru.yandex.clickhouse.settings.ClickHouseQueryParam; +import ru.yandex.clickhouse.util.ClickHouseStreamCallback; +import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity; public class Writer extends ConfigurableApi { @@ -27,10 +29,12 @@ public class Writer extends ConfigurableApi { Writer(ClickHouseStatementImpl statement) { super(statement); + + dataCompression(ClickHouseCompression.none); } /** - * Specifies format for further insert of data via send() + * Specifies format for further insert of data via send(). * * @param format * the format of the data to upload @@ -45,7 +49,7 @@ public Writer format(ClickHouseFormat format) { } /** - * Set table name for data insertion + * Set table name for data insertion. * * @param table * name of the table to upload the data to @@ -58,7 +62,7 @@ public Writer table(String table) { } /** - * Set SQL for data insertion + * Set SQL for data insertion. * * @param sql * in a form "INSERT INTO table_name [(X,Y,Z)] VALUES " @@ -71,7 +75,7 @@ public Writer sql(String sql) { } /** - * Specifies data input stream + * Specifies data input stream. * * @param stream * a stream providing the data to upload @@ -83,7 +87,7 @@ public Writer data(InputStream stream) { } /** - * Specifies data input stream, and the format to use + * Specifies data input stream, and the format to use. * * @param stream * a stream providing the data to upload @@ -96,7 +100,7 @@ public Writer data(InputStream stream, ClickHouseFormat format) { } /** - * Shortcut method for specifying a file as an input + * Shortcut method for specifying a file as an input. * * @param input * the file to upload @@ -116,10 +120,9 @@ public Writer data(File input, ClickHouseFormat format, ClickHouseCompression co } public Writer dataCompression(ClickHouseCompression compression) { - if (null == compression) { - throw new NullPointerException("Compression can not be null"); - } - this.compression = compression; + this.compression = Objects.requireNonNull(compression, "Compression can not be null"); + this.addDbParam(ClickHouseQueryParam.COMPRESS, String.valueOf(compression != ClickHouseCompression.none)); + return this; } @@ -128,7 +131,7 @@ public Writer data(File input, ClickHouseFormat format) { } /** - * Method to call, when Writer is fully configured + * Method to call, when Writer is fully configured. */ public void send() throws SQLException { HttpEntity entity; @@ -149,7 +152,7 @@ private void send(HttpEntity entity) throws SQLException { } /** - * Allows to send stream of data to ClickHouse + * Allows to send stream of data to ClickHouse. * * @param sql * in a form of "INSERT INTO table_name (X,Y,Z) VALUES " @@ -165,7 +168,7 @@ public void send(String sql, InputStream data, ClickHouseFormat format) throws S } /** - * Convenient method for importing the data into table + * Convenient method for importing the data into table. * * @param table * table name @@ -182,7 +185,7 @@ public void sendToTable(String table, InputStream data, ClickHouseFormat format) /** * Sends the data in {@link ClickHouseFormat#RowBinary RowBinary} or in - * {@link ClickHouseFormat#Native Native} format + * {@link ClickHouseFormat#Native Native} format. * * @param sql * the SQL statement to execute diff --git a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java index 0d09b36b7..ea207acb3 100644 --- a/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java +++ b/clickhouse-jdbc/src/test/java/ru/yandex/clickhouse/integration/StreamSQLTest.java @@ -1,15 +1,11 @@ package ru.yandex.clickhouse.integration; -import org.testng.Assert; -import org.testng.annotations.BeforeTest; -import org.testng.annotations.Test; -import ru.yandex.clickhouse.ClickHouseConnection; -import ru.yandex.clickhouse.ClickHouseContainerForTest; -import ru.yandex.clickhouse.ClickHouseDataSource; -import ru.yandex.clickhouse.domain.ClickHouseCompression; -import ru.yandex.clickhouse.domain.ClickHouseFormat; -import java.io.*; +import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.math.BigDecimal; import java.nio.charset.Charset; import java.sql.ResultSet; @@ -19,6 +15,14 @@ import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.zip.GZIPOutputStream; +import org.testng.Assert; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; +import ru.yandex.clickhouse.ClickHouseConnection; +import ru.yandex.clickhouse.ClickHouseContainerForTest; +import ru.yandex.clickhouse.ClickHouseDataSource; +import ru.yandex.clickhouse.domain.ClickHouseCompression; +import ru.yandex.clickhouse.domain.ClickHouseFormat; public class StreamSQLTest { private static final DateTimeFormatter DATE_TIME_FORMATTER_TZ = @@ -49,8 +53,8 @@ public void simpleCSVInsert() throws SQLException { String string = "5,6\n1,6"; InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8"))); - connection.createStatement(). - write() + connection.createStatement() + .write() .sql("insert into test.csv_stream_sql format CSV") .data(inputStream) .send(); @@ -114,7 +118,7 @@ public void multiRowTSVInsert() throws SQLException { private InputStream gzStream( InputStream is ) throws IOException { final int bufferSize = 16384; - byte data[] = new byte[bufferSize]; + byte[] data = new byte[bufferSize]; ByteArrayOutputStream os = new ByteArrayOutputStream(); GZIPOutputStream gzipOutputStream = new GZIPOutputStream(os); BufferedInputStream es = new BufferedInputStream(is, bufferSize); @@ -137,8 +141,8 @@ public void multiRowTSVInsertCompressed() throws SQLException, IOException { final int rowsCount = 100000; InputStream gz = gzStream(getTSVStream(rowsCount)); - connection.createStatement(). - write() + connection.createStatement() + .write() .sql("insert into test.tsv_compressed_stream_sql format TSV") .data(gz, ClickHouseFormat.TSV, ClickHouseCompression.gzip) .send(); @@ -151,6 +155,30 @@ public void multiRowTSVInsertCompressed() throws SQLException, IOException { Assert.assertEquals(rs.getInt("uniq"), rowsCount); } + @Test + public void multiRowTSVInsertNotCompressed() throws SQLException, IOException { + connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_not_compressed_stream_sql"); + connection.createStatement().execute( + "CREATE TABLE test.tsv_not_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()" + ); + + final int rowsCount = 100000; + + InputStream in = getTSVStream(rowsCount); + connection.createStatement() + .write() + .sql("insert into test.tsv_not_compressed_stream_sql format TSV") + .data(in, ClickHouseFormat.TSV, ClickHouseCompression.none) + .send(); + + ResultSet rs = connection.createStatement().executeQuery( + "SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_not_compressed_stream_sql"); + Assert.assertTrue(rs.next()); + Assert.assertEquals(rs.getInt("cnt"), rowsCount); + Assert.assertEquals(rs.getInt("sum"), rowsCount); + Assert.assertEquals(rs.getInt("uniq"), rowsCount); + } + @Test public void JSONEachRowInsert() throws SQLException {