Skip to content

Commit

Permalink
Merge pull request #497 from Altinity/2.0.1
Browse files Browse the repository at this point in the history
Release 2.0.1
  • Loading branch information
subkanthi committed Mar 19, 2024
2 parents 156f2ab + 81274f7 commit 23edb2b
Show file tree
Hide file tree
Showing 30 changed files with 943 additions and 51 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ First two are good tutorials on MySQL and PostgreSQL respectively.
* [Architecture Overview](doc/architecture.md)
* [Lightweight Sink Connect CLI](doc/sink_connector_cli.md)
* [Mutable Data Handling](doc/mutable_data.md)
* [ClickHouse Table Engine Types](doc/clickhouse_engines.md)

### Operations

Expand Down
11 changes: 11 additions & 0 deletions doc/clickhouse_engines.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
### Supported ClickHouse Table Engine Types.

The sink connector supports the following ClickHouse Engine types:
- **ReplacingMergeTree**: See [Architecture] (doc/mutable_data.md) for more information.
- ReplacingMergeTree is a variant of MergeTree that allows for updates and deletes.
- It is the default engine type used with the sink connector when tables are auto created using
- `auto.create.tables` set to `true`.
- **ReplicatedReplacingMergeTree**:
- ReplicatedReplacingMergeTree is a variant of ReplicatedMergeTree that allows for updates and deletes.
- To enable this engine type, set the `"auto.create.tables.replicated` to `true`, sink connector will
create tables with this engine type.
11 changes: 11 additions & 0 deletions sink-connector-lightweight/clickhouse/config.xml
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
<clickhouse replace="true">
<timezone>America/Chicago</timezone>
<zookeeper>
<node index="1">
<host>zookeeper</host>
<port>2181</port>
</node>
<session_timeout_ms>15000</session_timeout_ms>
</zookeeper>
<macros>
<replica>clickhouse</replica>
<shard>02</shard>
</macros>
</clickhouse>
8 changes: 7 additions & 1 deletion sink-connector-lightweight/docker/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ persist.raw.bytes: "false"
# auto.create.tables: If set to true, the connector will create tables in the target based on the schema received in the incoming message.
auto.create.tables: "true"

# auto.create.tables.replicated: If set to true, the connector will create table with Engine set to ReplicatedReplacingMergeTree
#"auto.create.tables.replicated: "true"

# database.connectionTimeZone: The timezone of the MySQL database server used to correctly shift the commit transaction timestamp.
#database.connectionTimeZone: "US/Samoa"

Expand Down Expand Up @@ -148,4 +151,7 @@ restart.event.loop.timeout.period.secs: "3000"
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"

# Maximum number of threads in the thread pool for processing CDC records.
#thread.pool.size: 10
#thread.pool.size: 10

# Sink Connector maximum queue size
#sink.connector.max.queue.size: "100000"
3 changes: 3 additions & 0 deletions sink-connector-lightweight/docker/config_local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,6 @@ restart.event.loop.timeout.period.secs: "30"

# ClickHouse JDBC configuration parameters, as a list of key-value pairs separated by commas.
#clickhouse.jdbc.params: "max_buffer_size=1000000,socket_timeout=10000"

# The maximum number of records that should be loaded into memory while streaming data from MySQL to ClickHouse.
sink.connector.max.queue.size: "100000"
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public class DebeziumChangeEventCapture {
private ClickHouseBatchRunnable runnable;

// Records grouped by Topic Name
private ConcurrentLinkedQueue<List<ClickHouseStruct>> records = new ConcurrentLinkedQueue<>();
private LinkedBlockingQueue<List<ClickHouseStruct>> records;


private BaseDbWriter writer = null;
Expand Down Expand Up @@ -97,13 +97,15 @@ private void performDDLOperation(String DDL, Properties props, SourceRecord sr,
writer = new BaseDbWriter(dbCredentials.getHostName(), dbCredentials.getPort(),
dbCredentials.getDatabase(), dbCredentials.getUserName(),
dbCredentials.getPassword(), config, this.conn);
try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
}

}

