Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -384,13 +384,25 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj

private void addHeaders(HttpPost req, Map<String, String> chConfig, Map<String, Object> requestConfig) {
req.addHeader(HttpHeaders.CONTENT_TYPE, CONTENT_TYPE.getMimeType());
if (requestConfig.containsKey(ClickHouseClientOption.FORMAT.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClickHouseClientOption.FORMAT.getKey()));
}
if (requestConfig.containsKey(ClickHouseClientOption.QUERY_ID.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClickHouseClientOption.QUERY_ID.getKey()).toString());
if (requestConfig != null) {
if (requestConfig.containsKey(ClickHouseClientOption.FORMAT.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClickHouseClientOption.FORMAT.getKey()));
}
if (requestConfig.containsKey(ClickHouseClientOption.QUERY_ID.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClickHouseClientOption.QUERY_ID.getKey()).toString());
}
if(requestConfig.containsKey(ClickHouseClientOption.DATABASE.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, requestConfig.get(ClickHouseClientOption.DATABASE.getKey()));
}else {
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, chConfig.get(ClickHouseClientOption.DATABASE.getKey()));
}
if (requestConfig.containsKey(ClickHouseClientOption.FORMAT.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_FORMAT, requestConfig.get(ClickHouseClientOption.FORMAT.getKey()));
}
if (requestConfig.containsKey(ClickHouseClientOption.QUERY_ID.getKey())) {
req.addHeader(ClickHouseHttpProto.HEADER_QUERY_ID, requestConfig.get(ClickHouseClientOption.QUERY_ID.getKey()).toString());
}
}
req.addHeader(ClickHouseHttpProto.HEADER_DATABASE, chConfig.get(ClickHouseClientOption.DATABASE.getKey()));
req.addHeader(ClickHouseHttpProto.HEADER_DB_USER, chConfig.get(ClickHouseDefaults.USER.getKey()));
if (MapUtils.getFlag(chConfig, "ssl_authentication", false)) {
req.addHeader(ClickHouseHttpProto.HEADER_SSL_CERT_AUTH, "on");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ public void insertRawDataSimple(int numberOfRecords) throws Exception {
public void testInsertMetricsOperationId() throws Exception {
final String tableName = "insert_metrics_test";
final String createSQL = "CREATE TABLE " + tableName +
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
dropTable(tableName);
createTable(createSQL);

Expand All @@ -248,13 +248,50 @@ public void testInsertMetricsOperationId() throws Exception {
writer.flush();

InsertSettings settings = new InsertSettings()
.setQueryId(String.valueOf(UUID.randomUUID()))
.setOperationId(UUID.randomUUID().toString());
.setQueryId(String.valueOf(UUID.randomUUID()))
.setOperationId(UUID.randomUUID().toString());
InsertResponse response = client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS);
ClickHouseFormat.TSV, settings).get(30, TimeUnit.SECONDS);
OperationMetrics metrics = response.getMetrics();
assertEquals((int)response.getWrittenRows(), numberOfRecords );
assertEquals(metrics.getQueryId(), settings.getQueryId());
assertTrue(metrics.getMetric(ClientMetrics.OP_DURATION).getLong() > 0);
}

@Test(groups = { "integration" })
public void testInsertSettingsAddDatabase() throws Exception {
final String tableName = "insert_settings_database_test";
final String new_database = "new_database";
final String createDatabaseSQL = "CREATE DATABASE " + new_database;
final String createTableSQL = "CREATE TABLE " + new_database + "." + tableName +
" (Id UInt32, event_ts Timestamp, name String, p1 Int64, p2 String) ENGINE = MergeTree() ORDER BY ()";
final String dropDatabaseSQL = "DROP DATABASE IF EXISTS " + new_database;

try (ClickHouseClient client = ClickHouseClient.builder().config(new ClickHouseConfig())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build()) {
client.read(getServer(ClickHouseProtocol.HTTP)).query(dropDatabaseSQL).executeAndWait().close();
client.read(getServer(ClickHouseProtocol.HTTP)).query(createDatabaseSQL).executeAndWait().close();
client.read(getServer(ClickHouseProtocol.HTTP)).query(createTableSQL).executeAndWait().close();
}


InsertSettings insertSettings = settings.setInputStreamCopyBufferSize(8198 * 2)
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
.setQueryId(String.valueOf(UUID.randomUUID()));
insertSettings.setDatabase(new_database);

ByteArrayOutputStream data = new ByteArrayOutputStream();
PrintWriter writer = new PrintWriter(data);
for (int i = 0; i < 1000; i++) {
writer.printf("%d\t%s\t%s\t%d\t%s\n", i, "2021-01-01 00:00:00", "name" + i, i, "p2");
}
writer.flush();
InsertResponse response = client.insert(tableName, new ByteArrayInputStream(data.toByteArray()),
ClickHouseFormat.TSV, insertSettings).get(30, TimeUnit.SECONDS);
assertEquals((int)response.getWrittenRows(), 1000 );

List<GenericRecord> records = client.queryAll("SELECT * FROM " + new_database + "." + tableName);
assertEquals(records.size(), 1000);
}
}
Loading