Skip to content

Commit ae981df

Browse files
ic4yhailin0
andauthored
[feature][connector] add mysql cdc reader (#3455)
* [feature][connector] add mysql cdc reader * Update seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml Co-authored-by: hailin0 <hailin088@gmail.com> Co-authored-by: hailin0 <hailin088@gmail.com>
1 parent e294ce2 commit ae981df

File tree

22 files changed

+2809
-50
lines changed

22 files changed

+2809
-50
lines changed

LICENSE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connec
216216
seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/source/reader/ from https://github.com/apache/flink
217217
seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/source/enumerator/ from https://github.com/apache/iceberg
218218
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/base from https://github.com/ververica/flink-cdc-connectors
219+
seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql from https://github.com/ververica/flink-cdc-connectors
219220
seatunnel-connectors-v2/connector-cdc/connector-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium from https://github.com/ververica/flink-cdc-connectors
220221
generate_client_protocol.sh from https://github.com/hazelcast/hazelcast
221222
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/ExceptionUtil.java from https://github.com/hazelcast/hazelcast

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/pom.xml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
<artifactId>connector-cdc-base</artifactId>
3030

3131
<properties>
32-
<debezium.version>1.6.4.Final</debezium.version>
3332
<hikaricp.version>4.0.3</hikaricp.version>
3433
</properties>
3534

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.seatunnel.connectors.cdc.base.config;
19+
20+
import org.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
21+
import org.seatunnel.connectors.cdc.base.option.SourceOptions;
22+
23+
import java.time.Duration;
24+
import java.util.Arrays;
25+
import java.util.List;
26+
import java.util.Properties;
27+
28+
/** A {@link SourceConfig.Factory} to provide {@link SourceConfig} of JDBC data source. */
29+
@SuppressWarnings("checkstyle:MagicNumber")
30+
public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<JdbcSourceConfig> {
31+
32+
private static final long serialVersionUID = 1L;
33+
34+
protected int port;
35+
protected String hostname;
36+
protected String username;
37+
protected String password;
38+
protected List<String> databaseList;
39+
protected List<String> tableList;
40+
protected StartupConfig startupConfig;
41+
protected StopConfig stopConfig;
42+
protected boolean includeSchemaChanges = false;
43+
protected double distributionFactorUpper = 1000.0d;
44+
protected double distributionFactorLower = 0.05d;
45+
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
46+
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
47+
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
48+
protected Duration connectTimeout = JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
49+
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
50+
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
51+
protected Properties dbzProperties;
52+
53+
/** Integer port number of the database server. */
54+
public JdbcSourceConfigFactory hostname(String hostname) {
55+
this.hostname = hostname;
56+
return this;
57+
}
58+
59+
/** Integer port number of the database server. */
60+
public JdbcSourceConfigFactory port(int port) {
61+
this.port = port;
62+
return this;
63+
}
64+
65+
/**
66+
* An optional list of regular expressions that match database names to be monitored; any
67+
* database name not included in the whitelist will be excluded from monitoring. By default all
68+
* databases will be monitored.
69+
*/
70+
public JdbcSourceConfigFactory databaseList(String... databaseList) {
71+
this.databaseList = Arrays.asList(databaseList);
72+
return this;
73+
}
74+
75+
/**
76+
* An optional list of regular expressions that match fully-qualified table identifiers for
77+
* tables to be monitored; any table not included in the list will be excluded from monitoring.
78+
* Each identifier is of the form databaseName.tableName. by default the connector will monitor
79+
* every non-system table in each monitored database.
80+
*/
81+
public JdbcSourceConfigFactory tableList(String... tableList) {
82+
this.tableList = Arrays.asList(tableList);
83+
return this;
84+
}
85+
86+
/** Name of the user to use when connecting to the database server. */
87+
public JdbcSourceConfigFactory username(String username) {
88+
this.username = username;
89+
return this;
90+
}
91+
92+
/** Password to use when connecting to the database server. */
93+
public JdbcSourceConfigFactory password(String password) {
94+
this.password = password;
95+
return this;
96+
}
97+
98+
/**
99+
* The session time zone in database server, e.g. "America/Los_Angeles". It controls how the
100+
* TIMESTAMP type converted to STRING. See more
101+
* https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-temporal-types
102+
*/
103+
public JdbcSourceConfigFactory serverTimeZone(String timeZone) {
104+
this.serverTimeZone = timeZone;
105+
return this;
106+
}
107+
108+
/**
109+
* The split size (number of rows) of table snapshot, captured tables are split into multiple
110+
* splits when read the snapshot of table.
111+
*/
112+
public JdbcSourceConfigFactory splitSize(int splitSize) {
113+
this.splitSize = splitSize;
114+
return this;
115+
}
116+
117+
/**
118+
* The upper bound of split key evenly distribution factor, the factor is used to determine
119+
* whether the table is evenly distribution or not.
120+
*/
121+
public JdbcSourceConfigFactory distributionFactorUpper(double distributionFactorUpper) {
122+
this.distributionFactorUpper = distributionFactorUpper;
123+
return this;
124+
}
125+
126+
/**
127+
* The lower bound of split key evenly distribution factor, the factor is used to determine
128+
* whether the table is evenly distribution or not.
129+
*/
130+
public JdbcSourceConfigFactory distributionFactorLower(double distributionFactorLower) {
131+
this.distributionFactorLower = distributionFactorLower;
132+
return this;
133+
}
134+
135+
/** The maximum fetch size for per poll when read table snapshot. */
136+
public JdbcSourceConfigFactory fetchSize(int fetchSize) {
137+
this.fetchSize = fetchSize;
138+
return this;
139+
}
140+
141+
/**
142+
* The maximum time that the connector should wait after trying to connect to the database
143+
* server before timing out.
144+
*/
145+
public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
146+
this.connectTimeout = connectTimeout;
147+
return this;
148+
}
149+
150+
/** The connection pool size. */
151+
public JdbcSourceConfigFactory connectionPoolSize(int connectionPoolSize) {
152+
this.connectionPoolSize = connectionPoolSize;
153+
return this;
154+
}
155+
156+
/** The max retry times to get connection. */
157+
public JdbcSourceConfigFactory connectMaxRetries(int connectMaxRetries) {
158+
this.connectMaxRetries = connectMaxRetries;
159+
return this;
160+
}
161+
162+
/** Whether the {@link SourceConfig} should output the schema changes or not. */
163+
public JdbcSourceConfigFactory includeSchemaChanges(boolean includeSchemaChanges) {
164+
this.includeSchemaChanges = includeSchemaChanges;
165+
return this;
166+
}
167+
168+
/** The Debezium connector properties. For example, "snapshot.mode". */
169+
public JdbcSourceConfigFactory debeziumProperties(Properties properties) {
170+
this.dbzProperties = properties;
171+
return this;
172+
}
173+
174+
/** Specifies the startup options. */
175+
public JdbcSourceConfigFactory startupOptions(StartupConfig startupConfig) {
176+
this.startupConfig = startupConfig;
177+
return this;
178+
}
179+
180+
@Override
181+
public abstract JdbcSourceConfig create(int subtask);
182+
}
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.seatunnel.connectors.cdc.base.option;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
import org.seatunnel.connectors.cdc.base.source.IncrementalSource;
24+
25+
import java.time.Duration;
26+
27+
/** Configurations for {@link IncrementalSource} of JDBC data source. */
28+
@SuppressWarnings("checkstyle:MagicNumber")
29+
public class JdbcSourceOptions extends SourceOptions {
30+
31+
public static final Option<String> HOSTNAME =
32+
Options.key("hostname")
33+
.stringType()
34+
.noDefaultValue()
35+
.withDescription("IP address or hostname of the database server.");
36+
37+
public static final Option<Integer> PORT =
38+
Options.key("port")
39+
.intType()
40+
.defaultValue(3306)
41+
.withDescription("Integer port number of the database server.");
42+
43+
public static final Option<String> USERNAME =
44+
Options.key("username")
45+
.stringType()
46+
.noDefaultValue()
47+
.withDescription(
48+
"Name of the database to use when connecting to the database server.");
49+
50+
public static final Option<String> PASSWORD =
51+
Options.key("password")
52+
.stringType()
53+
.noDefaultValue()
54+
.withDescription("Password to use when connecting to the database server.");
55+
56+
public static final Option<String> DATABASE_NAME =
57+
Options.key("database-name")
58+
.stringType()
59+
.noDefaultValue()
60+
.withDescription("Database name of the database to monitor.");
61+
62+
public static final Option<String> TABLE_NAME =
63+
Options.key("table-name")
64+
.stringType()
65+
.noDefaultValue()
66+
.withDescription("Table name of the database to monitor.");
67+
68+
public static final Option<String> SERVER_TIME_ZONE =
69+
Options.key("server-time-zone")
70+
.stringType()
71+
.defaultValue("UTC")
72+
.withDescription("The session time zone in database server.");
73+
74+
public static final Option<String> SERVER_ID =
75+
Options.key("server-id")
76+
.stringType()
77+
.noDefaultValue()
78+
.withDescription(
79+
"A numeric ID or a numeric ID range of this database client, "
80+
+ "The numeric ID syntax is like '5400', the numeric ID range syntax "
81+
+ "is like '5400-5408', The numeric ID range syntax is recommended when "
82+
+ "'scan.incremental.snapshot.enabled' enabled. Every ID must be unique across all "
83+
+ "currently-running database processes in the MySQL cluster. This connector"
84+
+ " joins the MySQL cluster as another server (with this unique ID) "
85+
+ "so it can read the binlog. By default, a random number is generated between"
86+
+ " 5400 and 6400, though we recommend setting an explicit value.");
87+
88+
public static final Option<Duration> CONNECT_TIMEOUT =
89+
Options.key("connect.timeout")
90+
.durationType()
91+
.defaultValue(Duration.ofSeconds(30))
92+
.withDescription(
93+
"The maximum time that the connector should wait after trying to connect to the database server before timing out.");
94+
95+
public static final Option<Integer> CONNECTION_POOL_SIZE =
96+
Options.key("connection.pool.size")
97+
.intType()
98+
.defaultValue(20)
99+
.withDescription("The connection pool size.");
100+
101+
public static final Option<Integer> CONNECT_MAX_RETRIES =
102+
Options.key("connect.max-retries")
103+
.intType()
104+
.defaultValue(3)
105+
.withDescription(
106+
"The max retry times that the connector should retry to build database server connection.");
107+
}

0 commit comments

Comments
 (0)