Skip to content

Commit

Permalink
DBZ-7616 Align query timeout changes to MariaDB connector
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale committed Apr 18, 2024
1 parent 9183855 commit 828960f
Show file tree
Hide file tree
Showing 12 changed files with 108 additions and 20 deletions.
Expand Up @@ -664,6 +664,7 @@ public interface SnapshotLockingStrategy {
PORT,
USER,
PASSWORD,
QUERY_TIMEOUT_MS,
ON_CONNECT_STATEMENTS,
SERVER_ID,
SERVER_ID_OFFSET,
Expand Down
Expand Up @@ -106,6 +106,7 @@ public abstract class BinlogStreamingChangeEventSource<P extends BinlogPartition
private static final Logger LOGGER = LoggerFactory.getLogger(BinlogStreamingChangeEventSource.class);

private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
private static final String SET_STATEMENT_REGEX = "SET STATEMENT .* FOR";

private final BinaryLogClient client;
private final BinlogStreamingChangeEventSourceMetrics<?, P> metrics;
Expand Down Expand Up @@ -701,7 +702,7 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw
return;
}

String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
String upperCasedStatementBegin = Strings.getBegin(removeSetStatement(sql), 7).toUpperCase();

if (upperCasedStatementBegin.startsWith("XA ")) {
// This is an XA transaction, and we currently ignore these and do nothing ...
Expand Down Expand Up @@ -750,6 +751,10 @@ protected void handleQueryEvent(P partition, O offsetContext, Event event) throw
}
}

private String removeSetStatement(String sql) {
return sql.replaceAll(SET_STATEMENT_REGEX, "").trim();
}

/**
* Handle a change in the table metadata.<p></p>
*
Expand Down
Expand Up @@ -1749,7 +1749,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertSourceQuery(sourceRecord, insertSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(insertSqlStatement));
}

/**
Expand Down Expand Up @@ -1805,15 +1805,15 @@ public void parseMultipleInsertStatements() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement1);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement1));

// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement2);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement2));
}

/**
Expand Down Expand Up @@ -1867,15 +1867,15 @@ public void parseMultipleRowInsertStatement() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement));

// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement));
}

/**
Expand Down Expand Up @@ -1927,7 +1927,7 @@ public void parseDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertDelete(sourceRecord, "order_number", 10001);
assertSourceQuery(sourceRecord, deleteSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(deleteSqlStatement));
}

/**
Expand Down Expand Up @@ -1979,15 +1979,15 @@ public void parseMultiRowDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertDelete(sourceRecord1, "order_number", 10002);
assertSourceQuery(sourceRecord1, deleteSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(deleteSqlStatement));

// Validate second event.
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been a delete with query parsed.
validate(sourceRecord2);
assertDelete(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, deleteSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(deleteSqlStatement));
}

/**
Expand Down Expand Up @@ -2039,7 +2039,7 @@ public void parseUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertUpdate(sourceRecord, "id", 109);
assertSourceQuery(sourceRecord, updateSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(updateSqlStatement));
}

/**
Expand Down Expand Up @@ -2091,15 +2091,15 @@ public void parseMultiRowUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertUpdate(sourceRecord1, "order_number", 10001);
assertSourceQuery(sourceRecord1, updateSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(updateSqlStatement));

// Validate second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been a delete with query parsed.
validate(sourceRecord2);
assertUpdate(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, updateSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(updateSqlStatement));
}

/**
Expand All @@ -2108,7 +2108,7 @@ public void parseMultiRowUpdateQuery() throws Exception {
*/
@Test
@FixFor("DBZ-1234")
public void shouldFailToValidateAdaptivePrecisionMode() throws InterruptedException {
public void shouldFailToValidateAdaptivePrecisionMode() {
config = DATABASE.defaultConfig()
.with(BinlogConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(BinlogConnectorConfig.SNAPSHOT_MODE, BinlogConnectorConfig.SnapshotMode.NEVER)
Expand Down Expand Up @@ -2652,6 +2652,11 @@ public void shouldEmitTruncateOperation() throws Exception {
stopConnector();
}

protected String getExpectedQuery(String statement) {

return statement;
}

private static class NoTombStonesHandler implements DebeziumEngine.ChangeConsumer<SourceRecord> {
protected BlockingQueue<SourceRecord> recordQueue;

Expand Down
Expand Up @@ -19,6 +19,8 @@ public interface BinlogConnectorTest<C extends SourceConnector> {

BinlogTestConnection getTestDatabaseConnection(String databaseName);

BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout);

BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName);

boolean isMariaDb();
Expand Down
Expand Up @@ -5,11 +5,35 @@
*/
package io.debezium.connector.mariadb;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

import java.sql.SQLException;
import java.sql.SQLTimeoutException;

import org.junit.Test;

import io.debezium.connector.binlog.BinlogConnectionIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;

/**
* @author Chris Cranford
*/
public class ConnectionIT extends BinlogConnectionIT<MariaDbConnector> implements MariaDbCommon {

@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {

final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test");
DATABASE.createAndInitialize();
try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) {
conn.connect();

assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(SQLTimeoutException.class)
.hasMessageContaining("Query execution was interrupted (max_statement_time exceeded)");

}
}
}
Expand Up @@ -33,6 +33,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MariaDbTestConnection.forTestDatabase(databaseName);
}

@Override
default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) {
return MariaDbTestConnection.forTestDatabase(databaseName, queryTimeout);
}

