Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-693 Initial SQL statements #516

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -508,6 +508,15 @@ public static EventProcessingFailureHandlingMode parse(String value) {
+ "Each distinct MySQL installation should have a separate namespace and monitored by "
+ "at most one Debezium connector.");

public static final Field ON_CONNECT_STATEMENTS = Field.create("database.initial.statements")
.withDisplayName("Initial statements")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription("A semicolon separated list of SQL statements to be executed when JDBC connection (not binlog reading connection) to the database is established. "
+ "Typically used for configuration of session parameters. "
+ "Use doubled semicolon ';;' to use it as a character not as a delimiter");

public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
.withType(Type.LONG)
Expand Down Expand Up @@ -908,7 +917,7 @@ public static final Field MASK_COLUMN(int length) {
/**
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, SERVER_ID,
public static Field.Set ALL_FIELDS = Field.setOf(USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_ID,
SERVER_NAME,
CONNECTION_TIMEOUT_MS, KEEP_ALIVE, KEEP_ALIVE_INTERVAL_MS,
CommonConnectorConfig.MAX_QUEUE_SIZE,
Expand Down Expand Up @@ -968,7 +977,7 @@ public SnapshotLockingMode getSnapshotLockingMode() {

protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, SERVER_NAME, SERVER_ID,
Field.group(config, "MySQL", HOSTNAME, PORT, USER, PASSWORD, ON_CONNECT_STATEMENTS, SERVER_NAME, SERVER_ID,
SSL_MODE, SSL_KEYSTORE, SSL_KEYSTORE_PASSWORD, SSL_TRUSTSTORE, SSL_TRUSTSTORE_PASSWORD, JDBC_DRIVER);
Field.group(config, "History Storage", KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
KafkaDatabaseHistory.TOPIC, KafkaDatabaseHistory.RECOVERY_POLL_ATTEMPTS,
Expand Down
Expand Up @@ -199,6 +199,7 @@ public void shouldValidateAcceptableConfiguration() {
.with(KafkaDatabaseHistory.BOOTSTRAP_SERVERS, "some.host.com")
.with(KafkaDatabaseHistory.TOPIC, "my.db.history.topic")
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.ON_CONNECT_STATEMENTS, "SET SESSION wait_timeout=2000")
.build();
MySqlConnector connector = new MySqlConnector();
Config result = connector.validate(config.asMap());
Expand All @@ -207,6 +208,7 @@ public void shouldValidateAcceptableConfiguration() {
assertNoConfigurationErrors(result, MySqlConnectorConfig.PORT);
assertNoConfigurationErrors(result, MySqlConnectorConfig.USER);
assertNoConfigurationErrors(result, MySqlConnectorConfig.PASSWORD);
assertNoConfigurationErrors(result, MySqlConnectorConfig.ON_CONNECT_STATEMENTS);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_NAME);
assertNoConfigurationErrors(result, MySqlConnectorConfig.SERVER_ID);
assertNoConfigurationErrors(result, MySqlConnectorConfig.TABLES_IGNORE_BUILTIN);
Expand Down
Expand Up @@ -451,6 +451,15 @@ public String getPostgresPluginName() {
.withValidation(Field::isRequired)
.withDescription("The name of the database the connector should be monitoring");

public static final Field ON_CONNECT_STATEMENTS = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.ON_CONNECT_STATEMENTS)
.withDisplayName("Initial statements")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription("A semicolon separated list of SQL statements to be executed when JDBC connection (not binlog reading connection) to the database is established. "
+ "Typically used for configuration of session parameters. "
+ "Use doubled semicolon ';;' to use it as a character not as a delimiter");

public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
Expand Down Expand Up @@ -683,7 +692,7 @@ public String getPostgresPluginName() {
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, SERVER_NAME,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_NAME,
TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, SCHEMA_WHITELIST,
SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
Expand Down Expand Up @@ -826,7 +835,7 @@ protected String snapshotSelectOverrideForTable(String table) {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
USER, PASSWORD, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
USER, PASSWORD, ON_CONNECT_STATEMENTS, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
Expand Down
Expand Up @@ -65,7 +65,7 @@ private PostgresReplicationConnection(Configuration config,
boolean dropSlotOnClose,
Integer statusUpdateIntervalMillis,
TypeRegistry typeRegistry) {
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);

this.originalConfig = config;
this.slotName = slotName;
Expand Down Expand Up @@ -109,6 +109,9 @@ protected void initReplicationSlot() throws SQLException {
}

AtomicLong xlogStart = new AtomicLong();
// replication connection does not support parsing of SQL statements so we need to create
// the connection without executing on connect statements - see JDBC opt preferQueryMode=simple
pgConnection();
execute(statement -> {
String identifySystemStatement = "IDENTIFY_SYSTEM";
LOGGER.debug("running '{}' to validate replication connection", identifySystemStatement);
Expand Down Expand Up @@ -154,7 +157,7 @@ public ReplicationStream startStreaming(Long offset) throws SQLException {
}

protected PGConnection pgConnection() throws SQLException {
return (PGConnection) connection();
return (PGConnection) connection(false);
}

private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException {
Expand Down
Expand Up @@ -242,6 +242,21 @@ public void shouldIgnoreViews() throws Exception {
assertRecordsAfterInsert(2, 3, 3);
}

@Test
public void shouldExecuteOnConnectStatements() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.ON_CONNECT_STATEMENTS, "INSERT INTO s1.a (aa) VALUES (2); INSERT INTO s2.a (aa) VALUES (2)")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();

SourceRecords actualRecords = consumeRecordsByTopic(2);
assertKey(actualRecords.allRecordsInOrder().get(0), "pk", 1);
assertKey(actualRecords.allRecordsInOrder().get(1), "pk", 2);
}

@Test
public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Expand Down
Expand Up @@ -43,16 +43,23 @@ public interface JdbcConfiguration extends Configuration {
* A field for the hostname of the database server. This field has no default value.
*/
public static final Field HOSTNAME = Field.create("hostname", "IP address of the database");

/**
* A field for the port of the database server. There is no default value.
*/
public static final Field PORT = Field.create("port", "Port of the database");

/**
* A semicolon separated list of SQL statements to be executed when the connection to database is established.
* Typical use-case is setting of session parameters. There is no default value.
*/
public static final Field ON_CONNECT_STATEMENTS = Field.create("initial.statements", "A semicolon separated list of statements to be executed on connection");

/**
* The set of names of the pre-defined JDBC configuration fields, including {@link #DATABASE}, {@link #USER},
* {@link #PASSWORD}, {@link #HOSTNAME}, and {@link #PORT}.
*/
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT);
public static Set<String> ALL_KNOWN_FIELDS = Collect.unmodifiableSet(Field::name, DATABASE, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS);

