From 5415bcf0f43db627cd3604697245cc07a630f808 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Wed, 3 Aug 2022 18:00:26 +0800 Subject: [PATCH] fix review --- .../cdc/connectors/mysql/MySqlValidator.java | 3 +- .../mysql/debezium/DebeziumUtils.java | 6 +- .../connector/mysql/MySqlConnection.java | 741 ++++++++++++++++++ .../MySqlConnectionWithJdbcProperties.java | 110 --- 4 files changed, 744 insertions(+), 116 deletions(-) create mode 100644 flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java delete mode 100644 flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnectionWithJdbcProperties.java diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java index 7e6d676c14..da9ef13075 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/MySqlValidator.java @@ -67,8 +67,7 @@ public void validate() { } else { // for the legacy source connection = - DebeziumUtils.createMySqlConnection( - from(dbzProperties), sourceConfig.getJdbcProperties()); + DebeziumUtils.createMySqlConnection(from(dbzProperties), new Properties()); } checkVersion(connection); checkBinlogFormat(connection); diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java index f45d9eb2f5..2c76a76091 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/DebeziumUtils.java @@ -24,7 +24,6 @@ import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset; import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnection; -import io.debezium.connector.mysql.MySqlConnectionWithJdbcProperties; import io.debezium.connector.mysql.MySqlConnectorConfig; import io.debezium.connector.mysql.MySqlDatabaseSchema; import io.debezium.connector.mysql.MySqlSystemVariables; @@ -76,9 +75,8 @@ public static MySqlConnection createMySqlConnection(MySqlSourceConfig sourceConf /** Creates a new {@link MySqlConnection}, but not open the connection. */ public static MySqlConnection createMySqlConnection( Configuration dbzConfiguration, Properties jdbcProperties) { - return new MySqlConnectionWithJdbcProperties( - new MySqlConnectionWithJdbcProperties.MySqlConnectionConfigurationWithCustomUrl( - dbzConfiguration, jdbcProperties)); + return new MySqlConnection( + new MySqlConnection.MySqlConnectionConfiguration(dbzConfiguration, jdbcProperties)); } /** Creates a new {@link BinaryLogClient} for consuming mysql binlog. */ diff --git a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java new file mode 100644 index 0000000000..cb781a07d0 --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java @@ -0,0 +1,741 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.mysql; + +import io.debezium.DebeziumException; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.CommonConnectorConfig.EventProcessingFailureHandlingMode; +import io.debezium.config.Configuration; +import io.debezium.config.Configuration.Builder; +import io.debezium.config.Field; +import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; +import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.Table; +import io.debezium.relational.TableId; +import io.debezium.relational.history.DatabaseHistory; +import io.debezium.schema.DatabaseSchema; +import io.debezium.util.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Properties; + +/** + * Copied from Debezium project(1.6.4.final) to add custom jdbc properties in the jdbc url. The new + * parameter {@code jdbcProperties} in the constructor of {@link MySqlConnectionConfiguration} will + * be used to generate the jdbc url pattern, and may overwrite the default value. + * + *

Line 80: Add field {@code urlPattern} in {@link MySqlConnection} and remove old pattern. + * + *

Line 93: Init {@code urlPattern} using the url pattern from {@link + * MySqlConnectionConfiguration}. + * + *

Line 561: Generate the connection string by the new field {@code urlPattern}. + * + *

Line 574 ~ 581: Add new constant and field {@code urlPattern} to {@link + * MySqlConnectionConfiguration}. + * + *

Line 626 ~ 629: Init new field {@code urlPattern} in {@link MySqlConnectionConfiguration}. + * + *

Line 690 ~ 720: Add some methods helping to generate the url pattern and add default values. + */ +public class MySqlConnection extends JdbcConnection { + + private static Logger LOGGER = LoggerFactory.getLogger(MySqlConnection.class); + + private static final String SQL_SHOW_SYSTEM_VARIABLES = "SHOW VARIABLES"; + private static final String SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET = + "SHOW VARIABLES WHERE Variable_name IN ('character_set_server','collation_server')"; + private static final String SQL_SHOW_SESSION_VARIABLE_SSL_VERSION = + "SHOW SESSION STATUS LIKE 'Ssl_version'"; + + private final Map originalSystemProperties = new HashMap<>(); + private final MySqlConnectionConfiguration connectionConfig; + private final MysqlFieldReader mysqlFieldReader; + private final String urlPattern; + + /** + * Creates a new connection using the supplied configuration. + * + * @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null. + * @param fieldReader binary or text protocol based readers + */ + public MySqlConnection( + MySqlConnectionConfiguration connectionConfig, MysqlFieldReader fieldReader) { + super(connectionConfig.config(), connectionConfig.factory()); + this.connectionConfig = connectionConfig; + this.mysqlFieldReader = fieldReader; + this.urlPattern = connectionConfig.getUrlPattern(); + } + + /** + * Creates a new connection using the supplied configuration. + * + * @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null. + */ + public MySqlConnection(MySqlConnectionConfiguration connectionConfig) { + this(connectionConfig, new MysqlTextProtocolFieldReader()); + } + + @Override + public synchronized Connection connection(boolean executeOnConnect) throws SQLException { + if (!isConnected() && connectionConfig.sslModeEnabled()) { + originalSystemProperties.clear(); + // Set the System properties for SSL for the MySQL driver ... + setSystemProperty("javax.net.ssl.keyStore", MySqlConnectorConfig.SSL_KEYSTORE, true); + setSystemProperty( + "javax.net.ssl.keyStorePassword", + MySqlConnectorConfig.SSL_KEYSTORE_PASSWORD, + false); + setSystemProperty( + "javax.net.ssl.trustStore", MySqlConnectorConfig.SSL_TRUSTSTORE, true); + setSystemProperty( + "javax.net.ssl.trustStorePassword", + MySqlConnectorConfig.SSL_TRUSTSTORE_PASSWORD, + false); + } + return super.connection(executeOnConnect); + } + + @Override + public void close() throws SQLException { + try { + super.close(); + } finally { + // Reset the system properties to their original value ... + originalSystemProperties.forEach( + (name, value) -> { + if (value != null) { + System.setProperty(name, value); + } else { + System.clearProperty(name); + } + }); + } + } + + /** + * Read the MySQL charset-related system variables. + * + * @return the system variables that are related to server character sets; never null + */ + protected Map readMySqlCharsetSystemVariables() { + // Read the system variables from the MySQL instance and get the current database name ... + LOGGER.debug("Reading MySQL charset-related system variables before parsing DDL history."); + return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES_CHARACTER_SET); + } + + /** + * Read the MySQL system variables. + * + * @return the system variables that are related to server character sets; never null + */ + protected Map readMySqlSystemVariables() { + // Read the system variables from the MySQL instance and get the current database name ... + LOGGER.debug("Reading MySQL system variables"); + return querySystemVariables(SQL_SHOW_SYSTEM_VARIABLES); + } + + private Map querySystemVariables(String statement) { + final Map variables = new HashMap<>(); + try { + query( + statement, + rs -> { + while (rs.next()) { + String varName = rs.getString(1); + String value = rs.getString(2); + if (varName != null && value != null) { + variables.put(varName, value); + LOGGER.debug( + "\t{} = {}", + Strings.pad(varName, 45, ' '), + Strings.pad(value, 45, ' ')); + } + } + }); + } catch (SQLException e) { + throw new DebeziumException("Error reading MySQL variables: " + e.getMessage(), e); + } + + return variables; + } + + protected String setStatementFor(Map variables) { + StringBuilder sb = new StringBuilder("SET "); + boolean first = true; + List varNames = new ArrayList<>(variables.keySet()); + Collections.sort(varNames); + for (String varName : varNames) { + if (first) { + first = false; + } else { + sb.append(", "); + } + sb.append(varName).append("="); + String value = variables.get(varName); + if (value == null) { + value = ""; + } + if (value.contains(",") || value.contains(";")) { + value = "'" + value + "'"; + } + sb.append(value); + } + return sb.append(";").toString(); + } + + protected void setSystemProperty(String property, Field field, boolean showValueInError) { + String value = connectionConfig.config().getString(field); + if (value != null) { + value = value.trim(); + String existingValue = System.getProperty(property); + if (existingValue == null) { + // There was no existing property ... + String existing = System.setProperty(property, value); + originalSystemProperties.put(property, existing); // the existing value may be null + } else { + existingValue = existingValue.trim(); + if (!existingValue.equalsIgnoreCase(value)) { + // There was an existing property, and the value is different ... + String msg = + "System or JVM property '" + + property + + "' is already defined, but the configuration property '" + + field.name() + + "' defines a different value"; + if (showValueInError) { + msg = + "System or JVM property '" + + property + + "' is already defined as " + + existingValue + + ", but the configuration property '" + + field.name() + + "' defines a different value '" + + value + + "'"; + } + throw new DebeziumException(msg); + } + // Otherwise, there was an existing property, and the value is exactly the same (so + // do nothing!) + } + } + } + + /** + * Read the Ssl Version session variable. + * + * @return the session variables that are related to sessions ssl version + */ + protected String getSessionVariableForSslVersion() { + final String SSL_VERSION = "Ssl_version"; + LOGGER.debug("Reading MySQL Session variable for Ssl Version"); + Map sessionVariables = + querySystemVariables(SQL_SHOW_SESSION_VARIABLE_SSL_VERSION); + if (!sessionVariables.isEmpty() && sessionVariables.containsKey(SSL_VERSION)) { + return sessionVariables.get(SSL_VERSION); + } + return null; + } + + /** + * Determine whether the MySQL server has GTIDs enabled. + * + * @return {@code false} if the server's {@code gtid_mode} is set and is {@code OFF}, or {@code + * true} otherwise + */ + public boolean isGtidModeEnabled() { + try { + return queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'GTID_MODE'", + rs -> { + if (rs.next()) { + return !"OFF".equalsIgnoreCase(rs.getString(2)); + } + return false; + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at GTID mode: ", e); + } + } + + /** + * Determine the executed GTID set for MySQL. + * + * @return the string representation of MySQL's GTID sets; never null but an empty string if the + * server does not use GTIDs + */ + public String knownGtidSet() { + try { + return queryAndMap( + "SHOW MASTER STATUS", + rs -> { + if (rs.next() && rs.getMetaData().getColumnCount() > 4) { + return rs.getString( + 5); // GTID set, may be null, blank, or contain a GTID set + } + return ""; + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at GTID mode: ", e); + } + } + + /** + * Determine the difference between two sets. + * + * @return a subtraction of two GTID sets; never null + */ + public GtidSet subtractGtidSet(GtidSet set1, GtidSet set2) { + try { + return prepareQueryAndMap( + "SELECT GTID_SUBTRACT(?, ?)", + ps -> { + ps.setString(1, set1.toString()); + ps.setString(2, set2.toString()); + }, + rs -> { + if (rs.next()) { + return new GtidSet(rs.getString(1)); + } + return new GtidSet(""); + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at GTID mode: ", e); + } + } + + /** + * Get the purged GTID values from MySQL (gtid_purged value) + * + * @return A GTID set; may be empty if not using GTIDs or none have been purged yet + */ + public GtidSet purgedGtidSet() { + try { + return queryAndMap( + "SELECT @@global.gtid_purged", + rs -> { + if (rs.next() && rs.getMetaData().getColumnCount() > 0) { + return new GtidSet( + rs.getString( + 1)); // GTID set, may be null, blank, or contain a GTID + // set + } + return new GtidSet(""); + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at gtid_purged variable: ", + e); + } + } + + /** + * Determine if the current user has the named privilege. Note that if the user has the "ALL" + * privilege this method returns {@code true}. + * + * @param grantName the name of the MySQL privilege; may not be null + * @return {@code true} if the user has the named privilege, or {@code false} otherwise + */ + public boolean userHasPrivileges(String grantName) { + try { + return queryAndMap( + "SHOW GRANTS FOR CURRENT_USER", + rs -> { + while (rs.next()) { + String grants = rs.getString(1); + LOGGER.debug(grants); + if (grants == null) { + return false; + } + grants = grants.toUpperCase(); + if (grants.contains("ALL") + || grants.contains(grantName.toUpperCase())) { + return true; + } + } + return false; + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at privileges for current user: ", + e); + } + } + + /** + * Determine the earliest binlog filename that is still available in the server. + * + * @return the name of the earliest binlog filename, or null if there are none. + */ + public String earliestBinlogFilename() { + // Accumulate the available binlog filenames ... + List logNames = new ArrayList<>(); + try { + LOGGER.info("Checking all known binlogs from MySQL"); + query( + "SHOW BINARY LOGS", + rs -> { + while (rs.next()) { + logNames.add(rs.getString(1)); + } + }); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking for binary logs: ", e); + } + + if (logNames.isEmpty()) { + return null; + } + return logNames.get(0); + } + + /** + * Determine whether the MySQL server has the binlog_row_image set to 'FULL'. + * + * @return {@code true} if the server's {@code binlog_row_image} is set to {@code FULL}, or + * {@code false} otherwise + */ + protected boolean isBinlogRowImageFull() { + try { + final String rowImage = + queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'binlog_row_image'", + rs -> { + if (rs.next()) { + return rs.getString(2); + } + // This setting was introduced in MySQL 5.6+ with default of 'FULL'. + // For older versions, assume 'FULL'. + return "FULL"; + }); + LOGGER.debug("binlog_row_image={}", rowImage); + return "FULL".equalsIgnoreCase(rowImage); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at BINLOG_ROW_IMAGE mode: ", + e); + } + } + + /** + * Determine whether the MySQL server has the row-level binlog enabled. + * + * @return {@code true} if the server's {@code binlog_format} is set to {@code ROW}, or {@code + * false} otherwise + */ + protected boolean isBinlogFormatRow() { + try { + final String mode = + queryAndMap( + "SHOW GLOBAL VARIABLES LIKE 'binlog_format'", + rs -> rs.next() ? rs.getString(2) : ""); + LOGGER.debug("binlog_format={}", mode); + return "ROW".equalsIgnoreCase(mode); + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking at BINLOG_FORMAT mode: ", + e); + } + } + + /** + * Query the database server to get the list of the binlog files availble. + * + * @return list of the binlog files + */ + public List availableBinlogFiles() { + List logNames = new ArrayList<>(); + try { + LOGGER.info("Get all known binlogs from MySQL"); + query( + "SHOW BINARY LOGS", + rs -> { + while (rs.next()) { + logNames.add(rs.getString(1)); + } + }); + return logNames; + } catch (SQLException e) { + throw new DebeziumException( + "Unexpected error while connecting to MySQL and looking for binary logs: ", e); + } + } + + public OptionalLong getEstimatedTableSize(TableId tableId) { + try { + // Choose how we create statements based on the # of rows. + // This is approximate and less accurate then COUNT(*), + // but far more efficient for large InnoDB tables. + execute("USE `" + tableId.catalog() + "`;"); + return queryAndMap( + "SHOW TABLE STATUS LIKE '" + tableId.table() + "';", + rs -> { + if (rs.next()) { + return OptionalLong.of((rs.getLong(5))); + } + return OptionalLong.empty(); + }); + } catch (SQLException e) { + LOGGER.debug( + "Error while getting number of rows in table {}: {}", + tableId, + e.getMessage(), + e); + } + return OptionalLong.empty(); + } + + public boolean isTableIdCaseSensitive() { + return !"0" + .equals( + readMySqlSystemVariables() + .get(MySqlSystemVariables.LOWER_CASE_TABLE_NAMES)); + } + + /** + * Read the MySQL default character sets for exisiting databases. + * + * @return the map of database names with their default character sets; never null + */ + protected Map readDatabaseCollations() { + LOGGER.debug("Reading default database charsets"); + try { + return queryAndMap( + "SELECT schema_name, default_character_set_name, default_collation_name FROM information_schema.schemata", + rs -> { + final Map charsets = new HashMap<>(); + while (rs.next()) { + String dbName = rs.getString(1); + String charset = rs.getString(2); + String collation = rs.getString(3); + if (dbName != null && (charset != null || collation != null)) { + charsets.put(dbName, new DatabaseLocales(charset, collation)); + LOGGER.debug( + "\t{} = {}, {}", + Strings.pad(dbName, 45, ' '), + Strings.pad(charset, 45, ' '), + Strings.pad(collation, 45, ' ')); + } + } + return charsets; + }); + } catch (SQLException e) { + throw new DebeziumException( + "Error reading default database charsets: " + e.getMessage(), e); + } + } + + public String connectionString() { + return connectionString(urlPattern); + } + + public static class MySqlConnectionConfiguration { + + protected static final String JDBC_PROPERTY_LEGACY_DATETIME = "useLegacyDatetimeCode"; + protected static final String JDBC_PROPERTY_CONNECTION_TIME_ZONE = "connectionTimeZone"; + protected static final String JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE = "serverTimezone"; + + private final Configuration jdbcConfig; + private final ConnectionFactory factory; + private final Configuration config; + + private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties(); + private static final String JDBC_URL_PATTERN = + "jdbc:mysql://${hostname}:${port}/?useSSL=${useSSL}&connectTimeout=${connectTimeout}"; + private static final String JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL = + "jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}"; + private final String urlPattern; + + public MySqlConnectionConfiguration(Configuration config, Properties jdbcProperties) { + // Set up the JDBC connection without actually connecting, with extra MySQL-specific + // properties + // to give us better JDBC database metadata behavior, including using UTF-8 for the + // client-side character encoding + // per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html + this.config = config; + final boolean useSSL = sslModeEnabled(); + final Configuration dbConfig = + config.filter( + x -> + !(x.startsWith( + DatabaseHistory + .CONFIGURATION_FIELD_PREFIX_STRING) + || x.equals( + MySqlConnectorConfig.DATABASE_HISTORY + .name()))) + .edit() + .withDefault( + MySqlConnectorConfig.PORT, + MySqlConnectorConfig.PORT.defaultValue()) + .build() + .subset("database.", true); + + final Builder jdbcConfigBuilder = + dbConfig.edit() + .with( + "connectTimeout", + Long.toString(getConnectionTimeout().toMillis())) + .with("useSSL", Boolean.toString(useSSL)); + + final String legacyDateTime = dbConfig.getString(JDBC_PROPERTY_LEGACY_DATETIME); + if (legacyDateTime == null) { + jdbcConfigBuilder.with(JDBC_PROPERTY_LEGACY_DATETIME, "false"); + } else if ("true".equals(legacyDateTime)) { + LOGGER.warn( + "'{}' is set to 'true'. This setting is not recommended and can result in timezone issues.", + JDBC_PROPERTY_LEGACY_DATETIME); + } + + jdbcConfigBuilder.with( + JDBC_PROPERTY_CONNECTION_TIME_ZONE, determineConnectionTimeZone(dbConfig)); + + this.jdbcConfig = jdbcConfigBuilder.build(); + String driverClassName = this.jdbcConfig.getString(MySqlConnectorConfig.JDBC_DRIVER); + this.urlPattern = formatJdbcUrl(jdbcProperties); + factory = + JdbcConnection.patternBasedFactory( + urlPattern, driverClassName, getClass().getClassLoader()); + } + + private static String determineConnectionTimeZone(final Configuration dbConfig) { + // Debezium by default expects timezoned data delivered in server timezone + String connectionTimeZone = dbConfig.getString(JDBC_PROPERTY_CONNECTION_TIME_ZONE); + + if (connectionTimeZone != null) { + return connectionTimeZone; + } + + // fall back to legacy property + final String serverTimeZone = dbConfig.getString(JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE); + if (serverTimeZone != null) { + LOGGER.warn( + "Database configuration option '{}' is set but is obsolete, please use '{}' instead", + JDBC_PROPERTY_LEGACY_SERVER_TIME_ZONE, + JDBC_PROPERTY_CONNECTION_TIME_ZONE); + connectionTimeZone = serverTimeZone; + } + + return connectionTimeZone != null ? connectionTimeZone : "SERVER"; + } + + public Configuration config() { + return jdbcConfig; + } + + public ConnectionFactory factory() { + return factory; + } + + public String username() { + return config.getString(MySqlConnectorConfig.USER); + } + + public String password() { + return config.getString(MySqlConnectorConfig.PASSWORD); + } + + public String hostname() { + return config.getString(MySqlConnectorConfig.HOSTNAME); + } + + public int port() { + return config.getInteger(MySqlConnectorConfig.PORT); + } + + public SecureConnectionMode sslMode() { + String mode = config.getString(MySqlConnectorConfig.SSL_MODE); + return SecureConnectionMode.parse(mode); + } + + public boolean sslModeEnabled() { + return sslMode() != SecureConnectionMode.DISABLED; + } + + public Duration getConnectionTimeout() { + return Duration.ofMillis(config.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS)); + } + + public String getUrlPattern() { + return urlPattern; + } + + private String formatJdbcUrl(Properties jdbcProperties) { + Properties combinedProperties = new Properties(); + combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES); + combinedProperties.putAll(jdbcProperties); + + StringBuilder jdbcUrlStringBuilder = + jdbcProperties.getProperty("useSSL") == null + ? new StringBuilder(JDBC_URL_PATTERN) + : new StringBuilder(JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL); + combinedProperties.forEach( + (key, value) -> { + jdbcUrlStringBuilder.append("&").append(key).append("=").append(value); + }); + + return jdbcUrlStringBuilder.toString(); + } + + private static Properties initializeDefaultJdbcProperties() { + Properties defaultJdbcProperties = new Properties(); + defaultJdbcProperties.setProperty("useInformationSchema", "true"); + defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false"); + defaultJdbcProperties.setProperty("useUnicode", "true"); + defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL"); + defaultJdbcProperties.setProperty("characterEncoding", "UTF-8"); + defaultJdbcProperties.setProperty("characterSetResults", "UTF-8"); + return defaultJdbcProperties; + } + + public EventProcessingFailureHandlingMode eventProcessingFailureHandlingMode() { + String mode = + config.getString(CommonConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE); + if (mode == null) { + mode = + config.getString( + MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE); + } + return EventProcessingFailureHandlingMode.parse(mode); + } + + public EventProcessingFailureHandlingMode inconsistentSchemaHandlingMode() { + String mode = config.getString(MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE); + return EventProcessingFailureHandlingMode.parse(mode); + } + } + + @Override + public > Object getColumnValue( + ResultSet rs, int columnIndex, Column column, Table table, T schema) + throws SQLException { + return mysqlFieldReader.readField(rs, columnIndex, column, table); + } + + @Override + public String quotedTableIdString(TableId tableId) { + return tableId.toQuotedString('`'); + } +} diff --git a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnectionWithJdbcProperties.java b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnectionWithJdbcProperties.java deleted file mode 100644 index 422e1056a7..0000000000 --- a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnectionWithJdbcProperties.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Copyright 2022 Ververica Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.debezium.connector.mysql; - -import io.debezium.config.Configuration; -import io.debezium.jdbc.JdbcConnection; - -import java.util.Properties; - -/** {@link MySqlConnection} extension to be used with MySQL Server. */ -public class MySqlConnectionWithJdbcProperties extends MySqlConnection { - private final String urlPattern; - /** - * Creates a new connection using the supplied configuration. - * - * @param connectionConfig {@link MySqlConnectionConfiguration} instance, may not be null. - */ - public MySqlConnectionWithJdbcProperties( - MySqlConnectionConfigurationWithCustomUrl connectionConfig) { - super(connectionConfig); - this.urlPattern = connectionConfig.getUrlPattern(); - } - - @Override - public String connectionString() { - return connectionString(urlPattern); - } - - /** - * {@link MySqlConnectionConfiguration} extension to be used with {@link - * MySqlConnectionWithJdbcProperties}. - */ - public static class MySqlConnectionConfigurationWithCustomUrl - extends MySqlConnectionConfiguration { - private static final Properties DEFAULT_JDBC_PROPERTIES = initializeDefaultJdbcProperties(); - private static final String JDBC_URL_PATTERN = - "jdbc:mysql://${hostname}:${port}/?useSSL=${useSSL}&connectTimeout=${connectTimeout}"; - private static final String JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL = - "jdbc:mysql://${hostname}:${port}/?connectTimeout=${connectTimeout}"; - - private final ConnectionFactory customFactory; - private final String urlPattern; - - public MySqlConnectionConfigurationWithCustomUrl( - Configuration config, Properties jdbcProperties) { - // Set up the JDBC connection without actually connecting, with extra MySQL-specific - // properties - // to give us better JDBC database metadata behavior, including using UTF-8 for the - // client-side character encoding - // per https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-charsets.html - super(config); - this.urlPattern = formatJdbcUrl(jdbcProperties); - String driverClassName = config().getString(MySqlConnectorConfig.JDBC_DRIVER); - customFactory = - JdbcConnection.patternBasedFactory( - urlPattern, driverClassName, getClass().getClassLoader()); - } - - public String getUrlPattern() { - return urlPattern; - } - - private String formatJdbcUrl(Properties jdbcProperties) { - Properties combinedProperties = new Properties(); - combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES); - combinedProperties.putAll(jdbcProperties); - - StringBuilder jdbcUrlStringBuilder = - jdbcProperties.getProperty("useSSL") == null - ? new StringBuilder(JDBC_URL_PATTERN) - : new StringBuilder(JDBC_URL_PATTERN_WITH_CUSTOM_USE_SSL); - combinedProperties.forEach( - (key, value) -> { - jdbcUrlStringBuilder.append("&").append(key).append("=").append(value); - }); - - return jdbcUrlStringBuilder.toString(); - } - - private static Properties initializeDefaultJdbcProperties() { - Properties defaultJdbcProperties = new Properties(); - defaultJdbcProperties.setProperty("useInformationSchema", "true"); - defaultJdbcProperties.setProperty("nullCatalogMeansCurrent", "false"); - defaultJdbcProperties.setProperty("useUnicode", "true"); - defaultJdbcProperties.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL"); - defaultJdbcProperties.setProperty("characterEncoding", "UTF-8"); - defaultJdbcProperties.setProperty("characterSetResults", "UTF-8"); - return defaultJdbcProperties; - } - - @Override - public ConnectionFactory factory() { - return customFactory; - } - } -}