Skip to content

Commit 934434c

Browse files
happyboy1024happyboy1024
andauthored
[Feature][Connector-V2] Support write cdc changelog event into hudi sink (#7845)
Co-authored-by: happyboy1024 <296442618@qq.com>
1 parent 63c7b4e commit 934434c

File tree

36 files changed

+927
-319
lines changed

36 files changed

+927
-319
lines changed

docs/en/connector-v2/sink/Hudi.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Used to write data to Hudi.
88

99
## Key features
1010

11-
- [x] [exactly-once](../../concept/connector-v2-features.md)
11+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
1212
- [x] [cdc](../../concept/connector-v2-features.md)
1313
- [x] [support multiple table write](../../concept/connector-v2-features.md)
1414

@@ -21,7 +21,6 @@ Base configuration:
2121
| table_dfs_path | string | yes | - |
2222
| conf_files_path | string | no | - |
2323
| table_list | Array | no | - |
24-
| auto_commit | boolean | no | true |
2524
| schema_save_mode | enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST|
2625
| common-options | Config | no | - |
2726

@@ -44,6 +43,7 @@ Table list configuration:
4443
| index_type | enum | no | BLOOM |
4544
| index_class_name | string | no | - |
4645
| record_byte_size | Int | no | 1024 |
46+
| cdc_enabled | boolean| no | false |
4747

4848
Note: When this configuration corresponds to a single table, you can flatten the configuration items in table_list to the outer layer.
4949

@@ -115,9 +115,9 @@ Note: When this configuration corresponds to a single table, you can flatten the
115115

116116
`max_commits_to_keep` The max commits to keep of hudi table.
117117

118-
### auto_commit [boolean]
118+
### cdc_enabled [boolean]
119119

120-
`auto_commit` Automatic transaction commit is enabled by default.
120+
`cdc_enabled` Whether to persist the CDC change log. When enabled, the change data is persisted if necessary, and the table can be queried in CDC query mode.
121121

122122
### schema_save_mode [Enum]
123123

docs/zh/connector-v2/sink/Hudi.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
## 主要特点
1010

11-
- [x] [exactly-once](../../concept/connector-v2-features.md)
11+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
1212
- [x] [cdc](../../concept/connector-v2-features.md)
1313
- [x] [support multiple table write](../../concept/connector-v2-features.md)
1414

@@ -21,7 +21,6 @@
2121
| table_dfs_path | string | 是 | - |
2222
| conf_files_path | string | 否 | - |
2323
| table_list | string | 否 | - |
24-
| auto_commit | boolean|| true |
2524
| schema_save_mode | enum | 否 | CREATE_SCHEMA_WHEN_NOT_EXIST |
2625
| common-options | config | 否 | - |
2726

@@ -44,6 +43,7 @@
4443
| index_type | enum | no | BLOOM |
4544
| index_class_name | string | no | - |
4645
| record_byte_size | Int | no | 1024 |
46+
| cdc_enabled | boolean| no | false |
4747

4848
注意: 当此配置对应于单个表时,您可以将table_list中的配置项展平到外层。
4949

@@ -115,9 +115,9 @@
115115

116116
`max_commits_to_keep` Hudi 表保留的最多提交数。
117117

118-
### auto_commit [boolean]
118+
### cdc_enabled [boolean]
119119

120-
`auto_commit` 是否自动提交.
120+
`cdc_enabled` 是否持久化Hudi表的CDC变更日志。启用后,在必要时持久化更改数据,表可以作为CDC模式进行查询。
121121

122122
### schema_save_mode [Enum]
123123

seatunnel-connectors-v2/connector-hudi/pom.xml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,27 @@
102102

103103
</dependencies>
104104

105+
<build>
106+
<plugins>
107+
<plugin>
108+
<artifactId>maven-shade-plugin</artifactId>
109+
<executions>
110+
<execution>
111+
<goals>
112+
<goal>shade</goal>
113+
</goals>
114+
<phase>package</phase>
115+
<configuration>
116+
<relocations>
117+
<relocation>
118+
<pattern>org.apache.avro</pattern>
119+
<shadedPattern>${seatunnel.shade.package}.${connector.name}.org.apache.avro</shadedPattern>
120+
</relocation>
121+
</relocations>
122+
</configuration>
123+
</execution>
124+
</executions>
125+
</plugin>
126+
</plugins>
127+
</build>
105128
</project>

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.apache.hadoop.fs.FileStatus;
3636
import org.apache.hadoop.fs.FileSystem;
3737
import org.apache.hadoop.fs.Path;
38+
import org.apache.hudi.avro.AvroSchemaUtils;
3839
import org.apache.hudi.common.model.HoodieAvroPayload;
3940
import org.apache.hudi.common.model.HoodieTableType;
4041
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -53,6 +54,7 @@
5354
import java.util.stream.Collectors;
5455

5556
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
57+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
5658
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_KEY_FIELDS;
5759
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_TYPE;
5860
import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
@@ -195,6 +197,7 @@ public CatalogTable getTable(TablePath tablePath)
195197
String.join(",", tableConfig.getRecordKeyFields().get()));
196198
}
197199
options.put(TABLE_TYPE.key(), tableType.name());
200+
options.put(CDC_ENABLED.key(), String.valueOf(tableConfig.isCDCEnabled()));
198201
return CatalogTable.of(
199202
TableIdentifier.of(
200203
catalogName, tablePath.getDatabaseName(), tablePath.getTableName()),
@@ -218,10 +221,16 @@ public void createTable(TablePath tablePath, CatalogTable table, boolean ignoreI
218221
.setTableType(table.getOptions().get(TABLE_TYPE.key()))
219222
.setRecordKeyFields(table.getOptions().get(RECORD_KEY_FIELDS.key()))
220223
.setTableCreateSchema(
221-
convertToSchema(table.getSeaTunnelRowType()).toString())
224+
convertToSchema(
225+
table.getSeaTunnelRowType(),
226+
AvroSchemaUtils.getAvroRecordQualifiedName(
227+
table.getTableId().getTableName()))
228+
.toString())
222229
.setTableName(tablePath.getTableName())
223230
.setPartitionFields(String.join(",", table.getPartitionKeys()))
224231
.setPayloadClassName(HoodieAvroPayload.class.getName())
232+
.setCDCEnabled(
233+
Boolean.parseBoolean(table.getOptions().get(CDC_ENABLED.key())))
225234
.initTable(new HadoopStorageConfiguration(hadoopConf), tablePathStr);
226235
}
227236
} catch (IOException e) {

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,6 @@ public interface HudiOptions {
4444
.noDefaultValue()
4545
.withDescription("table_list");
4646

47-
Option<Boolean> AUTO_COMMIT =
48-
Options.key("auto_commit")
49-
.booleanType()
50-
.defaultValue(true)
51-
.withDescription("auto commit");
52-
5347
Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
5448
Options.key("schema_save_mode")
5549
.enumType(SchemaSaveMode.class)

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,12 @@ public class HudiSinkConfig implements Serializable {
4040

4141
private String confFilesPath;
4242

43-
private boolean autoCommit;
44-
4543
private SchemaSaveMode schemaSaveMode;
4644

4745
private DataSaveMode dataSaveMode;
4846

4947
public static HudiSinkConfig of(ReadonlyConfig config) {
5048
Builder builder = HudiSinkConfig.builder();
51-
Optional<Boolean> optionalAutoCommit = config.getOptional(HudiOptions.AUTO_COMMIT);
5249
Optional<SchemaSaveMode> optionalSchemaSaveMode =
5350
config.getOptional(HudiOptions.SCHEMA_SAVE_MODE);
5451
Optional<DataSaveMode> optionalDataSaveMode =
@@ -58,7 +55,6 @@ public static HudiSinkConfig of(ReadonlyConfig config) {
5855
builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
5956
builder.tableList(HudiTableConfig.of(config));
6057

61-
builder.autoCommit(optionalAutoCommit.orElseGet(HudiOptions.AUTO_COMMIT::defaultValue));
6258
builder.schemaSaveMode(
6359
optionalSchemaSaveMode.orElseGet(HudiOptions.SCHEMA_SAVE_MODE::defaultValue));
6460
builder.dataSaveMode(

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
4242
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
43+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
4344
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.DATABASE;
4445
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
4546
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
@@ -108,6 +109,9 @@ public HudiTableConfig() {}
108109
@JsonProperty("max_commits_to_keep")
109110
private int maxCommitsToKeep;
110111

112+
@JsonProperty("cdc_enabled")
113+
private boolean cdcEnabled;
114+
111115
public static List<HudiTableConfig> of(ReadonlyConfig connectorConfig) {
112116
List<HudiTableConfig> tableList;
113117
if (connectorConfig.getOptional(HudiOptions.TABLE_LIST).isPresent()) {
@@ -132,6 +136,7 @@ public static List<HudiTableConfig> of(ReadonlyConfig connectorConfig) {
132136
connectorConfig.get(UPSERT_SHUFFLE_PARALLELISM))
133137
.minCommitsToKeep(connectorConfig.get(MIN_COMMITS_TO_KEEP))
134138
.maxCommitsToKeep(connectorConfig.get(MAX_COMMITS_TO_KEEP))
139+
.cdcEnabled(connectorConfig.get(CDC_ENABLED))
135140
.build();
136141
tableList = Collections.singletonList(hudiTableConfig);
137142
}

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,13 @@ public interface HudiTableOptions {
4646
.defaultValue(HoodieTableType.COPY_ON_WRITE)
4747
.withDescription("hudi table type");
4848

49+
Option<Boolean> CDC_ENABLED =
50+
Options.key("cdc_enabled")
51+
.booleanType()
52+
.defaultValue(false)
53+
.withDescription(
54+
"When enable, persist the change data if necessary, and can be queried as a CDC query mode.");
55+
4956
Option<String> RECORD_KEY_FIELDS =
5057
Options.key("record_key_fields")
5158
.stringType()
@@ -76,7 +83,7 @@ public interface HudiTableOptions {
7683
Options.key("record_byte_size")
7784
.intType()
7885
.defaultValue(1024)
79-
.withDescription("auto commit");
86+
.withDescription("The byte size of each record");
8087

8188
Option<WriteOperationType> OP_TYPE =
8289
Options.key("op_type")

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSink.java

Lines changed: 2 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import org.apache.seatunnel.api.sink.DefaultSaveModeHandler;
2525
import org.apache.seatunnel.api.sink.SaveModeHandler;
2626
import org.apache.seatunnel.api.sink.SeaTunnelSink;
27-
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
2827
import org.apache.seatunnel.api.sink.SinkWriter;
2928
import org.apache.seatunnel.api.sink.SupportMultiTableSink;
3029
import org.apache.seatunnel.api.sink.SupportSaveMode;
@@ -38,14 +37,12 @@
3837
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkConfig;
3938
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableConfig;
4039
import org.apache.seatunnel.connectors.seatunnel.hudi.exception.HudiConnectorException;
41-
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.commiter.HudiSinkAggregatedCommitter;
4240
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiAggregatedCommitInfo;
4341
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiCommitInfo;
4442
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.state.HudiSinkState;
4543
import org.apache.seatunnel.connectors.seatunnel.hudi.sink.writer.HudiSinkWriter;
4644

4745
import java.io.IOException;
48-
import java.util.ArrayList;
4946
import java.util.List;
5047
import java.util.Optional;
5148

@@ -82,15 +79,13 @@ public String getPluginName() {
8279

8380
@Override
8481
public HudiSinkWriter createWriter(SinkWriter.Context context) throws IOException {
85-
return new HudiSinkWriter(
86-
context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, new ArrayList<>());
82+
return new HudiSinkWriter(context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig);
8783
}
8884

8985
@Override
9086
public SinkWriter<SeaTunnelRow, HudiCommitInfo, HudiSinkState> restoreWriter(
9187
SinkWriter.Context context, List<HudiSinkState> states) throws IOException {
92-
return new HudiSinkWriter(
93-
context, seaTunnelRowType, hudiSinkConfig, hudiTableConfig, states);
88+
return SeaTunnelSink.super.restoreWriter(context, states);
9489
}
9590

9691
@Override
@@ -103,18 +98,6 @@ public Optional<Serializer<HudiCommitInfo>> getCommitInfoSerializer() {
10398
return Optional.of(new DefaultSerializer<>());
10499
}
105100

106-
@Override
107-
public Optional<SinkAggregatedCommitter<HudiCommitInfo, HudiAggregatedCommitInfo>>
108-
createAggregatedCommitter() throws IOException {
109-
return Optional.of(
110-
new HudiSinkAggregatedCommitter(hudiTableConfig, hudiSinkConfig, seaTunnelRowType));
111-
}
112-
113-
@Override
114-
public Optional<Serializer<HudiAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
115-
return Optional.of(new DefaultSerializer<>());
116-
}
117-
118101
@Override
119102
public Optional<SaveModeHandler> getSaveModeHandler() {
120103
TablePath tablePath =

seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/sink/HudiSinkFactory.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,12 @@
3737
import java.util.List;
3838
import java.util.Optional;
3939

40-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.AUTO_COMMIT;
4140
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
4241
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
4342
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_LIST;
4443
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
4544
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
45+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
4646
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
4747
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
4848
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INSERT_SHUFFLE_PARALLELISM;
@@ -85,7 +85,7 @@ public OptionRule optionRule() {
8585
UPSERT_SHUFFLE_PARALLELISM,
8686
MIN_COMMITS_TO_KEEP,
8787
MAX_COMMITS_TO_KEEP,
88-
AUTO_COMMIT,
88+
CDC_ENABLED,
8989
SinkCommonOptions.MULTI_TABLE_SINK_REPLICA)
9090
.build();
9191
}
@@ -121,6 +121,10 @@ public TableSink createSink(TableSinkFactoryContext context) {
121121
}
122122
// table type
123123
catalogTable.getOptions().put(TABLE_TYPE.key(), hudiTableConfig.getTableType().name());
124+
// cdc enabled
125+
catalogTable
126+
.getOptions()
127+
.put(CDC_ENABLED.key(), String.valueOf(hudiTableConfig.isCdcEnabled()));
124128
catalogTable =
125129
CatalogTable.of(
126130
newTableId,

0 commit comments

Comments
 (0)