Skip to content

Commit 058f559

Browse files
authored
[Improve][mysql-cdc] Support mysql 5.5 versions (#6710)
1 parent e310353 commit 058f559

File tree

10 files changed

+298
-92
lines changed

10 files changed

+298
-92
lines changed

docs/en/connector-v2/source/MySQL-CDC.md

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,9 @@ describes how to set up the MySQL CDC connector to run SQL queries against MySQL
2323

2424
## Supported DataSource Info
2525

26-
| Datasource | Supported versions | Driver | Url | Maven |
27-
|------------|-------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
28-
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
26+
| Datasource | Supported versions | Driver | Url | Maven |
27+
|------------|------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------|----------------------------------|----------------------------------------------------------------------|
28+
| MySQL | <li> [MySQL](https://dev.mysql.com/doc): 5.5, 5.6, 5.7, 8.0.x </li><li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x </li> | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.28 |
2929

3030
## Using Dependency
3131

@@ -92,9 +92,11 @@ server-id = 223344
9292
log_bin = mysql-bin
9393
expire_logs_days = 10
9494
binlog_format = row
95+
# mysql 5.6+ requires binlog_row_image to be set to FULL
9596
binlog_row_image = FULL
9697
9798
# enable gtid mode
99+
# mysql 5.6+ requires gtid_mode to be set to ON
98100
gtid_mode = on
99101
enforce_gtid_consistency = on
100102
```
@@ -107,6 +109,21 @@ enforce_gtid_consistency = on
107109

108110
4. Confirm your changes by checking the binlog status once more:
109111

112+
MySQL 5.5:
113+
114+
```sql
115+
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
116+
+--------------------------+----------------+
117+
| Variable_name | Value |
118+
+--------------------------+----------------+
119+
| binlog_format | ROW |
120+
| log_bin | ON |
121+
+--------------------------+----------------+
122+
5 rows in set (0.00 sec)
123+
```
124+
125+
MySQL 5.6+:
126+
110127
```sql
111128
mysql> show variables where variable_name in ('log_bin', 'binlog_format', 'binlog_row_image', 'gtid_mode', 'enforce_gtid_consistency');
112129
+--------------------------+----------------+

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,6 @@ public static org.apache.seatunnel.api.table.catalog.Column convertToSeaTunnelCo
9292
default:
9393
break;
9494
}
95-
return MySqlTypeConverter.INSTANCE.convert(builder.build());
95+
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
9696
}
9797
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MySqlCatalog.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
3131
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
3232
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeMapper;
33+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;
3334

3435
import com.google.common.base.Preconditions;
3536
import com.mysql.cj.MysqlType;
@@ -39,6 +40,7 @@
3940
import java.sql.DatabaseMetaData;
4041
import java.sql.ResultSet;
4142
import java.sql.SQLException;
43+
import java.sql.Statement;
4244
import java.util.Iterator;
4345
import java.util.List;
4446
import java.util.Locale;
@@ -56,9 +58,14 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
5658
SYS_DATABASES.add("sys");
5759
}
5860

61+
private MySqlVersion version;
62+
private MySqlTypeConverter typeConverter;
63+
5964
public MySqlCatalog(
6065
String catalogName, String username, String pwd, JdbcUrlUtil.UrlInfo urlInfo) {
6166
super(catalogName, username, pwd, urlInfo, null);
67+
this.version = resolveVersion();
68+
this.typeConverter = new MySqlTypeConverter(version);
6269
}
6370

6471
@Override
@@ -130,7 +137,8 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
130137
// e.g. `varchar(10)` is 40
131138
long charOctetLength = resultSet.getLong("CHARACTER_OCTET_LENGTH");
132139
// e.g. `timestamp(3)` is 3
133-
int timePrecision = resultSet.getInt("DATETIME_PRECISION");
140+
int timePrecision =
141+
MySqlVersion.V_5_5.equals(version) ? 0 : resultSet.getInt("DATETIME_PRECISION");
134142

135143
Preconditions.checkArgument(!(numberPrecision > 0 && charOctetLength > 0));
136144
Preconditions.checkArgument(!(numberScale > 0 && timePrecision > 0));
@@ -152,12 +160,13 @@ protected Column buildColumn(ResultSet resultSet) throws SQLException {
152160
.defaultValue(defaultValue)
153161
.comment(comment)
154162
.build();
155-
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
163+
return typeConverter.convert(typeDefine);
156164
}
157165

158166
@Override
159167
protected String getCreateTableSql(TablePath tablePath, CatalogTable table) {
160-
return MysqlCreateTableSqlBuilder.builder(tablePath, table).build(table.getCatalogName());
168+
return MysqlCreateTableSqlBuilder.builder(tablePath, table, typeConverter)
169+
.build(table.getCatalogName());
161170
}
162171