/**
* Obtain a {@link JdbcConfiguration} adapter for the given {@link Configuration}.
Expand Down
39 changes: 39 additions & 0 deletions debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java
Expand Up @@ -53,6 +53,7 @@
*/
public class JdbcConnection implements AutoCloseable {

private static final char STATEMENT_DELIMITER = ';';
private final static Logger LOGGER = LoggerFactory.getLogger(JdbcConnection.class);

/**
Expand Down Expand Up @@ -581,11 +582,49 @@ public synchronized boolean isConnected() throws SQLException {
}

public synchronized Connection connection() throws SQLException {
return connection(true);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be safer to not execute the statements when calling the parameterless connection(). Note that when using the PG connector, I see the insert being executed three times, so something is still not quite correct.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gunnarmorling It is correct, as postgres connector creates multiple connections in sequence. The functionality is not for executing DML (albeit I abuse it for tests) just mainly for session configuration and in this case I expect the session to be configured every time the connection is created.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I'll expand the option's description a little bit then to clarify that fact.

}

public synchronized Connection connection(boolean executeOnConnect) throws SQLException {
if (conn == null) {
conn = factory.connect(JdbcConfiguration.adapt(config));
if (conn == null) throw new SQLException("Unable to obtain a JDBC connection");
// Always run the initial operations on this new connection
if (initialOps != null) execute(initialOps);
final String statements = config.getString(JdbcConfiguration.ON_CONNECT_STATEMENTS);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest to extract all the parsing logic into its own method which then returns the list of statements to here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if (statements != null && executeOnConnect) {
final List<String> splitStatements = new ArrayList<>();
final char[] statementsChars = statements.toCharArray();
StringBuilder activeStatement = new StringBuilder();
for (int i = 0; i < statementsChars.length; i++) {
if (statementsChars[i] == STATEMENT_DELIMITER) {
if (i == statementsChars.length - 1) {
// last character so it is the delimiter
}
else if (statementsChars[i + 1] == STATEMENT_DELIMITER) {
// two semicolons in a row - escaped semicolon
activeStatement.append(STATEMENT_DELIMITER);
i++;
}
else {
// semicolon as a delimiter
final String trimmedStatement = activeStatement.toString().trim();
if (!trimmedStatement.isEmpty()) {
splitStatements.add(trimmedStatement);
}
activeStatement = new StringBuilder();
}
}
else {
activeStatement.append(statementsChars[i]);
}
}
final String trimmedStatement = activeStatement.toString().trim();
if (!trimmedStatement.isEmpty()) {
splitStatements.add(trimmedStatement);
}
execute(splitStatements.toArray(new String[splitStatements.size()]));
}
}
return conn;
}
Expand Down