Skip to content

Commit

Permalink
Merge pull request #432 from Altinity/add_jdbc_parameter_config
Browse files Browse the repository at this point in the history
Add clickhouse jdbc parameter config
  • Loading branch information
subkanthi committed Jan 9, 2024
2 parents e4fd146 + a70ff61 commit ddf5b3f
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 4 deletions.
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,6 @@ auto.create.tables: "true"

# Max number of records for the flush buffer.
#buffer.max.records: "10000"

# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"
6 changes: 3 additions & 3 deletions sink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>com.altinity</groupId>
<artifactId>clickhouse-kafka-sink-connector</artifactId>
<version>0.0.5</version>
<version>0.0.6</version>
<packaging>jar</packaging>
<name>ClickHouse Kafka Sink Connector</name>
<description>Sinks data from Kafka into ClickHouse</description>
Expand Down Expand Up @@ -38,8 +38,8 @@

<!-- Set our Language Level to Java 11 -->
<properties>
<maven.compiler.source>17</maven.compiler.source>
<maven.compiler.target>17</maven.compiler.target>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<version.testcontainers>1.19.1</version.testcontainers>
</properties>

Expand Down
Empty file.
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,16 @@ static ConfigDef newConfigDef() {
5,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.RESTART_EVENT_LOOP_TIMEOUT_PERIOD.toString())
.define(
ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString(),
Type.STRING,
"",
Importance.HIGH,
"JDBC connection parameters, the parameters should be in this format socket_timeout=10000,connection_timeout=100, delimited by comma",
CONFIG_GROUP_CONNECTOR_CONFIG,
6,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString())

// ToDo: Add JVM Proxy
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public enum ClickHouseSinkConnectorConfigVariables {

RESTART_EVENT_LOOP("restart.event.loop"),

RESTART_EVENT_LOOP_TIMEOUT_PERIOD("restart.event.loop.timeout.period.secs");
RESTART_EVENT_LOOP_TIMEOUT_PERIOD("restart.event.loop.timeout.period.secs"),
JDBC_PARAMETERS("clickhouse.jdbc.params");

private String label;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
Expand All @@ -26,6 +27,8 @@ public class BaseDbWriter {

private ZoneId serverTimeZone;

private ClickHouseSinkConnectorConfig config;

private static final Logger log = LoggerFactory.getLogger(BaseDbWriter.class);

public BaseDbWriter(
Expand All @@ -44,10 +47,29 @@ public BaseDbWriter(
this.password = password;

String connectionUrl = getConnectionString(hostName, port, database);
this.config = config;
this.createConnection(connectionUrl, "Agent_1", userName, password);
this.serverTimeZone = new DBMetadata().getServerTimeZone(this.conn);
}

/**
* Function to split JDBC properties string into Properties object.
* @param jdbcProperties
* @return
*/
public Properties splitJdbcProperties(String jdbcProperties) {
// Split JDBC properties(delimited by equal sign) string delimited by comma.
String[] splitProperties = jdbcProperties.split(",");

// Iterate through splitProperties and convert to Properties.
Properties properties = new Properties();
Arrays.stream(splitProperties).forEach(property -> {
String[] keyValue = property.split("=");
properties.setProperty(keyValue[0], keyValue[1]);
});

return properties;
}
public ClickHouseConnection getConnection() {
return this.conn;
}
Expand All @@ -64,10 +86,17 @@ public String getConnectionString(String hostName, Integer port, String database
* @param password Password
*/
protected void createConnection(String url, String clientName, String userName, String password) {
String jdbcParams = this.config.getString(ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString());
try {
Properties properties = new Properties();
properties.setProperty("client_name", clientName);
properties.setProperty("custom_settings", "allow_experimental_object_type=1");

if(!jdbcParams.isEmpty()) {
log.info("**** JDBC PARAMS from configuration:" + jdbcParams);
Properties userProps = splitJdbcProperties(jdbcParams);
properties.putAll(userProps);
}
ClickHouseDataSource dataSource = new ClickHouseDataSource(url, properties);

this.conn = dataSource.getConnection(userName, password);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.altinity.clickhouse.sink.connector.db;


import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfigVariables;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.annotation.JsonAppend;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class BaseDbWriterTest {
@Test
public void testSplitJdbcProperties() {
String jdbcProperties = "max_buffer_size=1000000,socket_timeout=10000";
Map props = new HashMap<>();
props.put(ClickHouseSinkConnectorConfigVariables.JDBC_PARAMETERS.toString(), jdbcProperties);
ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(props);

Properties properties = new BaseDbWriter(
"localhost",
8123,
"default",
"default",
"",
config
).splitJdbcProperties(jdbcProperties);
Assert.assertEquals(properties.getProperty("max_buffer_size"), "1000000");
Assert.assertEquals(properties.getProperty("socket_timeout"), "10000");
}
}

0 comments on commit ddf5b3f

Please sign in to comment.