Skip to content

Commit

Permalink
DBZ-7616 Avoid to filter out any SET STATEMENT queries and filter j…
Browse files Browse the repository at this point in the history
…ust the ones related to Amazon RDS
  • Loading branch information
mfvitale committed Apr 15, 2024
1 parent 00e2edd commit fc6566e
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 20 deletions.
Expand Up @@ -78,6 +78,7 @@ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSour
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingChangeEventSource.class);

private static final String KEEPALIVE_THREAD_NAME = "blc-keepalive";
public static final String SET_STATEMENT_REGEX = "SET STATEMENT .* FOR";

private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
private final BinaryLogClient client;
Expand Down Expand Up @@ -492,7 +493,7 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off
return;
}

String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase();
String upperCasedStatementBegin = Strings.getBegin(removeSetStatement(sql), 7).toUpperCase();

if (upperCasedStatementBegin.startsWith("XA ")) {
// This is an XA transaction, and we currently ignore these and do nothing ...
Expand Down Expand Up @@ -541,6 +542,10 @@ protected void handleQueryEvent(MySqlPartition partition, MySqlOffsetContext off
}
}

private String removeSetStatement(String sql) {
return sql.replaceAll(SET_STATEMENT_REGEX, "").trim();
}

private void handleTransactionBegin(MySqlPartition partition, MySqlOffsetContext offsetContext, Event event, Long threadId) throws InterruptedException {
Instant eventTime = Conversions.toInstantFromMillis(eventTimestamp.toEpochMilli());
// We are starting a new transaction ...
Expand Down
Expand Up @@ -10,6 +10,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLTimeoutException;

import org.junit.Ignore;
import org.junit.Rule;
Expand Down Expand Up @@ -65,7 +66,12 @@ public void whenQueryTakesMoreThenConfiguredQueryTimeoutAnExceptionMustBeThrown(

try (MySqlTestConnection conn = MySqlTestConnection.forTestDatabase(DATABASE.getDatabaseName(), 1000)) {
conn.connect();

if (conn.isVersionCommentMariaDb()) {
assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(SQLTimeoutException.class)
.hasMessageContaining("Query execution was interrupted (max_statement_time exceeded)");
return;
}
assertThatThrownBy(() -> conn.execute("SELECT SLEEP(10)"))
.isInstanceOf(MySQLTimeoutException.class)
.hasMessage("Statement cancelled due to timeout or client request");
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.source.SourceRecord;
import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
Expand All @@ -46,6 +47,7 @@
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotMode;
import io.debezium.connector.mysql.MySqlConnectorConfig.SnapshotNewTables;
import io.debezium.connector.mysql.MySqlTestConnection.MySqlVersion;
import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.data.Envelope;
import io.debezium.doc.FixFor;
Expand Down Expand Up @@ -87,6 +89,8 @@ public class MySqlConnectorIT extends AbstractAsyncEngineConnectorTest {

private Configuration config;

private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver();

@Before
public void beforeEach() {
stopConnector();
Expand Down Expand Up @@ -1786,7 +1790,7 @@ public void shouldParseQueryIfAvailableAndConnectorOptionEnabled() throws Except
// Should have been an insert with query parsed.
validate(sourceRecord);
assertInsert(sourceRecord, "id", 110);
assertSourceQuery(sourceRecord, insertSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(insertSqlStatement));
}

/**
Expand Down Expand Up @@ -1842,15 +1846,25 @@ public void parseMultipleInsertStatements() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement1);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement1));

// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement2);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement2));
}

@NotNull
private String getExpectedQuery(String statement) {

if (databaseVersionResolver.isMariaDb()) {
return "SET STATEMENT max_statement_time=600 FOR " + statement;
}

return statement;
}

/**
Expand Down Expand Up @@ -1904,15 +1918,15 @@ public void parseMultipleRowInsertStatement() throws Exception {
// Should have been an insert with query parsed.
validate(sourceRecord1);
assertInsert(sourceRecord1, "id", 110);
assertSourceQuery(sourceRecord1, insertSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(insertSqlStatement));

// Grab second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been an insert with query parsed.
validate(sourceRecord2);
assertInsert(sourceRecord2, "id", 111);
assertSourceQuery(sourceRecord2, insertSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(insertSqlStatement));
}

/**
Expand Down Expand Up @@ -1964,7 +1978,7 @@ public void parseDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertDelete(sourceRecord, "order_number", 10001);
assertSourceQuery(sourceRecord, deleteSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(deleteSqlStatement));
}

/**
Expand Down Expand Up @@ -2016,15 +2030,15 @@ public void parseMultiRowDeleteQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertDelete(sourceRecord1, "order_number", 10002);
assertSourceQuery(sourceRecord1, deleteSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(deleteSqlStatement));

// Validate second event.
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been a delete with query parsed.
validate(sourceRecord2);
assertDelete(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, deleteSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(deleteSqlStatement));
}

/**
Expand Down Expand Up @@ -2076,7 +2090,7 @@ public void parseUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord);
assertUpdate(sourceRecord, "id", 109);
assertSourceQuery(sourceRecord, updateSqlStatement);
assertSourceQuery(sourceRecord, getExpectedQuery(updateSqlStatement));
}

/**
Expand Down Expand Up @@ -2128,15 +2142,15 @@ public void parseMultiRowUpdateQuery() throws Exception {
// Should have been a delete with query parsed.
validate(sourceRecord1);
assertUpdate(sourceRecord1, "order_number", 10001);
assertSourceQuery(sourceRecord1, updateSqlStatement);
assertSourceQuery(sourceRecord1, getExpectedQuery(updateSqlStatement));

// Validate second event
final SourceRecord sourceRecord2 = records.recordsForTopic(DATABASE.topicForTable(tableName)).get(1);

// Should have been a delete with query parsed.
validate(sourceRecord2);
assertUpdate(sourceRecord2, "order_number", 10004);
assertSourceQuery(sourceRecord2, updateSqlStatement);
assertSourceQuery(sourceRecord2, getExpectedQuery(updateSqlStatement));
}

/**
Expand Down
Expand Up @@ -14,6 +14,7 @@
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.Statement;
import java.time.Duration;

import org.apache.kafka.connect.errors.RetriableException;
import org.junit.Before;
Expand All @@ -32,6 +33,7 @@ public class OracleConnectionTest {
public void setUp() throws Exception {

jdbcConfiguration = mock(JdbcConfiguration.class);
when(jdbcConfiguration.getQueryTimeout()).thenReturn(Duration.ZERO);
connectionFactory = mock(JdbcConnection.ConnectionFactory.class);
Connection connection = mock(Connection.class);
statement = mock(Statement.class);
Expand Down
Expand Up @@ -79,16 +79,15 @@ public interface SchemaHistory {
.withDefault(
"DROP TEMPORARY TABLE IF EXISTS .+ /\\* generated by server \\*/," +
// Filter out RDS heartbeat statements, see DBZ-469 / DBZ-1492 / DBZ-2275 / DBZ-6864
"INSERT INTO (mysql\\.)?rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
"DELETE FROM (mysql\\.)?rds_sysinfo.*," +
"INSERT INTO (mysql\\.)?rds_sysinfo\\(.*\\) values \\(.*\\)," +
"INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
"INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\)," +
"DELETE FROM (mysql\\.)?rds_monitor.*," +
"(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_heartbeat2\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
"(SET STATEMENT .*)?DELETE FROM (mysql\\.)?rds_sysinfo.*," +
"(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_sysinfo\\(.*\\) values \\(.*\\)," +
"(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\) ON DUPLICATE KEY UPDATE value = .*," +
"(SET STATEMENT .*)?INSERT INTO (mysql\\.)?rds_monitor\\(.*\\) values \\(.*\\)," +
"(SET STATEMENT .*)?DELETE FROM (mysql\\.)?rds_monitor.*," +
"FLUSH RELAY LOGS.*," +
"flush relay logs.*," +
"SAVEPOINT .*," +
"SET STATEMENT .*," +
// Filter out the comment start with "# Dummy event" according https://jira.mariadb.org/browse/MDEV-225
"^\\s*#\\s*Dummy event.*")
.withWidth(Width.LONG)
Expand Down

0 comments on commit fc6566e

Please sign in to comment.