From ad3dee25110c63e70617dd2341c0913cbf655e12 Mon Sep 17 00:00:00 2001 From: kanthi Subramanian Date: Mon, 24 Jul 2023 16:06:34 -0400 Subject: [PATCH] Added logic to ignore deletes --- sink-connector-lightweight/docker/mysqld.cnf | 3 ++- .../sink/connector/ClickHouseSinkConnectorConfig.java | 11 ++++++++++- .../ClickHouseSinkConnectorConfigVariables.java | 1 + .../clickhouse/sink/connector/db/DbWriter.java | 3 ++- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/sink-connector-lightweight/docker/mysqld.cnf b/sink-connector-lightweight/docker/mysqld.cnf index 3f8f1d201..cccb59cba 100644 --- a/sink-connector-lightweight/docker/mysqld.cnf +++ b/sink-connector-lightweight/docker/mysqld.cnf @@ -1,10 +1,11 @@ [mysqld] max_connections=100000 +default_authentication_plugin=mysql_native_password gtid-mode = on enforce-gtid-consistency = true # for loading files local_infile = on # to support tables without PK -sql_generate_invisible_primary_key=1 \ No newline at end of file +sql_generate_invisible_primary_key=1 diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java index d71a07d71..58e348394 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfig.java @@ -353,7 +353,16 @@ static ConfigDef newConfigDef() { 3, ConfigDef.Width.NONE, ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()) - + .define( + ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString(), + Type.BOOLEAN, + false, + Importance.HIGH, + "If true, Deletes are ignored are not persisted to ClickHouse.", + CONFIG_GROUP_CONNECTOR_CONFIG, + 3, + ConfigDef.Width.NONE, + ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString()) // ToDo: Add JVM Proxy ; } diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java index 71ab590d6..1a7781d58 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/ClickHouseSinkConnectorConfigVariables.java @@ -3,6 +3,7 @@ public enum ClickHouseSinkConnectorConfigVariables { + IGNORE_DELETE("ignore_delete"), THREAD_POOL_SIZE("thread.pool.size"), BUFFER_COUNT("buffer.count"), DEDUPLICATION_POLICY("deduplication.policy"), diff --git a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java index 1a5ac6c8e..125389b5f 100644 --- a/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java +++ b/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java @@ -659,7 +659,8 @@ public void insertPreparedStatement(Map columnNameToIndexMap, P } // Sign column to mark deletes in ReplacingMergeTree if(this.replacingMergeTreeDeleteColumn != null && this.columnNameToDataTypeMap.containsKey(replacingMergeTreeDeleteColumn)) { - if(columnNameToIndexMap.containsKey(replacingMergeTreeDeleteColumn)) { + if(columnNameToIndexMap.containsKey(replacingMergeTreeDeleteColumn) && + this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString()) == false) { if (record.getCdcOperation().getOperation().equalsIgnoreCase(ClickHouseConverter.CDC_OPERATION.DELETE.getOperation())) { if(replacingMergeTreeWithIsDeletedColumn) ps.setInt(columnNameToIndexMap.get(replacingMergeTreeDeleteColumn), 1);