Skip to content

Commit ff44db1

Browse files
authored
[feature][connector][cdc] add SeaTunnelRowDebeziumDeserializeSchema (#3499)
* [feature][connector][cdc] add SeaTunnel row Deserialization Converters * [feature][connector][mysql-cdc] add createDebeziumDeserializationSchema * [chore] fix checkstyle * [feature] fill config factory
1 parent 94b472b commit ff44db1

File tree

14 files changed

+1187
-40
lines changed

14 files changed

+1187
-40
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableSchema.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.api.table.catalog;
1919

2020
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2122

2223
import java.io.Serializable;
2324
import java.util.ArrayList;
@@ -49,6 +50,18 @@ public List<Column> getColumns() {
4950
return columns;
5051
}
5152

53+
public SeaTunnelRowType toPhysicalRowDataType() {
54+
SeaTunnelDataType<?>[] fieldTypes = columns.stream()
55+
.filter(Column::isPhysical)
56+
.map(Column::getDataType)
57+
.toArray(SeaTunnelDataType[]::new);
58+
String[] fields = columns.stream()
59+
.filter(Column::isPhysical)
60+
.map(Column::getName)
61+
.toArray(String[]::new);
62+
return new SeaTunnelRowType(fields, fieldTypes);
63+
}
64+
5265
public static final class Builder {
5366
private final List<Column> columns = new ArrayList<>();
5467

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
package org.apache.seatunnel.connectors.cdc.base.config;
1919

20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2021
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
2122
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
2223

2324
import java.time.Duration;
2425
import java.util.Arrays;
26+
import java.util.Collections;
2527
import java.util.List;
2628
import java.util.Properties;
2729

@@ -177,6 +179,26 @@ public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig) {
177179
return this;
178180
}
179181

182+
public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
183+
this.port = config.get(JdbcSourceOptions.PORT);
184+
this.hostname = config.get(JdbcSourceOptions.HOSTNAME);
185+
this.password = config.get(JdbcSourceOptions.PASSWORD);
186+
// TODO: support multi-table
187+
this.databaseList = Collections.singletonList(config.get(JdbcSourceOptions.DATABASE_NAME));
188+
this.tableList = Collections.singletonList(config.get(JdbcSourceOptions.TABLE_NAME));
189+
this.distributionFactorUpper = config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
190+
this.distributionFactorLower = config.get(JdbcSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
191+
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
192+
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
193+
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
194+
this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
195+
this.connectMaxRetries = config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
196+
this.connectionPoolSize = config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
197+
this.dbzProperties = new Properties();
198+
config.getOptional(SourceOptions.DEBEZIUM_PROPERTIES).ifPresent(map -> dbzProperties.putAll(map));
199+
return this;
200+
}
201+
180202
@Override
181203
public abstract JdbcSourceConfig create(int subtask);
182204
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,4 +103,26 @@ public class JdbcSourceOptions extends SourceOptions {
103103
.defaultValue(3)
104104
.withDescription(
105105
"The max retry times that the connector should retry to build database server connection.");
106+
107+
public static final Option<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND =
108+
Options.key("chunk-key.even-distribution.factor.upper-bound")
109+
.doubleType()
110+
.defaultValue(1000.0d)
111+
.withDescription(
112+
"The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the"
113+
+ " table is evenly distribution or not."
114+
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
115+
+ " and the query for splitting would happen when it is uneven."
116+
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
117+
118+
public static final Option<Double> CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND =
119+
Options.key("chunk-key.even-distribution.factor.lower-bound")
120+
.doubleType()
121+
.defaultValue(0.05d)
122+
.withDescription(
123+
"The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the"
124+
+ " table is evenly distribution or not."
125+
+ " The table chunks would use evenly calculation optimization when the data distribution is even,"
126+
+ " and the query for splitting would happen when it is uneven."
127+
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");
106128
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/SourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.api.configuration.util.OptionRule;
2323

24+
import java.util.Map;
25+
2426
@SuppressWarnings("MagicNumber")
2527
public class SourceOptions {
2628

@@ -83,10 +85,16 @@ public class SourceOptions {
8385
.noDefaultValue()
8486
.withDescription("Optional offsets used in case of \"specific\" stop mode");
8587

88+
public static final Option<Map<String, String>> DEBEZIUM_PROPERTIES = Options.key("debezium")
89+
.mapType()
90+
.noDefaultValue()
91+
.withDescription("Decides if the table options contains Debezium client properties that start with prefix 'debezium'.");
92+
8693
public static final OptionRule.Builder BASE_RULE = OptionRule.builder()
8794
.optional(SNAPSHOT_SPLIT_SIZE, SNAPSHOT_FETCH_SIZE)
8895
.optional(INCREMENTAL_PARALLELISM)
8996
.optional(STARTUP_MODE, STOP_MODE)
97+
.optional(DEBEZIUM_PROPERTIES)
9098
.conditional(STARTUP_MODE, StartupMode.TIMESTAMP, STARTUP_TIMESTAMP)
9199
.conditional(STARTUP_MODE, StartupMode.SPECIFIC, STARTUP_SPECIFIC_OFFSET_FILE, STARTUP_SPECIFIC_OFFSET_POS)
92100
.conditional(STOP_MODE, StopMode.TIMESTAMP, STOP_TIMESTAMP)
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium;
19+
20+
import org.apache.kafka.connect.data.Schema;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* Runtime converter that converts objects of Debezium into objects of internal data structures.
26+
*/
27+
@FunctionalInterface
28+
public interface DebeziumDeserializationConverter extends Serializable {
29+
Object convert(Object dbzObj, Schema schema) throws Exception;
30+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
21+
22+
import java.io.Serializable;
23+
import java.time.ZoneId;
24+
import java.util.Optional;
25+
26+
/**
27+
* Factory to create {@link DebeziumDeserializationConverter} according to {@link SeaTunnelDataType}. It's
28+
* usually used to create a user-defined {@link DebeziumDeserializationConverter} which has a higher
29+
* resolve order than default converter.
30+
*/
31+
public interface DebeziumDeserializationConverterFactory extends Serializable {
32+
33+
/**
34+
* A user-defined converter factory which always fallback to default converters.
35+
*/
36+
DebeziumDeserializationConverterFactory DEFAULT =
37+
(logicalType, serverTimeZone) -> Optional.empty();
38+
39+
/**
40+
* Returns an optional {@link DebeziumDeserializationConverter}. Returns {@link Optional#empty()}
41+
* if fallback to default converter.
42+
*
43+
* @param type the SeaTunnel datatype to be converted from objects of Debezium
44+
* @param serverTimeZone TimeZone used to convert data with timestamp type
45+
*/
46+
Optional<DebeziumDeserializationConverter> createUserDefinedConverter(
47+
SeaTunnelDataType<?> type, ZoneId serverTimeZone);
48+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.cdc.debezium;
19+
20+
import org.apache.kafka.connect.source.SourceRecord;
21+
22+
import java.io.Serializable;
23+
24+
/**
25+
* {@link SourceRecord} metadata info converter.
26+
*/
27+
@FunctionalInterface
28+
public interface MetadataConverter extends Serializable {
29+
Object read(SourceRecord record);
30+
}

0 commit comments

Comments
 (0)