Skip to content

Commit

Permalink
DBZ-693 On connect statements supported for PostgreSQL
Browse files Browse the repository at this point in the history
  • Loading branch information
jpechane committed May 24, 2018
1 parent 624c4fb commit 0e0a6d8
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 8 deletions.
Expand Up @@ -513,8 +513,9 @@ public static EventProcessingFailureHandlingMode parse(String value) {
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription("A semicolon separated list of SQL statements to be executed when connection to database is established. "
+ "Typically used for configuration of session parameters.");
.withDescription("A semicolon separated list of SQL statements to be executed when JDBC connection (not binlog reading connection) to the database is established. "
+ "Typically used for configuration of session parameters. "
+ "Use doubled semicolon ';;' to use it as a character not as a delimiter");

public static final Field SERVER_ID = Field.create("database.server.id")
.withDisplayName("Cluster ID")
Expand Down
Expand Up @@ -451,6 +451,15 @@ public String getPostgresPluginName() {
.withValidation(Field::isRequired)
.withDescription("The name of the database the connector should be monitoring");

public static final Field ON_CONNECT_STATEMENTS = Field.create(DATABASE_CONFIG_PREFIX + JdbcConfiguration.ON_CONNECT_STATEMENTS)
.withDisplayName("Initial statements")
.withType(Type.STRING)
.withWidth(Width.LONG)
.withImportance(Importance.LOW)
.withDescription("A semicolon separated list of SQL statements to be executed when JDBC connection (not binlog reading connection) to the database is established. "
+ "Typically used for configuration of session parameters. "
+ "Use doubled semicolon ';;' to use it as a character not as a delimiter");

public static final Field SERVER_NAME = Field.create(DATABASE_CONFIG_PREFIX + "server.name")
.withDisplayName("Namespace")
.withType(Type.STRING)
Expand Down Expand Up @@ -683,7 +692,7 @@ public String getPostgresPluginName() {
* The set of {@link Field}s defined as part of this configuration.
*/
public static Field.Set ALL_FIELDS = Field.setOf(PLUGIN_NAME, SLOT_NAME, DROP_SLOT_ON_STOP,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, SERVER_NAME,
DATABASE_NAME, USER, PASSWORD, HOSTNAME, PORT, ON_CONNECT_STATEMENTS, SERVER_NAME,
TOPIC_SELECTION_STRATEGY, CommonConnectorConfig.MAX_BATCH_SIZE,
CommonConnectorConfig.MAX_QUEUE_SIZE, CommonConnectorConfig.POLL_INTERVAL_MS, SCHEMA_WHITELIST,
SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
Expand Down Expand Up @@ -826,7 +835,7 @@ protected String snapshotSelectOverrideForTable(String table) {
protected static ConfigDef configDef() {
ConfigDef config = new ConfigDef();
Field.group(config, "Postgres", SLOT_NAME, PLUGIN_NAME, SERVER_NAME, DATABASE_NAME, HOSTNAME, PORT,
USER, PASSWORD, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
USER, PASSWORD, ON_CONNECT_STATEMENTS, SSL_MODE, SSL_CLIENT_CERT, SSL_CLIENT_KEY_PASSWORD, SSL_ROOT_CERT, SSL_CLIENT_KEY,
DROP_SLOT_ON_STOP, SSL_SOCKET_FACTORY, STATUS_UPDATE_INTERVAL_MS, TCP_KEEPALIVE);
Field.group(config, "Events", SCHEMA_WHITELIST, SCHEMA_BLACKLIST, TABLE_WHITELIST, TABLE_BLACKLIST,
COLUMN_BLACKLIST, INCLUDE_UNKNOWN_DATATYPES, SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
Expand Down
Expand Up @@ -65,7 +65,7 @@ private PostgresReplicationConnection(Configuration config,
boolean dropSlotOnClose,
Integer statusUpdateIntervalMillis,
TypeRegistry typeRegistry) {
super(config, PostgresConnection.FACTORY, null ,PostgresReplicationConnection::defaultSettings);
super(config, PostgresConnection.FACTORY, null, PostgresReplicationConnection::defaultSettings);

this.originalConfig = config;
this.slotName = slotName;
Expand Down Expand Up @@ -109,6 +109,9 @@ protected void initReplicationSlot() throws SQLException {
}

AtomicLong xlogStart = new AtomicLong();
// replication connection does not support parsing of SQL statements so we need to create
// the connection without executing on connect statements - see JDBC opt preferQueryMode=simple
pgConnection();
execute(statement -> {
String identifySystemStatement = "IDENTIFY_SYSTEM";
LOGGER.debug("running '{}' to validate replication connection", identifySystemStatement);
Expand Down Expand Up @@ -154,7 +157,7 @@ public ReplicationStream startStreaming(Long offset) throws SQLException {
}

protected PGConnection pgConnection() throws SQLException {
return (PGConnection) connection();
return (PGConnection) connection(false);
}

private ReplicationStream createReplicationStream(final LogSequenceNumber lsn) throws SQLException {
Expand Down
Expand Up @@ -242,6 +242,21 @@ public void shouldIgnoreViews() throws Exception {
assertRecordsAfterInsert(2, 3, 3);
}

@Test
public void shouldExecuteOnConnectStatements() throws Exception {
TestHelper.execute(SETUP_TABLES_STMT);
Configuration.Builder configBuilder = TestHelper.defaultConfig()
.with(PostgresConnectorConfig.SNAPSHOT_MODE, INITIAL.getValue())
.with(PostgresConnectorConfig.ON_CONNECT_STATEMENTS, "INSERT INTO s1.a (aa) VALUES (2); INSERT INTO s2.a (aa) VALUES (2)")
.with(PostgresConnectorConfig.DROP_SLOT_ON_STOP, Boolean.FALSE);
start(PostgresConnector.class, configBuilder.build());
assertConnectorIsRunning();

SourceRecords actualRecords = consumeRecordsByTopic(2);
assertKey(actualRecords.allRecordsInOrder().get(0), "pk", 1);
assertKey(actualRecords.allRecordsInOrder().get(1), "pk", 2);
}

@Test
public void shouldProduceEventsWhenSnapshotsAreNeverAllowed() throws InterruptedException {
TestHelper.execute(SETUP_TABLES_STMT);
Expand Down
39 changes: 37 additions & 2 deletions debezium-core/src/main/java/io/debezium/jdbc/JdbcConnection.java
Expand Up @@ -53,6 +53,7 @@
*/
public class JdbcConnection implements AutoCloseable {

private static final char STATEMENT_DELIMITER = ';';
private final static Logger LOGGER = LoggerFactory.getLogger(JdbcConnection.class);

/**
Expand Down Expand Up @@ -581,14 +582,48 @@ public synchronized boolean isConnected() throws SQLException {
}

public synchronized Connection connection() throws SQLException {
return connection(true);
}

public synchronized Connection connection(boolean executeOnConnect) throws SQLException {
if (conn == null) {
conn = factory.connect(JdbcConfiguration.adapt(config));
if (conn == null) throw new SQLException("Unable to obtain a JDBC connection");
// Always run the initial operations on this new connection
if (initialOps != null) execute(initialOps);
final String statements = config.getString(JdbcConfiguration.ON_CONNECT_STATEMENTS);
if (statements != null) {
execute(statements.split(";"));
if (statements != null && executeOnConnect) {
final List<String> splitStatements = new ArrayList<>();
final char[] statementsChars = statements.toCharArray();
StringBuilder activeStatement = new StringBuilder();
for (int i = 0; i < statementsChars.length; i++) {
if (statementsChars[i] == STATEMENT_DELIMITER) {
if (i == statementsChars.length - 1) {
// last character so it is the delimiter
}
else if (statementsChars[i + 1] == STATEMENT_DELIMITER) {
// two semicolons in a row - escaped semicolon
activeStatement.append(STATEMENT_DELIMITER);
i++;
}
else {
// semicolon as a delimiter
final String trimmedStatement = activeStatement.toString().trim();
if (!trimmedStatement.isEmpty()) {
splitStatements.add(trimmedStatement);
}
activeStatement = new StringBuilder();
}
}
else {
activeStatement.append(statementsChars[i]);
}
}
final String trimmedStatement = activeStatement.toString().trim();
if (!trimmedStatement.isEmpty()) {
splitStatements.add(trimmedStatement);
}
execute(splitStatements.toArray(new String[splitStatements.size()]));
}
}
return conn;
Expand Down

0 comments on commit 0e0a6d8

Please sign in to comment.