From 64658790cccda44d55785c95bf4af898775baf8d Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 16 Jun 2025 02:43:03 +0200 Subject: [PATCH 1/3] Exposing set options for client --- .../clickhouse/sink/ClickHouseClientConfig.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index 21d8f0c..f29ab58 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -7,6 +7,8 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; public class ClickHouseClientConfig implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class); @@ -19,6 +21,7 @@ public class ClickHouseClientConfig implements Serializable { private final String tableName; private final String fullProductName; private Boolean supportDefault = null; + private Map options; public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) { this.url = url; @@ -27,6 +30,7 @@ public ClickHouseClientConfig(String url, String username, String password, Stri this.database = database; this.tableName = tableName; this.fullProductName = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/%s)", ClickHouseSinkVersion.getVersion(), EnvironmentInformation.getVersion(), EnvironmentInformation.getScalaVersion()); + this.options = new HashMap<>(); } public Client createClient(String database) { @@ -37,6 +41,7 @@ public Client createClient(String database) { .setDefaultDatabase(database) .setClientName(fullProductName) .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true") + .setOptions(options) .build(); return client; } @@ -54,4 +59,8 @@ public void setSupportDefault(Boolean supportDefault) { public Boolean getSupportDefault() { return supportDefault; } + + public void setOptions(Map options) { + this.options = options; + } } From 3e5b57de245f1a4375db9370fe1d9411f8195755 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 16 Jun 2025 02:44:57 +0200 Subject: [PATCH 2/3] checking if options is not null. --- .../connector/clickhouse/sink/ClickHouseClientConfig.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index f29ab58..0f525a6 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -61,6 +61,8 @@ public Boolean getSupportDefault() { } public void setOptions(Map options) { - this.options = options; + if (options != null) { + this.options.putAll(options); + } } } From 742304a124aed5d7c5bcbcf1701df6f16114bce3 Mon Sep 17 00:00:00 2001 From: mzitnik Date: Mon, 16 Jun 2025 15:20:42 +0300 Subject: [PATCH 3/3] Convert to options map to final --- .../flink/connector/clickhouse/sink/ClickHouseClientConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index 0f525a6..9fd3b69 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -21,7 +21,7 @@ public class ClickHouseClientConfig implements Serializable { private final String tableName; private final String fullProductName; private Boolean supportDefault = null; - private Map options; + private final Map options; public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) { this.url = url;