163172
@Override
@@ -179,7 +188,8 @@ protected String getDropDatabaseSql(String databaseName) {
179188
@Override
180189
public CatalogTable getTable(String sqlQuery) throws SQLException {
181190
Connection defaultConnection = getConnection(defaultUrl);
182-
return CatalogUtils.getCatalogTable(defaultConnection, sqlQuery, new MySqlTypeMapper());
191+
return CatalogUtils.getCatalogTable(
192+
defaultConnection, sqlQuery, new MySqlTypeMapper(typeConverter));
183193
}
184194

185195
@Override
@@ -193,4 +203,18 @@ public String getExistDataSql(TablePath tablePath) {
193203
"SELECT * FROM `%s`.`%s` LIMIT 1;",
194204
tablePath.getDatabaseName(), tablePath.getTableName());
195205
}
206+
207+
private MySqlVersion resolveVersion() {
208+
try (Statement statement = getConnection(defaultUrl).createStatement();
209+
ResultSet resultSet = statement.executeQuery("SELECT VERSION()")) {
210+
resultSet.next();
211+
return MySqlVersion.parse(resultSet.getString(1));
212+
} catch (Exception e) {
213+
log.info(
214+
"Failed to get mysql version, fallback to default version: {}",
215+
MySqlVersion.V_5_7,
216+
e);
217+
return MySqlVersion.V_5_7;
218+
}
219+
}
196220
}

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.api.table.catalog.TablePath;
2525
import org.apache.seatunnel.api.table.catalog.TableSchema;
2626
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
27+
import org.apache.seatunnel.api.table.type.SqlType;
2728
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils;
2829
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
2930
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
@@ -57,20 +58,23 @@ public class MysqlCreateTableSqlBuilder {
5758

5859
private String fieldIde;
5960

60-
private MysqlCreateTableSqlBuilder(String tableName) {
61+
private final MySqlTypeConverter typeConverter;
62+
63+
private MysqlCreateTableSqlBuilder(String tableName, MySqlTypeConverter typeConverter) {
6164
checkNotNull(tableName, "tableName must not be null");
6265
this.tableName = tableName;
66+
this.typeConverter = typeConverter;
6367
}
6468

6569
public static MysqlCreateTableSqlBuilder builder(
66-
TablePath tablePath, CatalogTable catalogTable) {
70+
TablePath tablePath, CatalogTable catalogTable, MySqlTypeConverter typeConverter) {
6771
checkNotNull(tablePath, "tablePath must not be null");
6872
checkNotNull(catalogTable, "catalogTable must not be null");
6973

7074
TableSchema tableSchema = catalogTable.getTableSchema();
7175
checkNotNull(tableSchema, "tableSchema must not be null");
7276

73-
return new MysqlCreateTableSqlBuilder(tablePath.getTableName())
77+
return new MysqlCreateTableSqlBuilder(tablePath.getTableName(), typeConverter)
7478
.comment(catalogTable.getComment())
7579
// todo: set charset and collate
7680
.engine(null)
@@ -167,10 +171,16 @@ private String buildColumnIdentifySql(Column column, String catalogName) {
167171
final List<String> columnSqls = new ArrayList<>();
168172
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
169173
boolean isSupportDef = true;
170-
if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
174+
175+
if ((SqlType.TIME.equals(column.getDataType().getSqlType())
176+
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
177+
&& column.getScale() != null) {
178+
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
179+
columnSqls.add(typeDefine.getColumnType());
180+
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
171181
columnSqls.add(column.getSourceType());
172182
} else {
173-
BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
183+
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
174184
columnSqls.add(typeDefine.getColumnType());
175185
}
176186
// nullable

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlDataTypeConvertor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ public SeaTunnelDataType<?> toSeaTunnelType(
102102
.scale(scale)
103103
.build();
104104

105-
return MySqlTypeConverter.INSTANCE.convert(typeDefine).getDataType();
105+
return MySqlTypeConverter.DEFAULT_INSTANCE.convert(typeDefine).getDataType();
106106
}
107107

108108
@Override
@@ -122,7 +122,8 @@ public MysqlType toConnectorType(
122122
.nullable(true)
123123
.build();
124124

125-
BasicTypeDefine<MysqlType> typeDefine = MySqlTypeConverter.INSTANCE.reconvert(column);
125+
BasicTypeDefine<MysqlType> typeDefine =
126+
MySqlTypeConverter.DEFAULT_INSTANCE.reconvert(column);
126127
return typeDefine.getNativeType();
127128
}
128129

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeConverter.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,14 @@ public class MySqlTypeConverter implements TypeConverter<BasicTypeDefine<MysqlTy
100100
public static final long POWER_2_24 = (long) Math.pow(2, 24);
101101
public static final long POWER_2_32 = (long) Math.pow(2, 32);
102102
public static final long MAX_VARBINARY_LENGTH = POWER_2_16 - 4;
103-
public static final MySqlTypeConverter INSTANCE = new MySqlTypeConverter();
103+
public static final MySqlTypeConverter DEFAULT_INSTANCE =
104+
new MySqlTypeConverter(MySqlVersion.V_5_7);
105+
106+
private final MySqlVersion version;
107+
108+
public MySqlTypeConverter(MySqlVersion version) {
109+
this.version = version;
110+
}
104111

105112
@Override
106113
public String identifier() {
@@ -462,7 +469,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
462469
case TIME:
463470
builder.nativeType(MysqlType.TIME);
464471
builder.dataType(MYSQL_TIME);
465-
if (column.getScale() != null && column.getScale() > 0) {
472+
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
473+
builder.columnType(MYSQL_TIME);
474+
} else if (column.getScale() != null && column.getScale() > 0) {
466475
int timeScale = column.getScale();
467476
if (timeScale > MAX_TIME_SCALE) {
468477
timeScale = MAX_TIME_SCALE;
@@ -484,7 +493,9 @@ public BasicTypeDefine<MysqlType> reconvert(Column column) {
484493
case TIMESTAMP:
485494
builder.nativeType(MysqlType.DATETIME);
486495
builder.dataType(MYSQL_DATETIME);
487-
if (column.getScale() != null && column.getScale() > 0) {
496+
if (version.isAtOrBefore(MySqlVersion.V_5_5)) {
497+
builder.columnType(MYSQL_DATETIME);
498+
} else if (column.getScale() != null && column.getScale() > 0) {
488499
int timestampScale = column.getScale();
489500
if (timestampScale > MAX_TIMESTAMP_SCALE) {
490501
timestampScale = MAX_TIMESTAMP_SCALE;

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/mysql/MySqlTypeMapper.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,19 @@
2828

2929
public class MySqlTypeMapper implements JdbcDialectTypeMapper {
3030

31+
private MySqlTypeConverter typeConverter;
32+
33+
public MySqlTypeMapper() {
34+
this(MySqlTypeConverter.DEFAULT_INSTANCE);
35+
}
36+
37+
public MySqlTypeMapper(MySqlTypeConverter typeConverter) {
38+
this.typeConverter = typeConverter;
39+
}
40+
3141
@Override
3242
public Column mappingColumn(BasicTypeDefine typeDefine) {
33-
return MySqlTypeConverter.INSTANCE.convert(typeDefine);
43+
return typeConverter.convert(typeDefine);
3444
}
3545

3646
@Override
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql;
19+
20+
public enum MySqlVersion {
21+
V_5_5,
22+
V_5_6,
23+
V_5_7,
24+
V_8;
25+
26+
public static MySqlVersion parse(String version) {
27+
if (version != null) {
28+
if (version.startsWith("5.5")) {
29+
return V_5_5;
30+
}
31+
if (version.startsWith("5.6")) {
32+
return V_5_6;
33+
}
34+
if (version.startsWith("5.7")) {
35+
return V_5_7;
36+
}
37+
if (version.startsWith("8.0")) {
38+
return V_8;
39+
}
40+
}
41+
throw new UnsupportedOperationException("Unsupported MySQL version: " + version);
42+
}
43+
44+
public boolean isBefore(MySqlVersion version) {
45+
return this.compareTo(version) < 0;
46+
}
47+
48+
public boolean isAtOrBefore(MySqlVersion version) {
49+
return this.compareTo(version) <= 0;
50+
}
51+
52+
public boolean isAfter(MySqlVersion version) {
53+
return this.compareTo(version) > 0;
54+
}
55+
56+
public boolean isAtOrAfter(MySqlVersion version) {
57+
return this.compareTo(version) >= 0;
58+
}
59+
}

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.seatunnel.api.table.type.LocalTimeType;
2929
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
3030
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
31+
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
3132

3233
import org.junit.jupiter.api.Assertions;
3334
import org.junit.jupiter.api.Test;
@@ -91,7 +92,8 @@ public void testBuild() {
9192
"User table");
9293

9394
String createTableSql =
94-
MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable)
95+
MysqlCreateTableSqlBuilder.builder(
96+
tablePath, catalogTable, MySqlTypeConverter.DEFAULT_INSTANCE)
9597
.build(DatabaseIdentifier.MYSQL);
9698
// create table sql is change; The old unit tests are no longer applicable
9799
String expect =

0 commit comments

Comments
 (0)