diff --git a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java index 21d8f0c..9fd3b69 100644 --- a/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java +++ b/flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseClientConfig.java @@ -7,6 +7,8 @@ import org.slf4j.LoggerFactory; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; public class ClickHouseClientConfig implements Serializable { private static final Logger LOG = LoggerFactory.getLogger(ClickHouseClientConfig.class); @@ -19,6 +21,7 @@ public class ClickHouseClientConfig implements Serializable { private final String tableName; private final String fullProductName; private Boolean supportDefault = null; + private final Map options; public ClickHouseClientConfig(String url, String username, String password, String database, String tableName) { this.url = url; @@ -27,6 +30,7 @@ public ClickHouseClientConfig(String url, String username, String password, Stri this.database = database; this.tableName = tableName; this.fullProductName = String.format("Flink-ClickHouse-Sink/%s (fv:flink/%s, lv:scala/%s)", ClickHouseSinkVersion.getVersion(), EnvironmentInformation.getVersion(), EnvironmentInformation.getScalaVersion()); + this.options = new HashMap<>(); } public Client createClient(String database) { @@ -37,6 +41,7 @@ public Client createClient(String database) { .setDefaultDatabase(database) .setClientName(fullProductName) .setOption(ClientConfigProperties.ASYNC_OPERATIONS.getKey(), "true") + .setOptions(options) .build(); return client; } @@ -54,4 +59,10 @@ public void setSupportDefault(Boolean supportDefault) { public Boolean getSupportDefault() { return supportDefault; } + + public void setOptions(Map options) { + if (options != null) { + this.options.putAll(options); + } + } }