Skip to content

Commit 0f0afb5

Browse files
authored
[feature][connector-v2] add sqlServer CDC (#3686)
* [feature][connector-v2] add sqlServer CDC * [feature][connector-v2] add sqlServer CDC E2E
1 parent 40c5d00 commit 0f0afb5

File tree

33 files changed

+3102
-116
lines changed

33 files changed

+3102
-116
lines changed

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,7 @@ seatunnel.source.Notion = connector-http-notion
160160
seatunnel.sink.RabbitMQ = connector-rabbitmq
161161
seatunnel.source.RabbitMQ = connector-rabbitmq
162162
seatunnel.source.OpenMldb = connector-openmldb
163+
seatunnel.source.SqlServer-CDC = connector-cdc-sqlserver
163164
seatunnel.sink.Doris = connector-doris
164165
seatunnel.source.Maxcompute = connector-maxcompute
165166
seatunnel.sink.Maxcompute = connector-maxcompute

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ public final void prepare(Config pluginConfig) throws PrepareFailException {
8585
this.stopMode = stopConfig.getStopMode();
8686
this.incrementalParallelism = readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
8787
this.configFactory = createSourceConfigFactory(readonlyConfig);
88-
this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
8988
this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
89+
this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
9090
this.offsetFactory = createOffsetFactory(readonlyConfig);
9191
}
9292

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializationConverters.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -334,7 +334,7 @@ public Object convert(Object dbzObj, Schema schema) {
334334
default:
335335
}
336336
} else if (dbzObj instanceof Integer) {
337-
return LocalTime.ofNanoOfDay((long) dbzObj * 1000_000L);
337+
return LocalTime.ofNanoOfDay((Integer) dbzObj * 1000_000L);
338338
}
339339
// get number of milliseconds of the day
340340
return TemporalConversions.toLocalTime(dbzObj);

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -235,13 +235,12 @@ public Offset getStreamOffset(SourceRecord sourceRecord) {
235235
* Loads the connector's persistent offset (if present) via the given loader.
236236
*/
237237
private MySqlOffsetContext loadStartingOffsetState(
238-
OffsetContext.Loader loader, SourceSplitBase mySqlSplit) {
238+
MySqlOffsetContext.Loader loader, SourceSplitBase mySqlSplit) {
239239
Offset offset =
240240
mySqlSplit.isSnapshotSplit() ? BinlogOffset.INITIAL_OFFSET
241241
: mySqlSplit.asIncrementalSplit().getStartupOffset();
242242

243-
MySqlOffsetContext mySqlOffsetContext =
244-
(MySqlOffsetContext) loader.load(offset.getOffset());
243+
MySqlOffsetContext mySqlOffsetContext = loader.load(offset.getOffset());
245244

246245
if (!isBinlogAvailable(mySqlOffsetContext)) {
247246
throw new IllegalStateException(

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/scan/MySqlSnapshotSplitReadTask.java

Lines changed: 10 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,13 @@
3131
import io.debezium.connector.mysql.MySqlConnectorConfig;
3232
import io.debezium.connector.mysql.MySqlDatabaseSchema;
3333
import io.debezium.connector.mysql.MySqlOffsetContext;
34-
import io.debezium.connector.mysql.MySqlValueConverters;
3534
import io.debezium.pipeline.EventDispatcher;
3635
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
3736
import io.debezium.pipeline.source.spi.ChangeEventSource;
3837
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
3938
import io.debezium.pipeline.spi.ChangeRecordEmitter;
4039
import io.debezium.pipeline.spi.OffsetContext;
4140
import io.debezium.pipeline.spi.SnapshotResult;
42-
import io.debezium.relational.Column;
4341
import io.debezium.relational.RelationalSnapshotChangeEventSource;
4442
import io.debezium.relational.SnapshotChangeRecordEmitter;
4543
import io.debezium.relational.Table;
@@ -52,14 +50,12 @@
5250
import org.slf4j.Logger;
5351
import org.slf4j.LoggerFactory;
5452

55-
import java.io.UnsupportedEncodingException;
56-
import java.sql.Blob;
5753
import java.sql.PreparedStatement;
5854
import java.sql.ResultSet;
55+
import java.sql.ResultSetMetaData;
5956
import java.sql.SQLException;
6057
import java.sql.Types;
6158
import java.time.Duration;
62-
import java.util.Calendar;
6359

6460
public class MySqlSnapshotSplitReadTask extends AbstractSnapshotChangeEventSource {
6561

@@ -146,7 +142,7 @@ protected SnapshotResult doExecute(
146142
"Snapshot step 3 - Determining high watermark {} for split {}",
147143
highWatermark,
148144
snapshotSplit);
149-
((SnapshotSplitChangeEventSourceContext) context).setHighWatermark(lowWatermark);
145+
((SnapshotSplitChangeEventSourceContext) context).setHighWatermark(highWatermark);
150146
dispatcher.dispatchWatermarkEvent(
151147
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
152148
return SnapshotResult.completed(ctx.offset);
@@ -220,9 +216,8 @@ private void createDataEventsForTable(
220216
rows++;
221217
final Object[] row = new Object[columnArray.getGreatestColumnPosition()];
222218
for (int i = 0; i < columnArray.getColumns().length; i++) {
223-
Column actualColumn = table.columns().get(i);
224219
row[columnArray.getColumns()[i].position() - 1] =
225-
readField(rs, i + 1, actualColumn, table);
220+
readField(rs, i + 1);
226221
}
227222
if (logTimer.expired()) {
228223
long stop = clock.currentTimeInMillis();
@@ -259,108 +254,15 @@ private Threads.Timer getTableScanLogTimer() {
259254
return Threads.timer(clock, LOG_INTERVAL);
260255
}
261256

262-
/**
263-
* Read JDBC return value and deal special type like time, timestamp.
264-
*
265-
* <p>Note https://issues.redhat.com/browse/DBZ-3238 has fixed this issue, please remove
266-
* this method once we bump Debezium version to 1.6
267-
*/
268-
private Object readField(ResultSet rs, int fieldNo, Column actualColumn, Table actualTable)
257+
private Object readField(
258+
ResultSet rs, int columnIndex)
269259
throws SQLException {
270-
if (actualColumn.jdbcType() == Types.TIME) {
271-
return readTimeField(rs, fieldNo);
272-
} else if (actualColumn.jdbcType() == Types.DATE) {
273-
return readDateField(rs, fieldNo, actualColumn, actualTable);
274-
}
275-
// This is for DATETIME columns (a logical date + time without time zone)
276-
// by reading them with a calendar based on the default time zone, we make sure that the
277-
// value
278-
// is constructed correctly using the database's (or connection's) time zone
279-
else if (actualColumn.jdbcType() == Types.TIMESTAMP) {
280-
return readTimestampField(rs, fieldNo, actualColumn, actualTable);
281-
}
282-
// JDBC's rs.GetObject() will return a Boolean for all TINYINT(1) columns.
283-
// TINYINT columns are reprtoed as SMALLINT by JDBC driver
284-
else if (actualColumn.jdbcType() == Types.TINYINT
285-
|| actualColumn.jdbcType() == Types.SMALLINT) {
286-
// It seems that rs.wasNull() returns false when default value is set and NULL is
287-
// inserted
288-
// We thus need to use getObject() to identify if the value was provided and if yes
289-
// then
290-
// read it again to get correct scale
291-
return rs.getObject(fieldNo) == null ? null : rs.getInt(fieldNo);
292-
}
293-
// DBZ-2673
294-
// It is necessary to check the type names as types like ENUM and SET are
295-
// also reported as JDBC type char
296-
else if ("CHAR".equals(actualColumn.typeName())
297-
|| "VARCHAR".equals(actualColumn.typeName())
298-
|| "TEXT".equals(actualColumn.typeName())) {
299-
return rs.getBytes(fieldNo);
260+
final ResultSetMetaData metaData = rs.getMetaData();
261+
final int columnType = metaData.getColumnType(columnIndex);
262+
if (columnType == Types.TIME) {
263+
return rs.getTimestamp(columnIndex);
300264
} else {
301-
return rs.getObject(fieldNo);
302-
}
303-
}
304-
305-
/**
306-
* As MySQL connector/J implementation is broken for MySQL type "TIME" we have to use a
307-
* binary-ish workaround. https://issues.jboss.org/browse/DBZ-342
308-
*/
309-
private Object readTimeField(ResultSet rs, int fieldNo) throws SQLException {
310-
Blob b = rs.getBlob(fieldNo);
311-
if (b == null) {
312-
return null; // Don't continue parsing time field if it is null
313-
}
314-
315-
try {
316-
return MySqlValueConverters.stringToDuration(
317-
new String(b.getBytes(1, (int) (b.length())), "UTF-8"));
318-
} catch (UnsupportedEncodingException e) {
319-
LOG.error("Could not read MySQL TIME value as UTF-8");
320-
throw new RuntimeException(e);
321-
}
322-
}
323-
324-
/**
325-
* In non-string mode the date field can contain zero in any of the date part which we need
326-
* to handle as all-zero.
327-
*/
328-
private Object readDateField(ResultSet rs, int fieldNo, Column column, Table table)
329-
throws SQLException {
330-
Blob b = rs.getBlob(fieldNo);
331-
if (b == null) {
332-
return null; // Don't continue parsing date field if it is null
333-
}
334-
335-
try {
336-
return MySqlValueConverters.stringToLocalDate(
337-
new String(b.getBytes(1, (int) (b.length())), "UTF-8"), column, table);
338-
} catch (UnsupportedEncodingException e) {
339-
LOG.error("Could not read MySQL TIME value as UTF-8");
340-
throw new RuntimeException(e);
341-
}
342-
}
343-
344-
/**
345-
* In non-string mode the time field can contain zero in any of the date part which we need
346-
* to handle as all-zero.
347-
*/
348-
private Object readTimestampField(ResultSet rs, int fieldNo, Column column, Table table)
349-
throws SQLException {
350-
Blob b = rs.getBlob(fieldNo);
351-
if (b == null) {
352-
return null; // Don't continue parsing timestamp field if it is null
353-
}
354-
355-
try {
356-
return MySqlValueConverters.containsZeroValuesInDatePart(
357-
new String(b.getBytes(1, (int) (b.length())), "UTF-8"),
358-
column,
359-
table) ? null
360-
: rs.getTimestamp(fieldNo, Calendar.getInstance());
361-
} catch (UnsupportedEncodingException e) {
362-
LOG.error("Could not read MySQL TIME value as UTF-8");
363-
throw new RuntimeException(e);
265+
return rs.getObject(columnIndex);
364266
}
365267
}
366268

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
4+
Licensed to the Apache Software Foundation (ASF) under one or more
5+
contributor license agreements. See the NOTICE file distributed with
6+
this work for additional information regarding copyright ownership.
7+
The ASF licenses this file to You under the Apache License, Version 2.0
8+
(the "License"); you may not use this file except in compliance with
9+
the License. You may obtain a copy of the License at
10+
11+
http://www.apache.org/licenses/LICENSE-2.0
12+
13+
Unless required by applicable law or agreed to in writing, software
14+
distributed under the License is distributed on an "AS IS" BASIS,
15+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
See the License for the specific language governing permissions and
17+
limitations under the License.
18+
19+
-->
20+
<project xmlns="http://maven.apache.org/POM/4.0.0"
21+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
22+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
23+
<parent>
24+
<artifactId>connector-cdc</artifactId>
25+
<groupId>org.apache.seatunnel</groupId>
26+
<version>${revision}</version>
27+
</parent>
28+
<modelVersion>4.0.0</modelVersion>
29+
<artifactId>connector-cdc-sqlserver</artifactId>
30+
31+
<dependencies>
32+
33+
<dependency>
34+
<groupId>org.apache.seatunnel</groupId>
35+
<artifactId>connector-cdc-base</artifactId>
36+
</dependency>
37+
38+
<dependency>
39+
<groupId>io.debezium</groupId>
40+
<artifactId>debezium-connector-sqlserver</artifactId>
41+
</dependency>
42+
</dependencies>
43+
44+
<dependencyManagement>
45+
<dependencies>
46+
<dependency>
47+
<groupId>org.apache.seatunnel</groupId>
48+
<artifactId>connector-cdc-base</artifactId>
49+
<version>${project.version}</version>
50+
<scope>compile</scope>
51+
</dependency>
52+
53+
<dependency>
54+
<groupId>io.debezium</groupId>
55+
<artifactId>debezium-connector-sqlserver</artifactId>
56+
<version>${debezium.version}</version>
57+
<scope>compile</scope>
58+
</dependency>
59+
60+
</dependencies>
61+
</dependencyManagement>
62+
63+
</project>
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.config;
19+
20+
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
21+
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
22+
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
23+
24+
import io.debezium.connector.sqlserver.SqlServerConnectorConfig;
25+
import io.debezium.relational.RelationalTableFilters;
26+
27+
import java.time.Duration;
28+
import java.util.List;
29+
import java.util.Properties;
30+
31+
/**
32+
* Describes the connection information of the Mysql database and the configuration information for
33+
* performing snapshotting and streaming reading, such as splitSize.
34+
*/
35+
public class SqlServerSourceConfig extends JdbcSourceConfig {
36+
37+
private static final long serialVersionUID = 1L;
38+
39+
public SqlServerSourceConfig(
40+
StartupConfig startupConfig,
41+
StopConfig stopConfig,
42+
List<String> databaseList,
43+
List<String> tableList,
44+
int splitSize,
45+
double distributionFactorUpper,
46+
double distributionFactorLower,
47+
Properties dbzProperties,
48+
String driverClassName,
49+
String hostname,
50+
int port,
51+
String username,
52+
String password,
53+
int fetchSize,
54+
String serverTimeZone,
55+
Duration connectTimeout,
56+
int connectMaxRetries,
57+
int connectionPoolSize) {
58+
super(
59+
startupConfig,
60+
stopConfig,
61+
databaseList,
62+
tableList,
63+
splitSize,
64+
distributionFactorUpper,
65+
distributionFactorLower,
66+
dbzProperties,
67+
driverClassName,
68+
hostname,
69+
port,
70+
username,
71+
password,
72+
fetchSize,
73+
serverTimeZone,
74+
connectTimeout,
75+
connectMaxRetries,
76+
connectionPoolSize);
77+
}
78+
79+
@Override
80+
public SqlServerConnectorConfig getDbzConnectorConfig() {
81+
return new SqlServerConnectorConfig(getDbzConfiguration());
82+
}
83+
84+
public RelationalTableFilters getTableFilters() {
85+
return getDbzConnectorConfig().getTableFilters();
86+
}
87+
}

0 commit comments

Comments
 (0)