try {
String clickHouseVersion = writer.getClickHouseVersion();
isNewReplacingMergeTreeEngine = new com.altinity.clickhouse.sink.connector.db.DBMetadata()
.checkIfNewReplacingMergeTree(clickHouseVersion);
} catch (Exception e) {
log.error("Error retrieving version");
}
StringBuffer clickHouseQuery = new StringBuffer();
AtomicBoolean isDropOrTruncate = new AtomicBoolean(false);
Expand Down Expand Up @@ -586,6 +588,13 @@ public void connectorStopped() {
public void setup(Properties props, DebeziumRecordParserService debeziumRecordParserService,
DDLParserService ddlParserService, boolean forceStart) throws IOException, ClassNotFoundException {

// Check if max queue size was defined by the user.
if(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()) != null) {
int maxQueueSize = Integer.parseInt(props.getProperty(ClickHouseSinkConnectorConfigVariables.MAX_QUEUE_SIZE.toString()));
this.records = new LinkedBlockingQueue<>(maxQueueSize);
} else {
this.records = new LinkedBlockingQueue<>();
}

ClickHouseSinkConnectorConfig config = new ClickHouseSinkConnectorConfig(PropertiesHelper.toMap(props));
Metrics.initialize(props.getProperty(ClickHouseSinkConnectorConfigVariables.ENABLE_METRICS.toString()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr
if(columnNames.contains(isDeletedColumn)) {
isDeletedColumn = "__" + IS_DELETED_COLUMN;
}

// Check if the destination is ReplicatedReplacingMergeTree.
boolean isReplicatedReplacingMergeTree = config.getBoolean(ClickHouseSinkConnectorConfigVariables
.AUTO_CREATE_TABLES_REPLICATED.toString());

if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append("`").append(VERSION_COLUMN).append("` ").append(VERSION_COLUMN_DATA_TYPE).append(",");
this.query.append("`").append(isDeletedColumn).append("` ").append(IS_DELETED_COLUMN_DATA_TYPE);
Expand All @@ -113,10 +118,16 @@ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext columnCr

this.query.append(")");
if(DebeziumChangeEventCapture.isNewReplacingMergeTreeEngine == true) {
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");

if(isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s, %s)", tableName, VERSION_COLUMN, isDeletedColumn));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(",").append(isDeletedColumn).append(")");
} else {
if (isReplicatedReplacingMergeTree == true) {
this.query.append(String.format("Engine=ReplicatedReplacingMergeTree('/clickhouse/tables/{shard}/%s', '{replica}', %s)", tableName, VERSION_COLUMN));
} else
this.query.append(" Engine=ReplacingMergeTree(").append(VERSION_COLUMN).append(")");
}
if(partitionByColumn.length() > 0) {
this.query.append(Constants.PARTITION_BY).append(" ").append(partitionByColumn);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package com.altinity.clickhouse.debezium.embedded;

import com.altinity.clickhouse.debezium.embedded.cdc.DebeziumChangeEventCapture;
import com.altinity.clickhouse.debezium.embedded.ddl.parser.MySQLDDLParserService;
import com.altinity.clickhouse.debezium.embedded.parser.SourceRecordParserService;
import com.altinity.clickhouse.sink.connector.ClickHouseSinkConnectorConfig;
import com.altinity.clickhouse.sink.connector.db.BaseDbWriter;
import com.clickhouse.jdbc.ClickHouseConnection;
import org.junit.Assert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.testcontainers.Testcontainers;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.utility.DockerImageName;

import java.sql.ResultSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

import static com.altinity.clickhouse.debezium.embedded.PostgresProperties.getDefaultProperties;

public class PostgresInitialDockerIT {

@Container
public static ClickHouseContainer clickHouseContainer = new ClickHouseContainer(DockerImageName.parse("clickhouse/clickhouse-server:latest")
.asCompatibleSubstituteFor("clickhouse"))
.withInitScript("init_clickhouse_it.sql")
.withUsername("ch_user")
.withPassword("password")
.withExposedPorts(8123);

public static DockerImageName myImage = DockerImageName.parse("debezium/postgres:15-alpine").asCompatibleSubstituteFor("postgres");

@Container
public static PostgreSQLContainer postgreSQLContainer = (PostgreSQLContainer) new PostgreSQLContainer(myImage)
.withInitScript("init_postgres.sql")
.withDatabaseName("public")
.withUsername("root")
.withPassword("root")
.withExposedPorts(5432)
.withCommand("postgres -c wal_level=logical")
.withNetworkAliases("postgres").withAccessToHost(true);



public Properties getProperties() throws Exception {

Properties properties = getDefaultProperties(postgreSQLContainer, clickHouseContainer);
properties.put("plugin.name", "decoderbufs");
properties.put("plugin.path", "/");
properties.put("table.include.list", "public.tm");
properties.put("slot.max.retries", "6");
properties.put("slot.retry.delay.ms", "5000");
properties.put("database.allowPublicKeyRetrieval", "true");
properties.put("table.include.list", "public.tm,public.tm2");

return properties;
}

@Test
@DisplayName("Integration Test - Validates PostgreSQL replication when the plugin is set to DecoderBufs")
public void testDecoderBufsPlugin() throws Exception {
Network network = Network.newNetwork();

postgreSQLContainer.withNetwork(network).start();
clickHouseContainer.withNetwork(network).start();
Thread.sleep(10000);

Testcontainers.exposeHostPorts(postgreSQLContainer.getFirstMappedPort());
AtomicReference<DebeziumChangeEventCapture> engine = new AtomicReference<>();

ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
try {

engine.set(new DebeziumChangeEventCapture());
engine.get().setup(getProperties(), new SourceRecordParserService(),
new MySQLDDLParserService(new ClickHouseSinkConnectorConfig(new HashMap<>())), false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

Thread.sleep(10000);//
Thread.sleep(50000);

String jdbcUrl = BaseDbWriter.getConnectionString(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public");
ClickHouseConnection chConn = BaseDbWriter.createConnection(jdbcUrl, "Client_1",
clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), new ClickHouseSinkConnectorConfig(new HashMap<>()));

BaseDbWriter writer = new BaseDbWriter(clickHouseContainer.getHost(), clickHouseContainer.getFirstMappedPort(),
"public", clickHouseContainer.getUsername(), clickHouseContainer.getPassword(), null, chConn);
Map<String, String> tmColumns = writer.getColumnsDataTypesForTable("tm");
Assert.assertTrue(tmColumns.size() == 22);
Assert.assertTrue(tmColumns.get("id").equalsIgnoreCase("UUID"));
Assert.assertTrue(tmColumns.get("secid").equalsIgnoreCase("Nullable(UUID)"));
//Assert.assertTrue(tmColumns.get("am").equalsIgnoreCase("Nullable(Decimal(21,5))"));
Assert.assertTrue(tmColumns.get("created").equalsIgnoreCase("Nullable(DateTime64(6))"));


int tmCount = 0;
ResultSet chRs = writer.getConnection().prepareStatement("select count(*) from tm").executeQuery();
while(chRs.next()) {
tmCount = chRs.getInt(1);
}

Assert.assertTrue(tmCount == 2);

if(engine.get() != null) {
engine.get().stop();
}
// Files.deleteIfExists(tmpFilePath);
executorService.shutdown();

}
}

0 comments on commit 23edb2b

Please sign in to comment.