@Override
default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) {
return MariaDbTestConnection.forTestReplicaDatabase(databaseName);
Expand Down
Expand Up @@ -68,4 +68,10 @@ protected MariaDbOffsetContext loadOffsets(Configuration configuration, Map<Stri
protected void assertBinlogPosition(long offsetPosition, long beforeInsertsPosition) {
assertThat(offsetPosition).isGreaterThanOrEqualTo(beforeInsertsPosition);
}

@Override
protected String getExpectedQuery(String statement) {

return "SET STATEMENT max_statement_time=600 FOR " + statement;
}
}
Expand Up @@ -88,6 +88,21 @@ public static MariaDbTestConnection forTestDatabase(String databaseName) {
return new MariaDbTestConnection(getDefaultJdbcConfig(databaseName).build());
}

/**
* Obtain a connection instance to the named test database.
*
*
* @param databaseName the name of the test database
* @param queryTimeout the seconds to wait for query execution
* @return the connection instance; never null
*/

public static MariaDbTestConnection forTestDatabase(String databaseName, int queryTimeout) {
return new MariaDbTestConnection(getDefaultJdbcConfig(databaseName)
.withQueryTimeoutMs(queryTimeout)
.build());
}

/**
* Obtain a connection instance to the named test database.
*
Expand Down
Expand Up @@ -208,8 +208,7 @@ public static SnapshotLockingMode parse(String value, String defaultValue) {
BinlogConnectorConfig.GTID_SOURCE_EXCLUDES)
.type(
JDBC_DRIVER,
JDBC_PROTOCOL,
QUERY_TIMEOUT_MS,)
JDBC_PROTOCOL)
.connector(SNAPSHOT_LOCKING_MODE)
.events(
GTID_SOURCE_INCLUDES,
Expand Down
Expand Up @@ -5,8 +5,32 @@
*/
package io.debezium.connector.mysql;

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

import java.sql.SQLException;

import org.junit.Test;

import com.mysql.cj.jdbc.exceptions.MySQLTimeoutException;

import io.debezium.connector.binlog.BinlogConnectionIT;
import io.debezium.connector.binlog.util.BinlogTestConnection;
import io.debezium.connector.binlog.util.TestHelper;
import io.debezium.connector.binlog.util.UniqueDatabase;

public class ConnectionIT extends BinlogConnectionIT<MySqlConnector> implements MySqlCommon {

@Test
public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown() throws SQLException {

final UniqueDatabase DATABASE = TestHelper.getUniqueDatabase("readbinlog", "readbinlog_test");
DATABASE.createAndInitialize();
try (BinlogTestConnection conn = getTestDatabaseConnection(DATABASE.getDatabaseName(), 1000)) {
conn.connect();

assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(MySQLTimeoutException.class)
.hasMessage("Statement cancelled due to timeout or client request");
}
}
}
Expand Up @@ -32,6 +32,11 @@ default BinlogTestConnection getTestDatabaseConnection(String databaseName) {
return MySqlTestConnection.forTestDatabase(databaseName);
}

@Override
default BinlogTestConnection getTestDatabaseConnection(String databaseName, int queryTimeout) {
return MySqlTestConnection.forTestDatabase(databaseName, queryTimeout);
}

@Override
default BinlogTestConnection getTestReplicaDatabaseConnection(String databaseName) {
return MySqlTestConnection.forTestReplicaDatabase(databaseName);
Expand Down
Expand Up @@ -54,16 +54,13 @@ public static MySqlTestConnection forTestReplicaDatabase(String databaseName) {
*
*
* @param databaseName the name of the test database
* @param queryTimeout
* @param queryTimeout the seconds to wait for query execution
* @return the MySQLConnection instance; never null
*/

public static MySqlTestConnection forTestDatabase(String databaseName, int queryTimeout) {
return new MySqlTestConnection(JdbcConfiguration.copy(
Configuration.fromSystemProperties(DATABASE_CONFIG_PREFIX).merge(Configuration.fromSystemProperties(DRIVER_CONFIG_PREFIX)))
.withDatabase(databaseName)
return new MySqlTestConnection(getDefaultJdbcConfig(databaseName)
.withQueryTimeoutMs(queryTimeout)
.with("characterEncoding", "utf8")
.build());
}

Expand Down

0 comments on commit 828960f

Please sign in to comment.