Skip to content

Commit

Permalink
[connector-mysql] Allow user to pass custom JDBC URL parameters used …
Browse files Browse the repository at this point in the history
…by MySQL data source.
  • Loading branch information
paul8263 committed Mar 9, 2022
1 parent 16ec365 commit 7dd72c0
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 16 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ During a snapshot operation, the connector will query each included table to pro
<td>Integer</td>
<td>The connection pool size.</td>
</tr>
<tr>
<td>connection.pool.url.properties</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>Custom JDBC URL properties. Default value is 'zeroDateTimeBehavior=CONVERT_TO_NULL'.</td>
</tr>
<tr>
<td>debezium.*</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public MySqlSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
public MySqlSourceBuilder<T> customProperties(String customProperties) {
this.configFactory.customProperties(customProperties);
return this;
}

/** The Debezium MySQL connector properties. For example, "snapshot.mode". */
public MySqlSourceBuilder<T> debeziumProperties(Properties properties) {
this.configFactory.debeziumProperties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
@Nullable private final String customProperties;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -84,6 +85,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
String customProperties,
Properties dbzProperties) {
this.hostname = checkNotNull(hostname);
this.port = port;
Expand All @@ -104,6 +106,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.customProperties = customProperties;
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
Expand Down Expand Up @@ -186,6 +189,11 @@ public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}

@Nullable
public String getCustomProperties() {
return customProperties;
}

public Properties getDbzProperties() {
return dbzProperties;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CUSTOM_JDBC_URL_PROPERTIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SERVER_TIME_ZONE;
Expand Down Expand Up @@ -69,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private String customProperties = CUSTOM_JDBC_URL_PROPERTIES.defaultValue();
private Properties dbzProperties;

public MySqlSourceConfigFactory hostname(String hostname) {
Expand Down Expand Up @@ -214,6 +216,12 @@ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdde
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
public MySqlSourceConfigFactory customProperties(String customProperties) {
this.customProperties = customProperties;
return this;
}

/** Specifies the startup options. */
public MySqlSourceConfigFactory startupOptions(StartupOptions startupOptions) {
switch (startupOptions.startupMode) {
Expand Down Expand Up @@ -311,6 +319,7 @@ public MySqlSourceConfig createConfig(int subtaskId) {
distributionFactorLower,
includeSchemaChanges,
scanNewlyAddedTableEnabled,
customProperties,
props);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ public class MySqlSourceOptions {
.withDescription(
"Optional timestamp used in case of \"timestamp\" startup mode");

public static final ConfigOption<String> CUSTOM_JDBC_URL_PROPERTIES =
ConfigOptions.key("connection.pool.url.properties")
.stringType()
.defaultValue("zeroDateTimeBehavior=CONVERT_TO_NULL")
.withDescription("Custom JDBC URL properties");

// ----------------------------------------------------------------------------
// experimental options, won't add them to documentation
// ----------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
public class PooledDataSourceFactory {

public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8";
public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
public static final int MINIMUM_POOL_SIZE = 1;
Expand All @@ -39,9 +39,10 @@ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceCo

String hostName = sourceConfig.getHostname();
int port = sourceConfig.getPort();
String customProperties = sourceConfig.getCustomProperties();

config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
config.setJdbcUrl(formatJdbcUrlPattern(hostName, port, customProperties));
config.setUsername(sourceConfig.getUsername());
config.setPassword(sourceConfig.getPassword());
config.setMinimumIdle(MINIMUM_POOL_SIZE);
Expand All @@ -58,4 +59,12 @@ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceCo

return new HikariDataSource(config);
}

private static String formatJdbcUrlPattern(String hostName, int port, String customProperties) {
String jdbcUrl = String.format(JDBC_URL_PATTERN, hostName, port);
if (null != customProperties) {
jdbcUrl += "&" + customProperties;
}
return jdbcUrl;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean scanNewlyAddedTableEnabled;
private final String customProperties;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -132,7 +133,8 @@ public MySqlTableSource(
distributionFactorUpper,
distributionFactorLower,
startupOptions,
false);
false,
null);
}

public MySqlTableSource(
Expand All @@ -156,7 +158,8 @@ public MySqlTableSource(
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled) {
boolean scanNewlyAddedTableEnabled,
String customProperties) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
Expand All @@ -178,6 +181,7 @@ public MySqlTableSource(
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.customProperties = customProperties;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
Expand Down Expand Up @@ -233,6 +237,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.customProperties(customProperties)
.build();
return SourceProvider.of(parallelSource);
} else {
Expand Down Expand Up @@ -309,7 +314,8 @@ public DynamicTableSource copy() {
distributionFactorUpper,
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
customProperties);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down Expand Up @@ -346,7 +352,8 @@ public boolean equals(Object o) {
&& Objects.equals(connectionPoolSize, that.connectionPoolSize)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(customProperties, that.customProperties);
}

@Override
Expand Down Expand Up @@ -374,7 +381,8 @@ public int hashCode() {
startupOptions,
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
customProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CUSTOM_JDBC_URL_PROPERTIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.DATABASE_NAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.HOSTNAME;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.PASSWORD;
Expand Down Expand Up @@ -97,6 +98,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
double distributionFactorUpper = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
double distributionFactorLower = config.get(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
boolean scanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
String customProperties = config.get(CUSTOM_JDBC_URL_PROPERTIES);

boolean enableParallelRead = config.get(SCAN_INCREMENTAL_SNAPSHOT_ENABLED);
if (enableParallelRead) {
Expand Down Expand Up @@ -132,7 +134,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
distributionFactorUpper,
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
customProperties);
}

@Override
Expand Down Expand Up @@ -171,6 +174,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(CONNECT_MAX_RETRIES);
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(CUSTOM_JDBC_URL_PROPERTIES);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECTION_POOL_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_MAX_RETRIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CONNECT_TIMEOUT;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.CUSTOM_JDBC_URL_PROPERTIES;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
Expand Down Expand Up @@ -119,7 +120,9 @@ public void testCommonProperties() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -158,7 +161,9 @@ public void testEnableParallelReadSource() {
CONNECTION_POOL_SIZE.defaultValue(),
40.5d,
0.01d,
StartupOptions.initial());
StartupOptions.initial(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -194,7 +199,9 @@ public void testEnableParallelReadSourceWithSingleServerId() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -228,7 +235,9 @@ public void testEnableParallelReadSourceLatestOffset() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
StartupOptions.latest(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -266,7 +275,8 @@ public void testOptionalProperties() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
true);
true,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -322,7 +332,9 @@ public void testStartupFromInitial() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -387,7 +399,9 @@ public void testStartupFromLatestOffset() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
StartupOptions.latest(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -424,7 +438,9 @@ public void testMetadataColumns() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
CUSTOM_JDBC_URL_PROPERTIES.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

Expand Down

0 comments on commit 7dd72c0

Please sign in to comment.