Skip to content

Commit

Permalink
[connector-mysql] Allow user to pass custom JDBC URL parameters used …
Browse files Browse the repository at this point in the history
…by MySQL data source.
  • Loading branch information
paul8263 committed Mar 14, 2022
1 parent 16ec365 commit 3b32e5f
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 19 deletions.
7 changes: 7 additions & 0 deletions docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,13 @@ During a snapshot operation, the connector will query each included table to pro
<td>Integer</td>
<td>The connection pool size.</td>
</tr>
<tr>
<td>jdbc.properties.*</td>
<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>Custom JDBC URL properties. Default value is 'zeroDateTimeBehavior=CONVERT_TO_NULL'.</td>
</tr>
<tr>
<td>debezium.*</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,12 @@ public MySqlSourceBuilder<T> startupOptions(StartupOptions startupOptions) {
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
public MySqlSourceBuilder<T> jdbcProperties(Properties jdbcProperties) {
this.configFactory.jdbcProperties(jdbcProperties);
return this;
}

/** The Debezium MySQL connector properties. For example, "snapshot.mode". */
public MySqlSourceBuilder<T> debeziumProperties(Properties properties) {
this.configFactory.debeziumProperties(properties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;

// --------------------------------------------------------------------------------------------
// Debezium Configurations
Expand Down Expand Up @@ -84,7 +85,8 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorLower,
boolean includeSchemaChanges,
boolean scanNewlyAddedTableEnabled,
Properties dbzProperties) {
Properties dbzProperties,
Properties jdbcProperties) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand All @@ -107,6 +109,7 @@ public class MySqlSourceConfig implements Serializable {
this.dbzProperties = checkNotNull(dbzProperties);
this.dbzConfiguration = Configuration.from(dbzProperties);
this.dbzMySqlConfig = new MySqlConnectorConfig(dbzConfiguration);
this.jdbcProperties = jdbcProperties;
}

public String getHostname() {
Expand Down Expand Up @@ -201,4 +204,8 @@ public MySqlConnectorConfig getMySqlConnectorConfig() {
public RelationalTableFilters getTableFilters() {
return dbzMySqlConfig.getTableFilters();
}

public Properties getJdbcProperties() {
return jdbcProperties;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public class MySqlSourceConfigFactory implements Serializable {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
private boolean scanNewlyAddedTableEnabled = false;
private Properties jdbcProperties;
private Properties dbzProperties;

public MySqlSourceConfigFactory hostname(String hostname) {
Expand Down Expand Up @@ -214,6 +215,12 @@ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdde
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
public MySqlSourceConfigFactory jdbcProperties(Properties jdbcProperties) {
this.jdbcProperties = jdbcProperties;
return this;
}

/** Specifies the startup options. */
public MySqlSourceConfigFactory startupOptions(StartupOptions startupOptions) {
switch (startupOptions.startupMode) {
Expand Down Expand Up @@ -311,6 +318,7 @@ public MySqlSourceConfig createConfig(int subtaskId) {
distributionFactorLower,
includeSchemaChanges,
scanNewlyAddedTableEnabled,
props);
props,
jdbcProperties);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,23 @@
import com.zaxxer.hikari.HikariDataSource;
import io.debezium.connector.mysql.MySqlConnectorConfig;

import java.util.Properties;

/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public class PooledDataSourceFactory {

public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterSetResults=UTF-8";
public static final String CONNECTION_POOL_PREFIX = "connection-pool-";
public static final String SERVER_TIMEZONE_KEY = "serverTimezone";
public static final int MINIMUM_POOL_SIZE = 1;
private static final Properties DEFAULT_JDBC_PROPERTIES;

static {
DEFAULT_JDBC_PROPERTIES = new Properties();
DEFAULT_JDBC_PROPERTIES.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
DEFAULT_JDBC_PROPERTIES.setProperty("characterEncoding", "UTF-8");
}

private PooledDataSourceFactory() {}

Expand All @@ -39,9 +48,10 @@ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceCo

String hostName = sourceConfig.getHostname();
int port = sourceConfig.getPort();
Properties jdbcProperties = sourceConfig.getJdbcProperties();

config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
config.setJdbcUrl(formatJdbcUrl(hostName, port, jdbcProperties));
config.setUsername(sourceConfig.getUsername());
config.setPassword(sourceConfig.getPassword());
config.setMinimumIdle(MINIMUM_POOL_SIZE);
Expand All @@ -58,4 +68,20 @@ public static HikariDataSource createPooledDataSource(MySqlSourceConfig sourceCo

return new HikariDataSource(config);
}

private static String formatJdbcUrl(String hostName, int port, Properties jdbcProperties) {
Properties combinedProperties = new Properties();
combinedProperties.putAll(DEFAULT_JDBC_PROPERTIES);
combinedProperties.putAll(jdbcProperties);

StringBuilder jdbcUrlStringBuilder =
new StringBuilder(String.format(JDBC_URL_PATTERN, hostName, port));

combinedProperties.forEach(
(key, value) -> {
jdbcUrlStringBuilder.append("&").append(key).append("=").append(value);
});

return jdbcUrlStringBuilder.toString();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.table;

import java.util.Map;
import java.util.Properties;

/** Option utils for JDBC options. */
public class JDBCOptions {

// Prefix for JDBC specific properties.
public static final String PROPERTIES_PREFIX = "jdbc.properties.";

public static Properties getJdbcProperties(Map<String, String> tableOptions) {
Properties jdbcProperties = new Properties();
if (hasJdbcProperties(tableOptions)) {
tableOptions.keySet().stream()
.filter(key -> key.startsWith(PROPERTIES_PREFIX))
.forEach(
key -> {
final String value = tableOptions.get(key);
final String subKey = key.substring((PROPERTIES_PREFIX).length());
jdbcProperties.put(subKey, value);
});
}
return jdbcProperties;
}

/**
* Decides if the table options contains JDBC properties that start with prefix
* 'jdbc.properties'.
*/
private static boolean hasJdbcProperties(Map<String, String> tableOptions) {
return tableOptions.keySet().stream().anyMatch(k -> k.startsWith(PROPERTIES_PREFIX));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final double distributionFactorLower;
private final StartupOptions startupOptions;
private final boolean scanNewlyAddedTableEnabled;
private final Properties jdbcProperties;

// --------------------------------------------------------------------------------------------
// Mutable attributes
Expand Down Expand Up @@ -132,7 +133,8 @@ public MySqlTableSource(
distributionFactorUpper,
distributionFactorLower,
startupOptions,
false);
false,
new Properties());
}

public MySqlTableSource(
Expand All @@ -156,7 +158,8 @@ public MySqlTableSource(
double distributionFactorUpper,
double distributionFactorLower,
StartupOptions startupOptions,
boolean scanNewlyAddedTableEnabled) {
boolean scanNewlyAddedTableEnabled,
Properties jdbcProperties) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
Expand All @@ -178,6 +181,7 @@ public MySqlTableSource(
this.distributionFactorLower = distributionFactorLower;
this.startupOptions = startupOptions;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.jdbcProperties = jdbcProperties;
// Mutable attributes
this.producedDataType = physicalSchema.toPhysicalRowDataType();
this.metadataKeys = Collections.emptyList();
Expand Down Expand Up @@ -233,6 +237,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.startupOptions(startupOptions)
.deserializer(deserializer)
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.jdbcProperties(jdbcProperties)
.build();
return SourceProvider.of(parallelSource);
} else {
Expand Down Expand Up @@ -309,7 +314,8 @@ public DynamicTableSource copy() {
distributionFactorUpper,
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
jdbcProperties);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
Expand Down Expand Up @@ -346,7 +352,8 @@ public boolean equals(Object o) {
&& Objects.equals(connectionPoolSize, that.connectionPoolSize)
&& Objects.equals(startupOptions, that.startupOptions)
&& Objects.equals(producedDataType, that.producedDataType)
&& Objects.equals(metadataKeys, that.metadataKeys);
&& Objects.equals(metadataKeys, that.metadataKeys)
&& Objects.equals(jdbcProperties, that.jdbcProperties);
}

@Override
Expand Down Expand Up @@ -374,7 +381,8 @@ public int hashCode() {
startupOptions,
producedDataType,
metadataKeys,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
jdbcProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ public class MySqlTableSourceFactory implements DynamicTableSourceFactory {
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
helper.validateExcept(DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX);
helper.validateExcept(
DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX, JDBCOptions.PROPERTIES_PREFIX);

final ReadableConfig config = helper.getOptions();
String hostname = config.get(HOSTNAME);
Expand Down Expand Up @@ -132,7 +133,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
distributionFactorUpper,
distributionFactorLower,
startupOptions,
scanNewlyAddedTableEnabled);
scanNewlyAddedTableEnabled,
JDBCOptions.getJdbcProperties(context.getCatalogTable().getOptions()));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,9 @@ public void testCommonProperties() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -158,7 +160,9 @@ public void testEnableParallelReadSource() {
CONNECTION_POOL_SIZE.defaultValue(),
40.5d,
0.01d,
StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -194,7 +198,9 @@ public void testEnableParallelReadSourceWithSingleServerId() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -228,7 +234,9 @@ public void testEnableParallelReadSourceLatestOffset() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
StartupOptions.latest(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -266,7 +274,8 @@ public void testOptionalProperties() {
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial(),
true);
true,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -322,7 +331,9 @@ public void testStartupFromInitial() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -387,7 +398,9 @@ public void testStartupFromLatestOffset() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.latest());
StartupOptions.latest(),
false,
new Properties());
assertEquals(expectedSource, actualSource);
}

Expand Down Expand Up @@ -424,7 +437,9 @@ public void testMetadataColumns() {
CONNECTION_POOL_SIZE.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");

Expand Down

0 comments on commit 3b32e5f

Please sign in to comment.