Skip to content

Commit 356538d

Browse files
authored
[Improve][CDC] Add mysql-cdc source factory (#3791)
* [Improve][CDC] Add mysql-cdc source factory * update CONNECT_TIMEOUT_MS options data type
1 parent adffb2d commit 356538d

File tree

11 files changed

+106
-28
lines changed

11 files changed

+106
-28
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfig.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import io.debezium.relational.RelationalDatabaseConnectorConfig;
2323

24-
import java.time.Duration;
2524
import java.util.List;
2625
import java.util.Properties;
2726

@@ -39,7 +38,7 @@ public abstract class JdbcSourceConfig extends BaseSourceConfig {
3938
protected final List<String> tableList;
4039
protected final int fetchSize;
4140
protected final String serverTimeZone;
42-
protected final Duration connectTimeout;
41+
protected final long connectTimeoutMillis;
4342
protected final int connectMaxRetries;
4443
protected final int connectionPoolSize;
4544

@@ -59,7 +58,7 @@ public JdbcSourceConfig(
5958
String password,
6059
int fetchSize,
6160
String serverTimeZone,
62-
Duration connectTimeout,
61+
long connectTimeoutMillis,
6362
int connectMaxRetries,
6463
int connectionPoolSize) {
6564
super(
@@ -78,7 +77,7 @@ public JdbcSourceConfig(
7877
this.tableList = tableList;
7978
this.fetchSize = fetchSize;
8079
this.serverTimeZone = serverTimeZone;
81-
this.connectTimeout = connectTimeout;
80+
this.connectTimeoutMillis = connectTimeoutMillis;
8281
this.connectMaxRetries = connectMaxRetries;
8382
this.connectionPoolSize = connectionPoolSize;
8483
}
@@ -121,8 +120,8 @@ public String getServerTimeZone() {
121120
return serverTimeZone;
122121
}
123122

124-
public Duration getConnectTimeout() {
125-
return connectTimeout;
123+
public long getConnectTimeoutMillis() {
124+
return connectTimeoutMillis;
126125
}
127126

128127
public int getConnectMaxRetries() {

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/config/JdbcSourceConfigFactory.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
2222
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
2323

24-
import java.time.Duration;
2524
import java.util.Arrays;
2625
import java.util.Collections;
2726
import java.util.List;
@@ -47,7 +46,7 @@ public abstract class JdbcSourceConfigFactory implements SourceConfig.Factory<Jd
4746
protected int splitSize = SourceOptions.SNAPSHOT_SPLIT_SIZE.defaultValue();
4847
protected int fetchSize = SourceOptions.SNAPSHOT_FETCH_SIZE.defaultValue();
4948
protected String serverTimeZone = JdbcSourceOptions.SERVER_TIME_ZONE.defaultValue();
50-
protected Duration connectTimeout = JdbcSourceOptions.CONNECT_TIMEOUT.defaultValue();
49+
protected long connectTimeoutMillis = JdbcSourceOptions.CONNECT_TIMEOUT_MS.defaultValue();
5150
protected int connectMaxRetries = JdbcSourceOptions.CONNECT_MAX_RETRIES.defaultValue();
5251
protected int connectionPoolSize = JdbcSourceOptions.CONNECTION_POOL_SIZE.defaultValue();
5352
protected Properties dbzProperties;
@@ -144,8 +143,8 @@ public JdbcSourceConfigFactory fetchSize(int fetchSize) {
144143
* The maximum time that the connector should wait after trying to connect to the database
145144
* server before timing out.
146145
*/
147-
public JdbcSourceConfigFactory connectTimeout(Duration connectTimeout) {
148-
this.connectTimeout = connectTimeout;
146+
public JdbcSourceConfigFactory connectTimeoutMillis(long connectTimeoutMillis) {
147+
this.connectTimeoutMillis = connectTimeoutMillis;
149148
return this;
150149
}
151150

@@ -199,7 +198,7 @@ public JdbcSourceConfigFactory fromReadonlyConfig(ReadonlyConfig config) {
199198
this.splitSize = config.get(SourceOptions.SNAPSHOT_SPLIT_SIZE);
200199
this.fetchSize = config.get(SourceOptions.SNAPSHOT_FETCH_SIZE);
201200
this.serverTimeZone = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
202-
this.connectTimeout = config.get(JdbcSourceOptions.CONNECT_TIMEOUT);
201+
this.connectTimeoutMillis = config.get(JdbcSourceOptions.CONNECT_TIMEOUT_MS);
203202
this.connectMaxRetries = config.get(JdbcSourceOptions.CONNECT_MAX_RETRIES);
204203
this.connectionPoolSize = config.get(JdbcSourceOptions.CONNECTION_POOL_SIZE);
205204
this.dbzProperties = new Properties();

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/option/JdbcSourceOptions.java

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import org.apache.seatunnel.api.configuration.Options;
2222
import org.apache.seatunnel.connectors.cdc.base.source.IncrementalSource;
2323

24-
import java.time.Duration;
25-
2624
/** Configurations for {@link IncrementalSource} of JDBC data source. */
2725
@SuppressWarnings("checkstyle:MagicNumber")
2826
public class JdbcSourceOptions extends SourceOptions {
@@ -83,10 +81,10 @@ public class JdbcSourceOptions extends SourceOptions {
8381
+ "so it can read the binlog. By default, a random number is generated between"
8482
+ " 5400 and 6400, though we recommend setting an explicit value.");
8583

86-
public static final Option<Duration> CONNECT_TIMEOUT =
87-
Options.key("connect.timeout")
88-
.durationType()
89-
.defaultValue(Duration.ofSeconds(30))
84+
public static final Option<Long> CONNECT_TIMEOUT_MS =
85+
Options.key("connect.timeout.ms")
86+
.longType()
87+
.defaultValue(30000L)
9088
.withDescription(
9189
"The maximum time that the connector should wait after trying to connect to the database server before timing out.");
9290

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/relational/connection/JdbcConnectionPoolFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public HikariDataSource createPooledDataSource(JdbcSourceConfig sourceConfig) {
4141
config.setPassword(sourceConfig.getPassword());
4242
config.setMinimumIdle(MINIMUM_POOL_SIZE);
4343
config.setMaximumPoolSize(sourceConfig.getConnectionPoolSize());
44-
config.setConnectionTimeout(sourceConfig.getConnectTimeout().toMillis());
44+
config.setConnectionTimeout(sourceConfig.getConnectTimeoutMillis());
4545
config.addDataSourceProperty(SERVER_TIMEZONE_KEY, sourceConfig.getServerTimeZone());
4646
config.setDriverClassName(sourceConfig.getDriverClassName());
4747

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfig.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.debezium.connector.mysql.MySqlConnectorConfig;
2525
import io.debezium.relational.RelationalTableFilters;
2626

27-
import java.time.Duration;
2827
import java.util.List;
2928
import java.util.Properties;
3029

@@ -52,7 +51,7 @@ public MySqlSourceConfig(
5251
String password,
5352
int fetchSize,
5453
String serverTimeZone,
55-
Duration connectTimeout,
54+
long connectTimeoutMillis,
5655
int connectMaxRetries,
5756
int connectionPoolSize) {
5857
super(
@@ -71,7 +70,7 @@ public MySqlSourceConfig(
7170
password,
7271
fetchSize,
7372
serverTimeZone,
74-
connectTimeout,
73+
connectTimeoutMillis,
7574
connectMaxRetries,
7675
connectionPoolSize);
7776
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public MySqlSourceConfig create(int subtaskId) {
7171
props.setProperty("database.history.skip.unparseable.ddl", String.valueOf(true));
7272
props.setProperty("database.history.refer.ddl", String.valueOf(true));
7373

74-
props.setProperty("connect.timeout.ms", String.valueOf(connectTimeout.toMillis()));
74+
props.setProperty("connect.timeout.ms", String.valueOf(connectTimeoutMillis));
7575
// the underlying debezium reader should always capture the schema changes and forward them.
7676
// Note: the includeSchemaChanges parameter is used to control emitting the schema record,
7777
// only DataStream API program need to emit the schema record, the Table API need not
@@ -125,7 +125,7 @@ public MySqlSourceConfig create(int subtaskId) {
125125
password,
126126
fetchSize,
127127
serverTimeZone,
128-
connectTimeout,
128+
connectTimeoutMillis,
129129
connectMaxRetries,
130130
connectionPoolSize);
131131
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,11 @@
4242

4343
@AutoService(SeaTunnelSource.class)
4444
public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> {
45+
static final String IDENTIFIER = "MySQL-CDC";
46+
4547
@Override
4648
public String getPluginName() {
47-
return "MySQL-CDC";
49+
return IDENTIFIER;
4850
}
4951

5052
@Override
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
19+
20+
import org.apache.seatunnel.api.configuration.util.OptionRule;
21+
import org.apache.seatunnel.api.table.factory.Factory;
22+
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
23+
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
24+
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
25+
26+
import com.google.auto.service.AutoService;
27+
28+
@AutoService(Factory.class)
29+
public class MySqlIncrementalSourceFactory implements TableSourceFactory {
30+
@Override
31+
public String factoryIdentifier() {
32+
return MySqlIncrementalSource.IDENTIFIER;
33+
}
34+
35+
@Override
36+
public OptionRule optionRule() {
37+
return JdbcSourceOptions.BASE_RULE
38+
.required(
39+
JdbcSourceOptions.HOSTNAME,
40+
JdbcSourceOptions.USERNAME,
41+
JdbcSourceOptions.PASSWORD,
42+
JdbcSourceOptions.DATABASE_NAME,
43+
JdbcSourceOptions.TABLE_NAME,
44+
JdbcCatalogOptions.BASE_URL)
45+
.optional(
46+
JdbcSourceOptions.PORT,
47+
JdbcSourceOptions.SERVER_ID,
48+
JdbcSourceOptions.SERVER_TIME_ZONE,
49+
JdbcSourceOptions.CONNECT_TIMEOUT_MS,
50+
JdbcSourceOptions.CONNECT_MAX_RETRIES,
51+
JdbcSourceOptions.CONNECTION_POOL_SIZE)
52+
.build();
53+
}
54+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
19+
20+
import org.junit.jupiter.api.Assertions;
21+
import org.junit.jupiter.api.Test;
22+
23+
public class MySqlIncrementalSourceFactoryTest {
24+
@Test
25+
public void testOptionRule() {
26+
Assertions.assertNotNull((new MySqlIncrementalSourceFactory()).optionRule());
27+
}
28+
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/config/SqlServerSourceConfig.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
2525
import io.debezium.relational.RelationalTableFilters;
2626

27-
import java.time.Duration;
2827
import java.util.List;
2928
import java.util.Properties;
3029

@@ -52,7 +51,7 @@ public SqlServerSourceConfig(
5251
String password,
5352
int fetchSize,
5453
String serverTimeZone,
55-
Duration connectTimeout,
54+
long connectTimeoutMillis,
5655
int connectMaxRetries,
5756
int connectionPoolSize) {
5857
super(
@@ -71,7 +70,7 @@ public SqlServerSourceConfig(
7170
password,
7271
fetchSize,
7372
serverTimeZone,
74-
connectTimeout,
73+
connectTimeoutMillis,
7574
connectMaxRetries,
7675
connectionPoolSize);
7776
}

0 commit comments

Comments
 (0)