Skip to content

Commit 6093c21

Browse files
authored
[feature][connector-v2][clickhouse] Support write cdc changelog event in clickhouse sink (#3653)
* [feature][connector-v2][clickhouse] Support write cdc changelog event in clickhouse sink MergeTree Table Engine: - Support enable `allowExperimentalLightweightDelete` setting - CDC Events: - INSERT: INSERT INTO SQL - UPDATE_BEFORE: DELETE SQL - UPDATE_AFTER: INSERT INTO SQL - DELETE: DELETE SQL - ReplacingMergeTree Engine CDC Events: - INSERT: INSERT INTO SQL - UPDATE_BEFORE: ignore - UPDATE_AFTER: INSERT INTO SQL - DELETE: DELETE SQL Other Table Engine: - CDC Events: - INSERT: INSERT INTO SQL - UPDATE_BEFORE: ignore - UPDATE_AFTER: ALTER TABLE UPDATE SQL - DELETE: ALTER TABLE DELETE SQL * update * fix InsertOrUpdateBatchStatementExecutor * add check
1 parent fbce927 commit 6093c21

21 files changed

+1111
-353
lines changed

docs/en/connector-v2/sink/Clickhouse.md

Lines changed: 72 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Used to write data to Clickhouse.
1313
The Clickhouse sink plug-in can achieve exactly-once semantics by implementing idempotent writes; this requires using it together with AggregatingMergeTree or another table engine that supports deduplication.
1414

1515
- [ ] [schema projection](../../concept/connector-v2-features.md)
16+
- [x] [cdc](../../concept/connector-v2-features.md)
1617

1718
:::tip
1819

@@ -22,19 +23,22 @@ Write data to Clickhouse can also be done using JDBC
2223

2324
## Options
2425

25-
| name | type | required | default value |
26-
|----------------|--------|----------|---------------|
27-
| host | string | yes | - |
28-
| database | string | yes | - |
29-
| table | string | yes | - |
30-
| username | string | yes | - |
31-
| password | string | yes | - |
32-
| fields | string | yes | - |
33-
| clickhouse.* | string | no | |
34-
| bulk_size | string | no | 20000 |
35-
| split_mode | string | no | false |
36-
| sharding_key | string | no | - |
37-
| common-options | | no | - |
26+
| name | type | required | default value |
27+
|---------------------------------------|---------|----------|---------------|
28+
| host | string | yes | - |
29+
| database | string | yes | - |
30+
| table | string | yes | - |
31+
| username | string | yes | - |
32+
| password | string | yes | - |
33+
| fields | string | yes | - |
34+
| clickhouse.* | string | no | |
35+
| bulk_size | string | no | 20000 |
36+
| split_mode | string | no | false |
37+
| sharding_key | string | no | - |
38+
| primary_key | string | no | - |
39+
| support_upsert | boolean | no | false |
40+
| allow_experimental_lightweight_delete | boolean | no | false |
41+
| common-options | | no | - |
3842

3943
### host [string]
4044

@@ -82,39 +86,90 @@ When use split_mode, which node to send data to is a problem, the default is ran
8286
'sharding_key' parameter can be used to specify the field for the sharding algorithm. This option only
8387
takes effect when 'split_mode' is true.
8488

89+
### primary_key [string]
90+
91+
Specifies the primary key column of the Clickhouse table. INSERT/UPDATE/DELETE operations on the table are executed based on this primary key.
92+
93+
### support_upsert [boolean]
94+
95+
Support upserting rows by querying on the primary key.
96+
97+
### allow_experimental_lightweight_delete [boolean]
98+
99+
Allow experimental lightweight deletes on tables that use a `*MergeTree` table engine.
100+
85101
### common options
86102

87103
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
88104

89105
## Examples
90106

107+
Simple
108+
91109
```hocon
92110
sink {
111+
Clickhouse {
112+
host = "localhost:8123"
113+
database = "default"
114+
table = "fake_all"
115+
username = "default"
116+
password = ""
117+
}
118+
}
119+
```
120+
121+
Split mode
93122

123+
```hocon
124+
sink {
94125
Clickhouse {
95126
host = "localhost:8123"
96127
database = "default"
97128
table = "fake_all"
98129
username = "default"
99130
password = ""
131+
132+
# split mode options
100133
split_mode = true
101134
sharding_key = "age"
102135
}
103-
104136
}
105137
```
106138

139+
CDC (Change Data Capture)
140+
107141
```hocon
108142
sink {
143+
Clickhouse {
144+
host = "localhost:8123"
145+
database = "default"
146+
table = "fake_all"
147+
username = "default"
148+
password = ""
149+
150+
# cdc options
151+
primary_key = "id"
152+
support_upsert = true
153+
}
154+
}
155+
```
109156

157+
CDC (Change Data Capture) for the *MergeTree engine
158+
159+
```hocon
160+
sink {
110161
Clickhouse {
111162
host = "localhost:8123"
112163
database = "default"
113164
table = "fake_all"
114165
username = "default"
115166
password = ""
167+
168+
# cdc options
169+
primary_key = "id"
170+
support_upsert = true
171+
allow_experimental_lightweight_delete = true
116172
}
117-
118173
}
119174
```
120175

@@ -132,3 +187,5 @@ sink {
132187
- [Improve] Clickhouse Sink support nest type and array type([3047](https://github.com/apache/incubator-seatunnel/pull/3047))
133188

134189
- [Improve] Clickhouse Sink support geo type([3141](https://github.com/apache/incubator-seatunnel/pull/3141))
190+
191+
- [Feature] Support writing CDC DELETE/UPDATE/INSERT events ([3653](https://github.com/apache/incubator-seatunnel/pull/3653))

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseConfig.java

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,25 @@ public class ClickhouseConfig {
8181
/**
8282
* When split_mode is true, the sharding_key is used for splitting
8383
*/
84-
public static final Option<String> SHARDING_KEY = Options.key("sharding_key").stringType()
85-
.noDefaultValue().withDescription("When split_mode is true, the sharding_key use for split");
84+
public static final Option<String> SHARDING_KEY = Options.key("sharding_key")
85+
.stringType()
86+
.noDefaultValue()
87+
.withDescription("When split_mode is true, the sharding_key use for split");
88+
89+
public static final Option<String> PRIMARY_KEY = Options.key("primary_key")
90+
.stringType()
91+
.noDefaultValue()
92+
.withDescription("Mark the primary key column from clickhouse table, and based on primary key execute INSERT/UPDATE/DELETE to clickhouse table");
93+
94+
public static final Option<Boolean> SUPPORT_UPSERT = Options.key("support_upsert")
95+
.booleanType()
96+
.defaultValue(false)
97+
.withDescription("Support upsert row by query primary key");
98+
99+
public static final Option<Boolean> ALLOW_EXPERIMENTAL_LIGHTWEIGHT_DELETE = Options.key("allow_experimental_lightweight_delete")
100+
.booleanType()
101+
.defaultValue(false)
102+
.withDescription("Allow experimental lightweight delete based on `*MergeTree` table engine");
86103

87104
/**
88105
* ClickhouseFile sink connector used clickhouse-local program's path

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ReaderOption.java

Lines changed: 11 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -20,75 +20,28 @@
2020
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2121
import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
2222

23+
import lombok.Builder;
24+
import lombok.Getter;
25+
import lombok.Setter;
26+
2327
import java.io.Serializable;
2428
import java.util.List;
2529
import java.util.Map;
2630
import java.util.Properties;
2731

32+
@Builder
33+
@Getter
2834
public class ReaderOption implements Serializable {
2935

3036
private ShardMetadata shardMetadata;
3137
private List<String> fields;
32-
38+
private String[] primaryKeys;
39+
private boolean allowExperimentalLightweightDelete;
40+
private boolean supportUpsert;
41+
private String tableEngine;
3342
private Map<String, String> tableSchema;
43+
@Setter
3444
private SeaTunnelRowType seaTunnelRowType;
3545
private Properties properties;
3646
private int bulkSize;
37-
38-
public ReaderOption(ShardMetadata shardMetadata,
39-
Properties properties, List<String> fields, Map<String, String> tableSchema, int bulkSize) {
40-
this.shardMetadata = shardMetadata;
41-
this.properties = properties;
42-
this.fields = fields;
43-
this.tableSchema = tableSchema;
44-
this.bulkSize = bulkSize;
45-
}
46-
47-
public Properties getProperties() {
48-
return properties;
49-
}
50-
51-
public void setProperties(Properties properties) {
52-
this.properties = properties;
53-
}
54-
55-
public ShardMetadata getShardMetadata() {
56-
return shardMetadata;
57-
}
58-
59-
public void setShardMetadata(ShardMetadata shardMetadata) {
60-
this.shardMetadata = shardMetadata;
61-
}
62-
63-
public SeaTunnelRowType getSeaTunnelRowType() {
64-
return seaTunnelRowType;
65-
}
66-
67-
public void setSeaTunnelRowType(SeaTunnelRowType seaTunnelRowType) {
68-
this.seaTunnelRowType = seaTunnelRowType;
69-
}
70-
71-
public Map<String, String> getTableSchema() {
72-
return tableSchema;
73-
}
74-
75-
public void setTableSchema(Map<String, String> tableSchema) {
76-
this.tableSchema = tableSchema;
77-
}
78-
79-
public List<String> getFields() {
80-
return fields;
81-
}
82-
83-
public void setFields(List<String> fields) {
84-
this.fields = fields;
85-
}
86-
87-
public int getBulkSize() {
88-
return bulkSize;
89-
}
90-
91-
public void setBulkSize(int bulkSize) {
92-
this.bulkSize = bulkSize;
93-
}
9447
}

0 commit comments

Comments
 (0)