From 828960f9262b7acc7f71e958e4c7a3776ae7cff1 Mon Sep 17 00:00:00 2001 From: mfvitale Date: Thu, 18 Apr 2024 12:39:03 +0200 Subject: [PATCH] DBZ-7616 Align query timeout changes to MariaDB connector --- .../binlog/BinlogConnectorConfig.java | 1 + .../BinlogStreamingChangeEventSource.java | 7 ++++- .../connector/binlog/BinlogConnectorIT.java | 29 +++++++++++-------- .../connector/binlog/BinlogConnectorTest.java | 2 ++ .../connector/mariadb/ConnectionIT.java | 24 +++++++++++++++ .../connector/mariadb/MariaDbCommon.java | 5 ++++ .../connector/mariadb/MariaDbConnectorIT.java | 6 ++++ .../mariadb/util/MariaDbTestConnection.java | 15 ++++++++++ .../connector/mysql/MySqlConnectorConfig.java | 3 +- .../connector/mysql/ConnectionIT.java | 24 +++++++++++++++ .../debezium/connector/mysql/MySqlCommon.java | 5 ++++ .../connector/mysql/MySqlTestConnection.java | 7 ++--- 12 files changed, 108 insertions(+), 20 deletions(-) diff --git a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogConnectorConfig.java b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogConnectorConfig.java index b8770c91ae3..b3a0a46a586 100644 --- a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogConnectorConfig.java +++ b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogConnectorConfig.java @@ -664,6 +664,7 @@ public interface SnapshotLockingStrategy { PORT, USER, PASSWORD, + QUERY_TIMEOUT_MS, ON_CONNECT_STATEMENTS, SERVER_ID, SERVER_ID_OFFSET, diff --git a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogStreamingChangeEventSource.java b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogStreamingChangeEventSource.java index 4ce757183ed..c13a02b0843 100644 --- a/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogStreamingChangeEventSource.java +++ b/debezium-connector-binlog/src/main/java/io/debezium/connector/binlog/BinlogStreamingChangeEventSource.java @@ -106,6 +106,7 @@ public abstract class BinlogStreamingChangeEventSource

metrics; @@ -701,7 +702,7 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw return; } - String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase(); + String upperCasedStatementBegin = Strings.getBegin(removeSetStatement(sql), 7).toUpperCase(); if (upperCasedStatementBegin.startsWith("XA ")) { // This is an XA transaction, and we currently ignore these and do nothing ... @@ -750,6 +751,10 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw } } + private String removeSetStatement(String sql) { + return sql.replaceAll(SET_STATEMENT_REGEX, "").trim(); + } + /** * Handle a change in the table metadata.

* diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java index 5b55825fcae..3bc89fe417f 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorIT.java @@ -1749,7 +1749,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except // Should have been an insert with query parsed. validate(sourceRecord); assertInsert(sourceRecord, "id", 110); - assertSourceQuery(sourceRecord, insertSqlStatement); + assertSourceQuery(sourceRecord, getExpectedQuery(insertSqlStatement)); } /** @@ -1805,7 +1805,7 @@ public void parseMultipleInsertStatements() throws Exception { // Should have been an insert with query parsed. validate(sourceRecord1); assertInsert(sourceRecord1, "id", 110); - assertSourceQuery(sourceRecord1, insertSqlStatement1); + assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement1)); // Grab second event final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1); @@ -1813,7 +1813,7 @@ public void parseMultipleInsertStatements() throws Exception { // Should have been an insert with query parsed. validate(sourceRecord2); assertInsert(sourceRecord2, "id", 111); - assertSourceQuery(sourceRecord2, insertSqlStatement2); + assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement2)); } /** @@ -1867,7 +1867,7 @@ public void parseMultipleRowInsertStatement() throws Exception { // Should have been an insert with query parsed. validate(sourceRecord1); assertInsert(sourceRecord1, "id", 110); - assertSourceQuery(sourceRecord1, insertSqlStatement); + assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement)); // Grab second event final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1); @@ -1875,7 +1875,7 @@ public void parseMultipleRowInsertStatement() throws Exception { // Should have been an insert with query parsed. validate(sourceRecord2); assertInsert(sourceRecord2, "id", 111); - assertSourceQuery(sourceRecord2, insertSqlStatement); + assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement)); } /** @@ -1927,7 +1927,7 @@ public void parseDeleteQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord); assertDelete(sourceRecord, "order_number", 10001); - assertSourceQuery(sourceRecord, deleteSqlStatement); + assertSourceQuery(sourceRecord, getExpectedQuery(deleteSqlStatement)); } /** @@ -1979,7 +1979,7 @@ public void parseMultiRowDeleteQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord1); assertDelete(sourceRecord1, "order_number", 10002); - assertSourceQuery(sourceRecord1, deleteSqlStatement); + assertSourceQuery(sourceRecord1, getExpectedQuery(deleteSqlStatement)); // Validate second event. final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1); @@ -1987,7 +1987,7 @@ public void parseMultiRowDeleteQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord2); assertDelete(sourceRecord2, "order_number", 10004); - assertSourceQuery(sourceRecord2, deleteSqlStatement); + assertSourceQuery(sourceRecord2, getExpectedQuery(deleteSqlStatement)); } /** @@ -2039,7 +2039,7 @@ public void parseUpdateQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord); assertUpdate(sourceRecord, "id", 109); - assertSourceQuery(sourceRecord, updateSqlStatement); + assertSourceQuery(sourceRecord, getExpectedQuery(updateSqlStatement)); } /** @@ -2091,7 +2091,7 @@ public void parseMultiRowUpdateQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord1); assertUpdate(sourceRecord1, "order_number", 10001); - assertSourceQuery(sourceRecord1, updateSqlStatement); + assertSourceQuery(sourceRecord1, getExpectedQuery(updateSqlStatement)); // Validate second event final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1); @@ -2099,7 +2099,7 @@ public void parseMultiRowUpdateQuery() throws Exception { // Should have been a delete with query parsed. validate(sourceRecord2); assertUpdate(sourceRecord2, "order_number", 10004); - assertSourceQuery(sourceRecord2, updateSqlStatement); + assertSourceQuery(sourceRecord2, getExpectedQuery(updateSqlStatement)); } /** @@ -2108,7 +2108,7 @@ public void parseMultiRowUpdateQuery() throws Exception { */ @Test @FixFor("DBZ-1234") - public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException { + public void shouldFailToValidateAdaptivePrecisionMode() { config = DATABASE.defaultConfig() .with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) .with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER) @@ -2652,6 +2652,11 @@ public void shouldEmitTruncateOperation() throws Exception { stopConnector(); } + protected String getExpectedQuery(String statement) { + + return statement; + } + private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer { protected BlockingQueue recordQueue; diff --git a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorTest.java b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorTest.java index e741dc6d1d7..da77ae2dade 100644 --- a/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorTest.java +++ b/debezium-connector-binlog/src/test/java/io/debezium/connector/binlog/BinlogConnectorTest.java @@ -19,6 +19,8 @@ public interface BinlogConnectorTest { BinlogTestConnection getTestDatabaseConnection(String databaseName); + BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout); + BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName); boolean isMariaDb(); diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ConnectionIT.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ConnectionIT.java index 44ccbf686a6..f8ca3af4f2b 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ConnectionIT.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/ConnectionIT.java @@ -5,11 +5,35 @@ */ package io.debezium.connector.mariadb; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; + +import java.sql.SQLException; +import java.sql.SQLTimeoutException; + +import org.junit.Test; + import io.debezium.connector.binlog.BinlogConnectionIT; +import io.debezium.connector.binlog.util.BinlogTestConnection; +import io.debezium.connector.binlog.util.TestHelper; +import io.debezium.connector.binlog.util.UniqueDatabase; /** * @author Chris Cranford */ public class ConnectionIT extends BinlogConnectionIT implements MariaDbCommon { + @Test + public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException { + + final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test"); + DATABASE.createAndInitialize(); + try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) { + conn.connect(); + + assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)")) + .isInstanceOf(SQLTimeoutException.class) + .hasMessageContaining("Query execution was interrupted (max_statement_time exceeded)"); + + } + } } diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbCommon.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbCommon.java index 05679e7c3e5..3809819bd2d 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbCommon.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbCommon.java @@ -33,6 +33,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) { return MariaDbTestConnection.forTestDatabase(databaseName); } + @Override + default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) { + return MariaDbTestConnection.forTestDatabase(databaseName, queryTimeout); + } + @Override default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) { return MariaDbTestConnection.forTestReplicaDatabase(databaseName); diff --git a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbConnectorIT.java b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbConnectorIT.java index b4ae7ebc769..9a2f62a857b 100644 --- a/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbConnectorIT.java +++ b/debezium-connector-mariadb/src/test/java/io/debezium/connector/mariadb/MariaDbConnectorIT.java @@ -68,4 +68,10 @@ protected MariaDbOffsetContext loadOffsets(Configuration configuration, Map implements MySqlCommon { + @Test + public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException { + + final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test"); + DATABASE.createAndInitialize(); + try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) { + conn.connect(); + + assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)")) + .isInstanceOf(MySQLTimeoutException.class) + .hasMessage("Statement cancelled due to timeout or client request"); + } + } } diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java index 10b56f0a7e6..714eecd0392 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlCommon.java @@ -32,6 +32,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) { return MySqlTestConnection.forTestDatabase(databaseName); } + @Override + default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) { + return MySqlTestConnection.forTestDatabase(databaseName, queryTimeout); + } + @Override default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) { return MySqlTestConnection.forTestReplicaDatabase(databaseName); diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java index fc293131f0e..e95a8ef3588 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/MySqlTestConnection.java @@ -54,16 +54,13 @@ public static MySqlTestConnection forTestReplicaDatabase(String databaseName) { * * * @param databaseName the name of the test database - * @param queryTimeout + * @param queryTimeout the seconds to wait for query execution * @return the MySQLConnection instance; never null */ public static MySqlTestConnection forTestDatabase(String databaseName, int queryTimeout) { - return new MySqlTestConnection(JdbcConfiguration.copy( - Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX))) - .withDatabase(databaseName) + return new MySqlTestConnection(getDefaultJdbcConfig(databaseName) .withQueryTimeoutMs(queryTimeout) - .with("characterEncoding", "utf8") .build()); }