Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import java.util.Map;
import java.util.TimeZone;
import java.util.UUID;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
Expand All @@ -35,7 +34,7 @@
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.except.ClickHouseException;
import ru.yandex.clickhouse.except.ClickHouseExceptionSpecifier;
Expand Down Expand Up @@ -1014,7 +1013,7 @@ void sendStream(Writer writer, HttpEntity content) throws ClickHouseException {

HttpPost httpPost = new HttpPost(uri);

if (writer.getCompression() != null) {
if (writer.getCompression() != ClickHouseCompression.none) {
httpPost.addHeader("Content-Encoding", writer.getCompression().name());
}
httpPost.setEntity(content);
Expand Down
49 changes: 26 additions & 23 deletions clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/Writer.java
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package ru.yandex.clickhouse;

import org.apache.http.HttpEntity;
import org.apache.http.entity.InputStreamEntity;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.util.ClickHouseStreamCallback;
import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity;
import static ru.yandex.clickhouse.domain.ClickHouseFormat.Native;
import static ru.yandex.clickhouse.domain.ClickHouseFormat.RowBinary;
import static ru.yandex.clickhouse.domain.ClickHouseFormat.TabSeparated;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.SQLException;
import java.util.Objects;

import static ru.yandex.clickhouse.domain.ClickHouseFormat.Native;
import static ru.yandex.clickhouse.domain.ClickHouseFormat.RowBinary;
import static ru.yandex.clickhouse.domain.ClickHouseFormat.TabSeparated;
import org.apache.http.HttpEntity;
import org.apache.http.entity.InputStreamEntity;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;
import ru.yandex.clickhouse.settings.ClickHouseQueryParam;
import ru.yandex.clickhouse.util.ClickHouseStreamCallback;
import ru.yandex.clickhouse.util.ClickHouseStreamHttpEntity;

public class Writer extends ConfigurableApi<Writer> {

Expand All @@ -27,10 +29,12 @@ public class Writer extends ConfigurableApi<Writer> {

Writer(ClickHouseStatementImpl statement) {
super(statement);

dataCompression(ClickHouseCompression.none);
}

/**
* Specifies format for further insert of data via send()
* Specifies format for further insert of data via send().
*
* @param format
* the format of the data to upload
Expand All @@ -45,7 +49,7 @@ public Writer format(ClickHouseFormat format) {
}

/**
* Set table name for data insertion
* Set table name for data insertion.
*
* @param table
* name of the table to upload the data to
Expand All @@ -58,7 +62,7 @@ public Writer table(String table) {
}

/**
* Set SQL for data insertion
* Set SQL for data insertion.
*
* @param sql
* in a form "INSERT INTO table_name [(X,Y,Z)] VALUES "
Expand All @@ -71,7 +75,7 @@ public Writer sql(String sql) {
}

/**
* Specifies data input stream
* Specifies data input stream.
*
* @param stream
* a stream providing the data to upload
Expand All @@ -83,7 +87,7 @@ public Writer data(InputStream stream) {
}

/**
* Specifies data input stream, and the format to use
* Specifies data input stream, and the format to use.
*
* @param stream
* a stream providing the data to upload
Expand All @@ -96,7 +100,7 @@ public Writer data(InputStream stream, ClickHouseFormat format) {
}

/**
* Shortcut method for specifying a file as an input
* Shortcut method for specifying a file as an input.
*
* @param input
* the file to upload
Expand All @@ -116,10 +120,9 @@ public Writer data(File input, ClickHouseFormat format, ClickHouseCompression co
}

public Writer dataCompression(ClickHouseCompression compression) {
if (null == compression) {
throw new NullPointerException("Compression can not be null");
}
this.compression = compression;
this.compression = Objects.requireNonNull(compression, "Compression can not be null");
this.addDbParam(ClickHouseQueryParam.COMPRESS, String.valueOf(compression != ClickHouseCompression.none));

return this;
}

Expand All @@ -128,7 +131,7 @@ public Writer data(File input, ClickHouseFormat format) {
}

/**
* Method to call, when Writer is fully configured
* Method to call, when Writer is fully configured.
*/
public void send() throws SQLException {
HttpEntity entity;
Expand All @@ -149,7 +152,7 @@ private void send(HttpEntity entity) throws SQLException {
}

/**
* Allows to send stream of data to ClickHouse
* Allows to send stream of data to ClickHouse.
*
* @param sql
* in a form of "INSERT INTO table_name (X,Y,Z) VALUES "
Expand All @@ -165,7 +168,7 @@ public void send(String sql, InputStream data, ClickHouseFormat format) throws S
}

/**
* Convenient method for importing the data into table
* Convenient method for importing the data into table.
*
* @param table
* table name
Expand All @@ -182,7 +185,7 @@ public void sendToTable(String table, InputStream data, ClickHouseFormat format)

/**
* Sends the data in {@link ClickHouseFormat#RowBinary RowBinary} or in
* {@link ClickHouseFormat#Native Native} format
* {@link ClickHouseFormat#Native Native} format.
*
* @param sql
* the SQL statement to execute
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package ru.yandex.clickhouse.integration;

import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseContainerForTest;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;

import java.io.*;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.ResultSet;
Expand All @@ -19,6 +15,14 @@
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.zip.GZIPOutputStream;
import org.testng.Assert;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
import ru.yandex.clickhouse.ClickHouseConnection;
import ru.yandex.clickhouse.ClickHouseContainerForTest;
import ru.yandex.clickhouse.ClickHouseDataSource;
import ru.yandex.clickhouse.domain.ClickHouseCompression;
import ru.yandex.clickhouse.domain.ClickHouseFormat;

public class StreamSQLTest {
private static final DateTimeFormatter DATE_TIME_FORMATTER_TZ =
Expand Down Expand Up @@ -49,8 +53,8 @@ public void simpleCSVInsert() throws SQLException {
String string = "5,6\n1,6";
InputStream inputStream = new ByteArrayInputStream(string.getBytes(Charset.forName("UTF-8")));

connection.createStatement().
write()
connection.createStatement()
.write()
.sql("insert into test.csv_stream_sql format CSV")
.data(inputStream)
.send();
Expand Down Expand Up @@ -114,7 +118,7 @@ public void multiRowTSVInsert() throws SQLException {
private InputStream gzStream( InputStream is ) throws IOException
{
final int bufferSize = 16384;
byte data[] = new byte[bufferSize];
byte[] data = new byte[bufferSize];
ByteArrayOutputStream os = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(os);
BufferedInputStream es = new BufferedInputStream(is, bufferSize);
Expand All @@ -137,8 +141,8 @@ public void multiRowTSVInsertCompressed() throws SQLException, IOException {
final int rowsCount = 100000;

InputStream gz = gzStream(getTSVStream(rowsCount));
connection.createStatement().
write()
connection.createStatement()
.write()
.sql("insert into test.tsv_compressed_stream_sql format TSV")
.data(gz, ClickHouseFormat.TSV, ClickHouseCompression.gzip)
.send();
Expand All @@ -151,6 +155,30 @@ public void multiRowTSVInsertCompressed() throws SQLException, IOException {
Assert.assertEquals(rs.getInt("uniq"), rowsCount);
}

@Test
public void multiRowTSVInsertNotCompressed() throws SQLException, IOException {
connection.createStatement().execute("DROP TABLE IF EXISTS test.tsv_not_compressed_stream_sql");
connection.createStatement().execute(
"CREATE TABLE test.tsv_not_compressed_stream_sql (value Int32, string_value String) ENGINE = Log()"
);

final int rowsCount = 100000;

InputStream in = getTSVStream(rowsCount);
connection.createStatement()
.write()
.sql("insert into test.tsv_not_compressed_stream_sql format TSV")
.data(in, ClickHouseFormat.TSV, ClickHouseCompression.none)
.send();

ResultSet rs = connection.createStatement().executeQuery(
"SELECT count() AS cnt, sum(value) AS sum, uniqExact(string_value) uniq FROM test.tsv_not_compressed_stream_sql");
Assert.assertTrue(rs.next());
Assert.assertEquals(rs.getInt("cnt"), rowsCount);
Assert.assertEquals(rs.getInt("sum"), rowsCount);
Assert.assertEquals(rs.getInt("uniq"), rowsCount);
}


@Test
public void JSONEachRowInsert() throws SQLException {
Expand Down