From cbc7c8ace9f3d4c63c520be6f0c3f7d5be5e0b85 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 8 Jun 2025 10:55:08 +0300 Subject: [PATCH 01/11] Change values to finals and clean remarks --- .../clickhouse/sink/ClickHouseSinkTests.java | 53 ++++++++++--------- 1 file changed, 29 insertions(+), 24 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 6442c5e..57c4b9f 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -31,6 +31,14 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final int EXPECTED_ROWS = 10000; + static final int MAX_BATCH_SIZE = 5000; + static final int MAX_IN_FLIGHT_REQUESTS = 2; + static final int MAX_BUFFERED_REQUESTS = 20000; + static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024; + static final long MAX_TIME_IN_BUFFER_MS = 5 * 1000; + static final long MAX_RECORD_SIZE_IN_BYTES = 1000; + + static final int STREAM_PARALLELISM = 5; private int executeJob(StreamExecutionEnvironment env, String tableName) throws Exception { JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource"); @@ -72,19 +80,19 @@ void CSVDataTest() throws Exception { ClickHouseServerForTests.executeSql(tableSql); final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); - env.setParallelism(1); + env.setParallelism(STREAM_PARALLELISM); ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); ElementConverter convertorString = new ClickHouseConvertor<>(String.class); // create sink ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>( convertorString, - 5000, - 2, - 20000, - 1024 * 1024, - 5 * 1000, - 1000, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, clickHouseClientConfig ); csvSink.setClickHouseFormat(ClickHouseFormat.CSV); @@ -128,13 +136,11 @@ void CovidPOJODataTest() throws Exception { "ORDER BY (location_key, date); "; ClickHouseServerForTests.executeSql(tableSql); - TableSchema covidTableSchema = ClickHouseServerForTests.getTableSchema(tableName); -// POJOConvertor covidPOJOConvertor = POJOSerializable.create().createConvertor(covidTableSchema, CovidPOJO.class); POJOConvertor covidPOJOConvertor = new CovidPOJOConvertor(); final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); - env.setParallelism(5); + env.setParallelism(STREAM_PARALLELISM); ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); clickHouseClientConfig.setSupportDefault(covidTableSchema.hasDefaults()); @@ -142,12 +148,12 @@ void CovidPOJODataTest() throws Exception { ClickHouseAsyncSink covidPOJOSink = new ClickHouseAsyncSink<>( convertorCovid, - 5000, - 2, - 20000, - 1024 * 1024, - 5 * 1000, - 1000, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, clickHouseClientConfig ); @@ -204,11 +210,10 @@ 
void SimplePOJODataTest() throws Exception { TableSchema simpleTableSchema = ClickHouseServerForTests.getTableSchema(tableName); -// POJOConvertor simplePOJOConvertor = POJOSerializable.create().createConvertor(simpleTableSchema, SimplePOJO.class); POJOConvertor simplePOJOConvertor = new SimplePOJOConvertor(); final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); - env.setParallelism(5); + env.setParallelism(STREAM_PARALLELISM); ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults()); @@ -217,12 +222,12 @@ void SimplePOJODataTest() throws Exception { ClickHouseAsyncSink simplePOJOSink = new ClickHouseAsyncSink<>( convertorCovid, - 5000, - 2, - 20000, - 1024 * 1024, - 5 * 1000, - 1000, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, clickHouseClientConfig ); From 5d3f1ae30d6d9f90a6aca9a3a750a4ea14ee58e2 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 8 Jun 2025 13:29:39 +0300 Subject: [PATCH 02/11] Add Failure Drop data test --- .../main/java/com/clickhouse/utils/Utils.java | 24 +++---- .../sink/ClickHouseAsyncWriter.java | 9 ++- .../clickhouse/sink/ClickHouseSinkTests.java | 68 +++++++++++++++++-- 3 files changed, 82 insertions(+), 19 deletions(-) diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java index 7b5d5fd..21dc3d0 100644 --- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java +++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java @@ -1,15 +1,13 @@ package com.clickhouse.utils; -import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.api.ServerException; import org.apache.flink.connector.clickhouse.exception.RetriableException; -import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.SocketTimeoutException; import java.net.UnknownHostException; -import java.util.Collection; public class Utils { @@ -18,18 +16,18 @@ public class Utils { private static final String CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG = "Write timed out after"; /** - * This will drill down to the first ClickHouseException in the exception chain + * This will drill down to the first ServerException in the exception chain * * @param e Exception to drill down - * @return ClickHouseException or null if none found + * @return ServerException or null if none found */ - public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) { + public static Exception getRootCause(Throwable e, Boolean prioritizeServerException) { if (e == null) return null; Throwable runningException = e;//We have to use Throwable because of the getCause() signature while (runningException.getCause() != null && - (!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) { + (!prioritizeServerException || !(runningException instanceof ServerException))) { LOG.trace("Found exception: {}", runningException.getLocalizedMessage()); runningException = runningException.getCause(); } @@ -46,13 +44,13 @@ public static Exception getRootCause(Throwable e, Boolean 
prioritizeClickHouseEx public static void handleException(Throwable e) { LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage()); - //Let's check if we have a ClickHouseException to reference the error code + //Let's check if we have a ServerException to reference the error code //https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp Exception rootCause = Utils.getRootCause(e, true); - if (rootCause instanceof ClickHouseException) { - ClickHouseException clickHouseException = (ClickHouseException) rootCause; - LOG.warn("ClickHouseException code: {}", clickHouseException.getErrorCode()); - switch (clickHouseException.getErrorCode()) { + if (rootCause instanceof ServerException) { + ServerException clickHouseServerException = (ServerException) rootCause; + LOG.warn("ClickHouse Server Exception Code: {}", clickHouseServerException.getCode()); + switch (clickHouseServerException.getCode()) { case 3: // UNEXPECTED_END_OF_FILE case 107: // FILE_DOESNT_EXIST case 159: // TIMEOUT_EXCEEDED @@ -70,7 +68,7 @@ public static void handleException(Throwable e) { case 999: // KEEPER_EXCEPTION throw new RetriableException(e); default: - LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseException.getErrorCode()); + LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseServerException.getCode()); break; } } diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java index 8222d1f..1e781e5 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; public class ClickHouseAsyncWriter extends AsyncSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class); @@ -33,6 +32,8 @@ public class ClickHouseAsyncWriter extends AsyncSinkWriter elementConverter, WriterInitContext context, @@ -62,6 +63,8 @@ public ClickHouseAsyncWriter(ElementConverter element this.numBytesSendCounter = metricGroup.getNumBytesSendCounter(); this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted"); + this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches"); + this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords"); } @Override @@ -141,7 +144,9 @@ private void handleFailedRequest( // TODO: send data again resultHandler.retryForEntries(requestEntries); } - LOG.info("completeExceptionally"); + LOG.info("Dropping request entries. Since It a failure that can not be retried. 
error {} number of entries drop {}", error.getLocalizedMessage(), requestEntries.size()); + numOfDroppedBatchesCounter.inc(); + numOfDroppedRecordsCounter.inc(requestEntries.size()); resultHandler.completeExceptionally((Exception)error); } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 57c4b9f..71fce5a 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -2,6 +2,7 @@ import com.clickhouse.client.api.metadata.TableSchema; import com.clickhouse.data.ClickHouseFormat; +import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.connector.base.sink.writer.ElementConverter; @@ -31,6 +32,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final int EXPECTED_ROWS = 10000; + static final int EXPECTED_ROWS_ON_FAILURE = 0; static final int MAX_BATCH_SIZE = 5000; static final int MAX_IN_FLIGHT_REQUESTS = 2; static final int MAX_BUFFERED_REQUESTS = 20000; @@ -40,7 +42,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final int STREAM_PARALLELISM = 5; - private int executeJob(StreamExecutionEnvironment env, String tableName) throws Exception { + private int executeAsyncJob(StreamExecutionEnvironment env, String tableName) throws Exception { JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource"); int rows = 0; int iterations = 0; @@ -109,7 +111,7 @@ void CSVDataTest() throws Exception { "GzipCsvSource" ); lines.sinkTo(csvSink); - int rows = executeJob(env, tableName); + int rows = executeAsyncJob(env, tableName); Assertions.assertEquals(EXPECTED_ROWS, rows); } @@ -178,7 +180,7 @@ public CovidPOJO map(String value) throws Exception { }); // send to a sink covidPOJOs.sinkTo(covidPOJOSink); - int rows = executeJob(env, tableName); + int rows = executeAsyncJob(env, tableName); Assertions.assertEquals(EXPECTED_ROWS, rows); } @@ -239,7 +241,65 @@ void SimplePOJODataTest() throws Exception { DataStream simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0])); // send to a sink simplePOJOs.sinkTo(simplePOJOSink); - int rows = executeJob(env, tableName); + int rows = executeAsyncJob(env, tableName); Assertions.assertEquals(EXPECTED_ROWS, rows); } + + @Test + void CSVDataOnFailureDropDataTest() throws Exception { + String tableName = "csv_failure_covid"; + String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName); + ClickHouseServerForTests.executeSql(dropTable); + // create table + String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" + + "date Date," + + "location_key LowCardinality(String)," + + "new_confirmed Int32," + + "new_deceased Int32," + + "new_recovered Int32," + + "new_tested Int32," + + "cumulative_confirmed Int32," + + "cumulative_deceased Int32," + + "cumulative_recovered Int32," + + "cumulative_tested Int32" + + ") " + + "ENGINE = MergeTree " + + "ORDER BY (location_key, date); "; + ClickHouseServerForTests.executeSql(tableSql); + + final StreamExecutionEnvironment env = 
EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); + env.setParallelism(STREAM_PARALLELISM); + + + ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); + ElementConverter convertorString = new ClickHouseConvertor<>(String.class); + // create sink + ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>( + convertorString, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, + clickHouseClientConfig + ); + csvSink.setClickHouseFormat(ClickHouseFormat.TSV); + + Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz"); + + FileSource source = FileSource + .forRecordStreamFormat(new TextLineInputFormat(), filePath) + .build(); + // read csv data from file + DataStreamSource lines = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "GzipCsvSource" + ); + lines.sinkTo(csvSink); + // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS + int rows = executeAsyncJob(env, tableName); + Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows); + } } From 26558ba974c6cc7e4d1e1789d33020c7d3feeb30 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 8 Jun 2025 16:04:15 +0300 Subject: [PATCH 03/11] Added retry functionality and test (CSVDataOnRetryAndDropDataTest) --- .../main/java/com/clickhouse/utils/Utils.java | 8 ++ .../clickhouse/data/ClickHousePayload.java | 3 + .../sink/ClickHouseAsyncWriter.java | 74 +++++++++++++++---- .../clickhouse/sink/ClickHouseSinkTests.java | 67 +++++++++++++++++ .../connector/test/FlinkClusterTests.java | 4 + .../clickhouse/ClickHouseServerForTests.java | 5 ++ 6 files changed, 148 insertions(+), 13 deletions(-) diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java index 21dc3d0..1b6cb52 100644 --- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java +++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java @@ -67,6 +67,14 @@ public static void handleException(Throwable e) { case 425: // SYSTEM_ERROR case 999: // KEEPER_EXCEPTION throw new RetriableException(e); + case 0: + switch (clickHouseServerException.getTransportProtocolCode()) { + case 400: // Bad request + case 500: // Internal server error + throw new RetriableException(e); + default: + LOG.error("Error code [{}] wasn't in the acceptable list. 
Transport protocol code [{}]", clickHouseServerException.getCode(), clickHouseServerException.getTransportProtocolCode()); + } default: LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseServerException.getCode()); break; diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java index 25fb0ae..24f6f08 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java @@ -9,10 +9,13 @@ public class ClickHousePayload implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ClickHousePayload.class); private static final long serialVersionUID = 1L; + private int attemptCount = 1; private final byte[] payload; public ClickHousePayload(byte[] payload) { this.payload = payload; } public byte[] getPayload() { return payload; } public int getPayloadLength() { return payload.length; } + public int getAttemptCount() { return attemptCount; } + public void incrementAttempts() { attemptCount++; } } diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java index 1e781e5..33560f9 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java @@ -22,18 +22,22 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; public class ClickHouseAsyncWriter extends AsyncSinkWriter { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class); + private static final int DEFAULT_MAX_RETRIES = 3; private final ClickHouseClientConfig clickHouseClientConfig; private ClickHouseFormat clickHouseFormat = null; + private int numberOfRetries = DEFAULT_MAX_RETRIES; private final Counter numBytesSendCounter; private final Counter numRecordsSendCounter; private final Counter numRequestSubmittedCounter; private final Counter numOfDroppedBatchesCounter; private final Counter numOfDroppedRecordsCounter; + private final Counter totalBatchRetriesCounter; public ClickHouseAsyncWriter(ElementConverter elementConverter, WriterInitContext context, @@ -43,28 +47,58 @@ public ClickHouseAsyncWriter(ElementConverter element long maxBatchSizeInBytes, long maxTimeInBufferMS, long maxRecordSizeInBytes, + int numberOfRetries, ClickHouseClientConfig clickHouseClientConfig, ClickHouseFormat clickHouseFormat, Collection> state) { super(elementConverter, - context, - AsyncSinkWriterConfiguration.builder() - .setMaxBatchSize(maxBatchSize) - .setMaxBatchSizeInBytes(maxBatchSizeInBytes) - .setMaxInFlightRequests(maxInFlightRequests) - .setMaxBufferedRequests(maxBufferedRequests) - .setMaxTimeInBufferMS(maxTimeInBufferMS) - .setMaxRecordSizeInBytes(maxRecordSizeInBytes) - .build(), - state); + context, + AsyncSinkWriterConfiguration.builder() + .setMaxBatchSize(maxBatchSize) + .setMaxBatchSizeInBytes(maxBatchSizeInBytes) + 
.setMaxInFlightRequests(maxInFlightRequests) + .setMaxBufferedRequests(maxBufferedRequests) + .setMaxTimeInBufferMS(maxTimeInBufferMS) + .setMaxRecordSizeInBytes(maxRecordSizeInBytes) + .build(), + state); this.clickHouseClientConfig = clickHouseClientConfig; this.clickHouseFormat = clickHouseFormat; + this.numberOfRetries = numberOfRetries; final SinkWriterMetricGroup metricGroup = context.metricGroup(); this.numBytesSendCounter = metricGroup.getNumBytesSendCounter(); this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter(); this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted"); this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches"); this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords"); + this.totalBatchRetriesCounter = metricGroup.counter("totalBatchRetries"); + } + + + public ClickHouseAsyncWriter(ElementConverter elementConverter, + WriterInitContext context, + int maxBatchSize, + int maxInFlightRequests, + int maxBufferedRequests, + long maxBatchSizeInBytes, + long maxTimeInBufferMS, + long maxRecordSizeInBytes, + ClickHouseClientConfig clickHouseClientConfig, + ClickHouseFormat clickHouseFormat, + Collection> state) { + this(elementConverter, + context, + maxBatchSize, + maxInFlightRequests, + maxBufferedRequests, + maxBatchSizeInBytes, + maxTimeInBufferMS, + maxRecordSizeInBytes, + DEFAULT_MAX_RETRIES, + clickHouseClientConfig, + clickHouseFormat, + state + ); } @Override @@ -141,13 +175,27 @@ private void handleFailedRequest( Utils.handleException(error); } catch (RetriableException e) { LOG.info("Retriable exception occurred while processing request. ", e); - // TODO: send data again - resultHandler.retryForEntries(requestEntries); + // Let's try to retry + if (requestEntries != null && !requestEntries.isEmpty()) { + ClickHousePayload firstElement = requestEntries.get(0); + LOG.warn("Retry number [{}] out of [{}]", firstElement.getAttemptCount(), this.numberOfRetries); + firstElement.incrementAttempts(); + if (firstElement.getAttemptCount() <= this.numberOfRetries) { + totalBatchRetriesCounter.inc(); + LOG.warn("Retriable exception occurred while processing request. Attempts left: {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1)); + // We are still within the retry threshold, so send the data again + resultHandler.retryForEntries(requestEntries); + return; + } else { + LOG.warn("No attempts left; dropping the batch"); + } + } + + } LOG.info("Dropping request entries. Since It a failure that can not be retried. error {} number of entries drop {}", error.getLocalizedMessage(), requestEntries.size()); numOfDroppedBatchesCounter.inc(); numOfDroppedRecordsCounter.inc(requestEntries.size()); - resultHandler.completeExceptionally((Exception)error); + resultHandler.completeExceptionally((Exception) error); } } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 71fce5a..1cddca0 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -245,6 +245,10 @@ void SimplePOJODataTest() throws Exception { Assertions.assertEquals(EXPECTED_ROWS, rows); } + /** + * Supposed to drop data on failure. 
We trigger this case by configuring the writer with the wrong format + * @throws Exception + */ @Test void CSVDataOnFailureDropDataTest() throws Exception { String tableName = "csv_failure_covid"; @@ -302,4 +306,67 @@ void CSVDataOnFailureDropDataTest() throws Exception { Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows); } + + /** + * Supposed to retry and then drop data on failure. We trigger this case by pointing the client at an incorrect ClickHouse server port + * @throws Exception + */ + @Test + void CSVDataOnRetryAndDropDataTest() throws Exception { + String tableName = "csv_retry_covid"; + String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName); + ClickHouseServerForTests.executeSql(dropTable); + // create table + String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" + + "date Date," + + "location_key LowCardinality(String)," + + "new_confirmed Int32," + + "new_deceased Int32," + + "new_recovered Int32," + + "new_tested Int32," + + "cumulative_confirmed Int32," + + "cumulative_deceased Int32," + + "cumulative_recovered Int32," + + "cumulative_tested Int32" + + ") " + + "ENGINE = MergeTree " + + "ORDER BY (location_key, date); "; + ClickHouseServerForTests.executeSql(tableSql); + + final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); + env.setParallelism(STREAM_PARALLELISM); + + + ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getIncorrectServerURL(), getUsername(), getPassword(), getDatabase(), tableName); + ElementConverter convertorString = new ClickHouseConvertor<>(String.class); + // create sink + ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>( + convertorString, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, + clickHouseClientConfig + ); + csvSink.setClickHouseFormat(ClickHouseFormat.CSV); + + Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz"); + + FileSource source = FileSource + .forRecordStreamFormat(new TextLineInputFormat(), filePath) + .build(); + // read csv data from file + DataStreamSource lines = env.fromSource( + source, + WatermarkStrategy.noWatermarks(), + "GzipCsvSource" + ); + lines.sinkTo(csvSink); + // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS + int rows = executeAsyncJob(env, tableName); + Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows); + } + } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java index 9ee6463..334ba1c 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java @@ -23,6 +23,10 @@ public static String getServerURL() { return ClickHouseServerForTests.getURL(); } + public static String getIncorrectServerURL() { + return ClickHouseServerForTests.getURL(ClickHouseServerForTests.getHost(), ClickHouseServerForTests.getPort() + 1); + } + public static String getUsername() { return ClickHouseServerForTests.getUsername(); } diff --git 
a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index ff4b0fd..20f9362 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -78,7 +78,12 @@ public static void tearDown() { public static int getPort() { return port; } public static String getUsername() { return username; } public static String getPassword() { return password; } + public static String getURL() { + return ClickHouseServerForTests.getURL(host, port); + } + + public static String getURL(String host, int port) { if (isCloud) { return "https://" + host + ":" + port + "/"; } else { From 872baa87cc98e9ed42b66532db8aa7cd734490c3 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Tue, 24 Jun 2025 18:56:25 +0300 Subject: [PATCH 04/11] Adjust configuration with consts --- .../clickhouse/sink/ClickHouseSinkTests.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 81c2883..a8807ec 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -275,12 +275,12 @@ void ProductNameTest() throws Exception { // create sink ClickHouseAsyncSink csvSink = new ClickHouseAsyncSink<>( convertorString, - 5000, - 2, - 20000, - 1024 * 1024, - 5 * 1000, - 1000, + MAX_BATCH_SIZE, + MAX_IN_FLIGHT_REQUESTS, + MAX_BUFFERED_REQUESTS, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, clickHouseClientConfig ); csvSink.setClickHouseFormat(ClickHouseFormat.CSV); @@ -297,7 +297,7 @@ void ProductNameTest() throws Exception { "GzipCsvSource" ); lines.sinkTo(csvSink); - int rows = executeJob(env, tableName); + int rows = executeAsyncJob(env, tableName); Assertions.assertEquals(EXPECTED_ROWS, rows); ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS"); if (ClickHouseServerForTests.isCloud()) From 84412ce892b7f234a506e843a192014d2b24ea54 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 29 Jun 2025 13:26:36 +0300 Subject: [PATCH 05/11] Added TOO_MANY_PARTS testing --- .../build.gradle.kts | 7 +- .../main/java/com/clickhouse/utils/Utils.java | 51 ++-------- .../sink/ClickHouseAsyncWriter.java | 2 +- .../sink/ClickHouseClientConfig.java | 37 ++++++-- .../clickhouse/sink/ClickHouseSinkTests.java | 93 +++++++++++++++++-- .../clickhouse/ClickHouseServerForTests.java | 14 +++ 6 files changed, 139 insertions(+), 65 deletions(-) diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index 6a6d7d1..7378e98 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -12,12 +12,13 @@ val scalaVersion = "2.13.12" repositories { // Use Maven Central for resolving dependencies. 
- mavenLocal() +// mavenLocal() + maven("https://s01.oss.sonatype.org/content/groups/staging/") mavenCentral() } extra.apply { - set("clickHouseDriverVersion", "0.8.6") + set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") set("flinkVersion", "2.0.0") set("log4jVersion","2.17.2") set("testContainersVersion", "1.21.0") @@ -91,7 +92,7 @@ sourceSets { // Apply a specific Java toolchain to ease working on different environments. java { toolchain { - languageVersion = JavaLanguageVersion.of(11) + languageVersion = JavaLanguageVersion.of(17) } } diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java index 1b6cb52..21095ee 100644 --- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java +++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java @@ -1,5 +1,6 @@ package com.clickhouse.utils; +import com.clickhouse.client.api.ConnectionInitiationException; import com.clickhouse.client.api.ServerException; import org.apache.flink.connector.clickhouse.exception.RetriableException; import org.slf4j.Logger; @@ -43,50 +44,19 @@ public static Exception getRootCause(Throwable e, Boolean prioritizeServerExcept public static void handleException(Throwable e) { LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage()); - - //Let's check if we have a ServerException to reference the error code - //https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp Exception rootCause = Utils.getRootCause(e, true); if (rootCause instanceof ServerException) { - ServerException clickHouseServerException = (ServerException) rootCause; - LOG.warn("ClickHouse Server Exception Code: {}", clickHouseServerException.getCode()); - switch (clickHouseServerException.getCode()) { - case 3: // UNEXPECTED_END_OF_FILE - case 107: // FILE_DOESNT_EXIST - case 159: // TIMEOUT_EXCEEDED - case 164: // READONLY - case 202: // TOO_MANY_SIMULTANEOUS_QUERIES - case 203: // NO_FREE_CONNECTION - case 209: // SOCKET_TIMEOUT - case 210: // NETWORK_ERROR - case 241: // MEMORY_LIMIT_EXCEEDED - case 242: // TABLE_IS_READ_ONLY - case 252: // TOO_MANY_PARTS - case 285: // TOO_FEW_LIVE_REPLICAS - case 319: // UNKNOWN_STATUS_OF_INSERT - case 425: // SYSTEM_ERROR - case 999: // KEEPER_EXCEPTION - throw new RetriableException(e); - case 0: - switch (clickHouseServerException.getTransportProtocolCode()) { - case 400: // Bad request - case 500: // Internal server error - throw new RetriableException(e); - default: - LOG.error("Error code [{}] wasn't in the acceptable list. 
Transport protocol code [{}]", clickHouseServerException.getCode(), clickHouseServerException.getTransportProtocolCode()); - } - default: - LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseServerException.getCode()); - break; + ServerException serverException = (ServerException) rootCause; + LOG.warn("ClickHouse Server Exception Code: {} isRetryable: {}", serverException.getCode(), serverException.isRetryable()); + System.out.println("ClickHouse Server Exception Code: " + serverException.getCode()); + if (serverException.isRetryable()) { + throw new RetriableException(e); } - } - - //Otherwise use Root-Cause Exception Checking - if (rootCause instanceof SocketTimeoutException) { - LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage()); + } else if (rootCause instanceof ConnectionInitiationException) { + LOG.warn("ClickHouse Connection Initiation Exception: {}", rootCause.getLocalizedMessage()); throw new RetriableException(e); - } else if (rootCause instanceof UnknownHostException) { - LOG.warn("UnknownHostException thrown, wrapping exception: {}", e.getLocalizedMessage()); + } else if (rootCause instanceof SocketTimeoutException) { + LOG.warn("SocketTimeoutException thrown, wrapping exception: {}", e.getLocalizedMessage()); throw new RetriableException(e); } else if (rootCause instanceof IOException) { final String msg = rootCause.getMessage(); @@ -96,5 +66,4 @@ public static void handleException(Throwable e) { } } } - } diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java index 33560f9..f68bda9 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java @@ -94,7 +94,7 @@ public ClickHouseAsyncWriter(ElementConverter element maxBatchSizeInBytes, maxTimeInBufferMS, maxRecordSizeInBytes, - DEFAULT_MAX_RETRIES, + clickHouseClientConfig.getNumberOfRetries(), clickHouseClientConfig, clickHouseFormat, state diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index 9fd3b69..5ebbbac 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -14,6 +14,8 @@ public class ClickHouseClientConfig implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class); private static final long serialVersionUID = 1L; + private static final int DEFAULT_MAX_RETRIES = 3; + private final String url; private final String username; private final String password; @@ -22,6 +24,8 @@ public class ClickHouseClientConfig implements Serializable { private final String fullProductName; private Boolean supportDefault = null; private final Map options; + private transient Client client = null; + private int numberOfRetries = DEFAULT_MAX_RETRIES; public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) { 
this.url = url; @@ -34,16 +38,21 @@ public ClickHouseClientConfig(String url, String username, String password, Stri } public Client createClient(String database) { - Client client = new Client.Builder() - .addEndpoint(url) - .setUsername(username) - .setPassword(password) - .setDefaultDatabase(database) - .setClientName(fullProductName) - .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true") - .setOptions(options) - .build(); - return client; + if (this.client == null) { + Client client = new Client.Builder() + .addEndpoint(url) + .setUsername(username) + .setPassword(password) + .setDefaultDatabase(database) + .setClientName(fullProductName) + .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true") + .setOptions(options) + .build(); + this.client = client; + return client; + } else { + return this.client; + } } public Client createClient() { @@ -65,4 +74,12 @@ public void setOptions(Map options) { this.options.putAll(options); } } + + public void setNumberOfRetries(int numberOfRetries) { + this.numberOfRetries = numberOfRetries; + } + + public int getNumberOfRetries() { + return numberOfRetries; + } } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index a8807ec..1c6ea6b 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -29,11 +29,14 @@ import java.util.ArrayList; import java.util.List; +import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.countMerges; + public class ClickHouseSinkTests extends FlinkClusterTests { static final int EXPECTED_ROWS = 10000; static final int EXPECTED_ROWS_ON_FAILURE = 0; static final int MAX_BATCH_SIZE = 5000; + static final int MIN_BATCH_SIZE = 1; static final int MAX_IN_FLIGHT_REQUESTS = 2; static final int MAX_BUFFERED_REQUESTS = 20000; static final long MAX_BATCH_SIZE_IN_BYTES = 1024 * 1024; @@ -42,17 +45,18 @@ public class ClickHouseSinkTests extends FlinkClusterTests { static final int STREAM_PARALLELISM = 5; - private int executeAsyncJob(StreamExecutionEnvironment env, String tableName) throws Exception { + private int executeAsyncJob(StreamExecutionEnvironment env, String tableName, int numIterations, int expectedRows) throws Exception { JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource"); int rows = 0; int iterations = 0; - while (iterations < 10) { + while (iterations < numIterations) { Thread.sleep(1000); iterations++; rows = ClickHouseServerForTests.countRows(tableName); - System.out.println("Rows: " + rows); - if (rows == EXPECTED_ROWS) + System.out.println("Rows: " + rows + " EXPECTED_ROWS: " + expectedRows); + if (rows == expectedRows) break; + } // cancel job jobClient.cancel(); @@ -111,7 +115,7 @@ void CSVDataTest() throws Exception { "GzipCsvSource" ); lines.sinkTo(csvSink); - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); Assertions.assertEquals(EXPECTED_ROWS, rows); } @@ -180,7 +184,7 @@ public CovidPOJO map(String value) throws Exception { }); // send to a sink covidPOJOs.sinkTo(covidPOJOSink); - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); 
Assertions.assertEquals(EXPECTED_ROWS, rows); } @@ -241,7 +245,7 @@ void SimplePOJODataTest() throws Exception { DataStream simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0])); // send to a sink simplePOJOs.sinkTo(simplePOJOSink); - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); Assertions.assertEquals(EXPECTED_ROWS, rows); } @@ -297,7 +301,7 @@ void ProductNameTest() throws Exception { "GzipCsvSource" ); lines.sinkTo(csvSink); - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); Assertions.assertEquals(EXPECTED_ROWS, rows); ClickHouseServerForTests.executeSql("SYSTEM FLUSH LOGS"); if (ClickHouseServerForTests.isCloud()) @@ -368,7 +372,7 @@ void CSVDataOnFailureDropDataTest() throws Exception { ); lines.sinkTo(csvSink); // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows); } @@ -430,8 +434,77 @@ void CSVDataOnRetryAndDropDataTest() throws Exception { ); lines.sinkTo(csvSink); // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS - int rows = executeAsyncJob(env, tableName); + int rows = executeAsyncJob(env, tableName, 10, EXPECTED_ROWS); Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows); } + /* + In this test, we lower the parts_to_throw_insert setting (https://clickhouse.com/docs/operations/settings/merge-tree-settings#parts_to_throw_insert) to trigger the "Too Many Parts" error more easily. + Once the number of active parts exceeds this threshold, ClickHouse rejects INSERT operations with a "Too Many Parts" error. + Our retry implementation demonstrates how it handles these failures by retrying the inserts until all rows are successfully inserted. We insert the rows in small batches of two records each to observe this behavior. 
+ */ + @Test + void SimplePOJODataTooManyPartsTest() throws Exception { + // TODO: needs to be extended to all types + String tableName = "simple_too_many_parts_pojo"; + + String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName); + ClickHouseServerForTests.executeSql(dropTable); + // create table + String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" + + "bytePrimitive Int8," + + "byteObject Int8," + + "shortPrimitive Int16," + + "shortObject Int16," + + "intPrimitive Int32," + + "integerObject Int32," + + "longPrimitive Int64," + + "longObject Int64," + + "floatPrimitive Float," + + "floatObject Float," + + "doublePrimitive Double," + + "doubleObject Double," + + ") " + + "ENGINE = MergeTree " + + "ORDER BY (longPrimitive) " + + "SETTINGS parts_to_throw_insert = 10;"; + ClickHouseServerForTests.executeSql(tableSql); + //ClickHouseServerForTests.executeSql(String.format("SYSTEM STOP MERGES `%s.%s`", getDatabase(), tableName)); + + TableSchema simpleTableSchema = ClickHouseServerForTests.getTableSchema(tableName); + POJOConvertor simplePOJOConvertor = new SimplePOJOConvertor(); + + final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment(); + env.setParallelism(STREAM_PARALLELISM); + + ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName); + clickHouseClientConfig.setNumberOfRetries(10); + clickHouseClientConfig.setSupportDefault(simpleTableSchema.hasDefaults()); + + ElementConverter convertorCovid = new ClickHouseConvertor<>(SimplePOJO.class, simplePOJOConvertor); + + ClickHouseAsyncSink simplePOJOSink = new ClickHouseAsyncSink<>( + convertorCovid, + MIN_BATCH_SIZE * 2, + MAX_IN_FLIGHT_REQUESTS, + 10, + MAX_BATCH_SIZE_IN_BYTES, + MAX_TIME_IN_BUFFER_MS, + MAX_RECORD_SIZE_IN_BYTES, + clickHouseClientConfig + ); + + List simplePOJOList = new ArrayList<>(); + for (int i = 0; i < EXPECTED_ROWS; i++) { + simplePOJOList.add(new SimplePOJO(i)); + } + // create from list + DataStream simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0])); + // send to a sink + simplePOJOs.sinkTo(simplePOJOSink); + int rows = executeAsyncJob(env, tableName, 100, EXPECTED_ROWS); + Assertions.assertEquals(EXPECTED_ROWS, rows); + //ClickHouseServerForTests.executeSql(String.format("SYSTEM START MERGES `%s.%s`", getDatabase(), tableName)); + } + } diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java index f0ae0a6..2a72dff 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java @@ -102,6 +102,20 @@ public static void executeSql(String sql) throws ExecutionException, Interrupted } } + public static int countParts(String table) { + String countPartsSql = String.format("SELECT count(*) FROM system.parts WHERE table = '%s' and active = 1", table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = client.queryAll(countPartsSql); + return countResult.get(0).getInteger(1); + } + + public static int countMerges(String 
table) { + String countMergesSql = String.format("SELECT count(*) FROM system.merges WHERE table = '%s'", table); + Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); + List countResult = client.queryAll(countMergesSql); + return countResult.get(0).getInteger(1); + } + public static int countRows(String table) throws ExecutionException, InterruptedException { String countSql = String.format("SELECT COUNT(*) FROM `%s`.`%s`", database, table); Client client = ClickHouseTestHelpers.getClient(host, port, isSSL, username, password); From 5591bb96949e39b718e2b5e2bbdd242172cbd107 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 30 Jun 2025 08:36:13 +0300 Subject: [PATCH 06/11] Skip too many parts test on the cloud --- .../flink/connector/clickhouse/sink/ClickHouseSinkTests.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java index 1c6ea6b..2af6350 100644 --- a/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java +++ b/flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java @@ -30,6 +30,7 @@ import java.util.List; import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.countMerges; +import static org.apache.flink.connector.test.embedded.clickhouse.ClickHouseServerForTests.isCloud; public class ClickHouseSinkTests extends FlinkClusterTests { @@ -445,7 +446,9 @@ In this test, we lower the parts_to_throw_insert setting (https://clickhouse.com */ @Test void SimplePOJODataTooManyPartsTest() throws Exception { - // TODO: needs to be extended to all types + // this test does not run on ClickHouse Cloud + if (isCloud()) + return; String tableName = "simple_too_many_parts_pojo"; From 5404243bc343346b59917baef49cba2bab60f8db Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 30 Jun 2025 08:57:55 +0300 Subject: [PATCH 07/11] Adding a few comments --- flink-connector-clickhouse-base/build.gradle.kts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index 7378e98..df43e27 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -13,12 +13,12 @@ val scalaVersion = "2.13.12" repositories { // Use Maven Central for resolving dependencies. 
// mavenLocal() - maven("https://s01.oss.sonatype.org/content/groups/staging/") + maven("https://s01.oss.sonatype.org/content/groups/staging/") // Temporary until we have a Java Client release mavenCentral() } extra.apply { - set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") + set("clickHouseDriverVersion", "0.9.0-SNAPSHOT") // Temporary until we have a Java Client release set("flinkVersion", "2.0.0") set("log4jVersion","2.17.2") set("testContainersVersion", "1.21.0") From d85cb2b850b8c4283e6d20cbad012cbeaf9259a8 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Wed, 2 Jul 2025 15:16:12 +0300 Subject: [PATCH 08/11] Adding generation of shadow jar to include all dependencies --- CONTRIBUTING.md | 6 ++++ .../build.gradle.kts | 30 ++++++++++++++++++- 2 files changed, 35 insertions(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d566a8a..0a57a25 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -14,6 +14,12 @@ cd flink-connector-clickhouse ./gradlew build ``` +### Publish locally + ```bash + cd flink-connector-clickhouse + ./gradlew publishToMavenLocal + ``` + ## Testing ### Tooling diff --git a/flink-connector-clickhouse-base/build.gradle.kts b/flink-connector-clickhouse-base/build.gradle.kts index df43e27..f41cee5 100644 --- a/flink-connector-clickhouse-base/build.gradle.kts +++ b/flink-connector-clickhouse-base/build.gradle.kts @@ -4,11 +4,14 @@ */ plugins { + `maven-publish` scala java + id("com.github.johnrengelman.shadow") version "8.1.1" } val scalaVersion = "2.13.12" +val sinkVersion = "0.0.1" repositories { // Use Maven Central for resolving dependencies. @@ -92,7 +95,7 @@ sourceSets { // Apply a specific Java toolchain to ease working on different environments. java { toolchain { - languageVersion = JavaLanguageVersion.of(17) + languageVersion = JavaLanguageVersion.of(11) } } @@ -129,3 +132,28 @@ tasks.register("runScalaTests") { "-s", "org.apache.flink.connector.clickhouse.test.scala.ClickHouseSinkTests" ) } + +tasks.shadowJar { + archiveClassifier.set("all") + + dependencies { + exclude(dependency("org.apache.flink:.*")) + } + mergeServiceFiles() +} + +tasks.jar { + enabled = false +} + +publishing { + publications { + create("maven") { + //from(components["java"]) + artifact(tasks.shadowJar) + groupId = "org.apache.flink.connector" + artifactId = "clickhouse" + version = sinkVersion + } + } +} From 378bbd3ae9641f50f0ffee3fcb280c9140f42f6c Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 6 Jul 2025 08:20:27 +0300 Subject: [PATCH 09/11] Remove system.out --- .../src/main/java/com/clickhouse/utils/Utils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java index 21095ee..ed17adc 100644 --- a/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java +++ b/flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java @@ -48,7 +48,6 @@ public static void handleException(Throwable e) { if (rootCause instanceof ServerException) { ServerException serverException = (ServerException) rootCause; LOG.warn("ClickHouse Server Exception Code: {} isRetryable: {}", serverException.getCode(), serverException.isRetryable()); - System.out.println("ClickHouse Server Exception Code: " + serverException.getCode()); if (serverException.isRetryable()) { throw new RetriableException(e); } From 1d8b002d48f3d37ecedd83ab538a60922c846a20 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 6 Jul 2025 
08:26:27 +0300 Subject: [PATCH 10/11] Enhance log message --- .../flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java index f68bda9..60eaa02 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java @@ -192,7 +192,7 @@ private void handleFailedRequest( } } - LOG.info("Dropping request entries. Since It a failure that can not be retried. error {} number of entries drop {}", error.getLocalizedMessage(), requestEntries.size()); + LOG.info("Dropping {} request entries due to non-retryable failure: {}", requestEntries.size(), error.getLocalizedMessage()); numOfDroppedBatchesCounter.inc(); numOfDroppedRecordsCounter.inc(requestEntries.size()); resultHandler.completeExceptionally((Exception) error); From ff2f4efcef3802631e8aac8ab7d71c96cd742dfc Mon Sep 17 00:00:00 2001 From: mzitnik Date: Sun, 6 Jul 2025 08:28:13 +0300 Subject: [PATCH 11/11] Reformat code with IDE --- .../connector/clickhouse/sink/ClickHouseClientConfig.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index 5ebbbac..079e6ae 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -59,7 +59,9 @@ public Client createClient() { return createClient(this.database); } - public String getTableName() { return tableName; } + public String getTableName() { + return tableName; + } public void setSupportDefault(Boolean supportDefault) { this.supportDefault = supportDefault;
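
Usage sketch: taken together, patches 02-05 add bounded retries to the async sink. ClickHousePayload now carries an attempt counter, Utils.handleException() rethrows retryable server and connection errors as RetriableException, and ClickHouseAsyncWriter retries a failed batch up to numberOfRetries times (default 3, configurable via ClickHouseClientConfig.setNumberOfRetries) before dropping it and incrementing the numOfDroppedBatches and numOfDroppedRecords counters. Below is a minimal sketch of how user code is expected to wire this up, assembled from the tests in this series; the class and method names come from the patches above, while the endpoint, credentials, table name and the 'lines' stream are hypothetical placeholders, not part of the series.

    // A minimal sketch, assuming the constructor and setters shown in the tests above.
    // The URL, credentials, database, table and 'lines' stream are hypothetical.
    ClickHouseClientConfig config = new ClickHouseClientConfig(
            "http://localhost:8123",   // hypothetical server URL
            "default",                 // hypothetical username
            "",                        // hypothetical password
            "default",                 // hypothetical database
            "events");                 // hypothetical target table
    config.setNumberOfRetries(10);     // retriable failures (e.g. TOO_MANY_PARTS) are retried up to 10 times per batch

    ClickHouseAsyncSink<String> sink = new ClickHouseAsyncSink<>(
            new ClickHouseConvertor<>(String.class),
            5000,                      // maxBatchSize
            2,                         // maxInFlightRequests
            20000,                     // maxBufferedRequests
            1024 * 1024,               // maxBatchSizeInBytes
            5 * 1000,                  // maxTimeInBufferMS
            1000,                      // maxRecordSizeInBytes
            config);
    sink.setClickHouseFormat(ClickHouseFormat.CSV);
    lines.sinkTo(sink);                // 'lines' is any DataStream<String> of CSV rows

Because the attempt counter travels with the first ClickHousePayload of the batch, it survives re-buffering when resultHandler.retryForEntries(...) re-enqueues the same entries, so the writer needs no extra per-batch bookkeeping.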