Skip to content

Commit

Permalink
Merge pull request #282 from Altinity/ignore_deletes
Browse files Browse the repository at this point in the history
Added logic to ignore deletes
  • Loading branch information
subkanthi committed Aug 3, 2023
2 parents dae79fa + ad3dee2 commit bfa9bd4
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sink-connector-lightweight/docker/mysqld.cnf
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
[mysqld]
max_connections=100000

default_authentication_plugin=mysql_native_password

gtid-mode = on
enforce-gtid-consistency = true
# for loading files
local_infile = on
# to support tables without PK
sql_generate_invisible_primary_key=1
sql_generate_invisible_primary_key=1
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,16 @@ static ConfigDef newConfigDef() {
3,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString())

.define(
ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString(),
Type.BOOLEAN,
false,
Importance.HIGH,
"If true, Deletes are ignored are not persisted to ClickHouse.",
CONFIG_GROUP_CONNECTOR_CONFIG,
3,
ConfigDef.Width.NONE,
ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString())
// ToDo: Add JVM Proxy
;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

public enum ClickHouseSinkConnectorConfigVariables {

IGNORE_DELETE("ignore_delete"),
THREAD_POOL_SIZE("thread.pool.size"),
BUFFER_COUNT("buffer.count"),
DEDUPLICATION_POLICY("deduplication.policy"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,8 @@ public void insertPreparedStatement(Map<String, Integer> columnNameToIndexMap, P
}
// Sign column to mark deletes in ReplacingMergeTree
if(this.replacingMergeTreeDeleteColumn != null && this.columnNameToDataTypeMap.containsKey(replacingMergeTreeDeleteColumn)) {
if(columnNameToIndexMap.containsKey(replacingMergeTreeDeleteColumn)) {
if(columnNameToIndexMap.containsKey(replacingMergeTreeDeleteColumn) &&
this.config.getBoolean(ClickHouseSinkConnectorConfigVariables.IGNORE_DELETE.toString()) == false) {
if (record.getCdcOperation().getOperation().equalsIgnoreCase(ClickHouseConverter.CDC_OPERATION.DELETE.getOperation())) {
if(replacingMergeTreeWithIsDeletedColumn)
ps.setInt(columnNameToIndexMap.get(replacingMergeTreeDeleteColumn), 1);
Expand Down

0 comments on commit bfa9bd4

Please sign in to comment.