DBZ-4782 Flush SCN values upon schema changes #3489

Merged 2 commits on May 13, 2022

@@ -557,10 +557,40 @@ protected void handleSchemaChange(LogMinerEventRow row) throws InterruptedException
return;
}

final Scn commitScn = offsetContext.getCommitScn();
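// Skip any DDL already covered by the offset's commit SCN so a schema change that was
// dispatched before a restart is not re-emitted.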
if (commitScn != null && commitScn.compareTo(row.getScn()) >= 0) {
LOGGER.trace("DDL: SQL '{}' skipped with {} (SCN) <= {} (commit SCN)", row.getRedoSql(), row.getScn(), commitScn);
return;
}

LOGGER.trace("DDL: '{}' {}", row.getRedoSql(), row);
if (row.getTableName() != null) {
counters.ddlCount++;
final TableId tableId = row.getTableId();

final int activeTransactions = getTransactionCache().size();
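// Only advance the lower (read) SCN boundary when no unrelated transaction is still in
// progress; otherwise that transaction's events could be missed after a restart.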
boolean advanceLowerScnBoundary = false;
if (activeTransactions == 0) {
// The DDL isn't wrapped in a transaction; fast-forward the lower boundary
advanceLowerScnBoundary = true;
}
else if (activeTransactions == 1) {
final String transactionId = getTransactionCache().keySet().iterator().next();
if (transactionId.equals(row.getTransactionId())) {
// The row's transaction is the current and only active transaction.
advanceLowerScnBoundary = true;
}
}

if (advanceLowerScnBoundary) {
LOGGER.debug("Schema change advanced offset SCN to {}", row.getScn());
offsetContext.setScn(row.getScn());
}

// Should always advance the commit SCN point with schema changes
LOGGER.debug("Schema change advanced offset commit SCN to {}", row.getScn());
offsetContext.setCommitScn(row.getScn());

dispatcher.dispatchSchemaChangeEvent(partition,
tableId,
new OracleSchemaChangeEventEmitter(
@@ -1197,6 +1197,195 @@ public void shouldParseSchemaChangeOnTableWithRawDataType() throws Exception {
}
}

@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestart() throws Exception {
TestHelper.dropTable(connection, "dbz4782");
try {
connection.execute("CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");
TestHelper.streamTable(connection, "dbz4782");

Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();

start(OracleConnector.class, config);
assertConnectorIsRunning();

waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)");

// CREATE, ALTER
SourceRecords sourceRecords = consumeRecordsByTopic(2);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(2);

assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");

assertStreamingSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");

// Stop the connector
stopConnector();

// Restart the connector and verify that we do not re-emit the ALTER table
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

// Wait for 20 seconds and assert that there are no available records
waitForAvailableRecords(20, TimeUnit.SECONDS);
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782");
}
}

@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeIfLastEventReadBeforeRestartWithFollowupDml() throws Exception {
TestHelper.dropTable(connection, "dbz4782");
try {
createTable("dbz4782", "CREATE TABLE dbz4782 (id numeric(9,0) primary key, data varchar2(50))");

Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();

start(OracleConnector.class, config);
assertConnectorIsRunning();

waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

connection.execute("ALTER TABLE dbz4782 add data2 varchar2(50)");

// CREATE, ALTER
SourceRecords sourceRecords = consumeRecordsByTopic(2);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(2);

assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782");

assertStreamingSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782");

// Stop the connector
stopConnector();

// Restart the connector and verify that we do not re-emit the ALTER table
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

connection.execute("INSERT INTO dbz4782 values (1, 'data1', 'data2')");

sourceRecords = consumeRecordsByTopic(1);
records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 1);

// There should be no other records to consume
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782");
}
}

@Test
@FixFor("DBZ-4782")
public void shouldNotResendSchemaChangeWithInprogressTransactionOnSecondTable() throws Exception {
TestHelper.dropTable(connection, "dbz4782a");
TestHelper.dropTable(connection, "dbz4782b");
try {
createTable("dbz4782a", "CREATE TABLE dbz4782a (id numeric(9,0) primary key, data varchar2(50))");
createTable("dbz4782b", "CREATE TABLE dbz4782b (id numeric(9,0) primary key, data varchar2(50))");

Configuration config = TestHelper.defaultConfig()
.with(OracleConnectorConfig.TABLE_INCLUDE_LIST, "DEBEZIUM\\.DBZ4782[A|B]")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.build();

start(OracleConnector.class, config);
assertConnectorIsRunning();

waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

// Start in-progress transaction for dbz4782b
try (OracleConnection connection2 = TestHelper.testConnection()) {

// Perform an in-progress operation on the second connection and alter the other table on the primary connection
connection2.executeWithoutCommitting("INSERT INTO dbz4782b values (2, 'connection2')");
connection.execute("ALTER TABLE dbz4782a add data2 varchar2(50)");

// CREATE x2, ALTER (the INSERT isn't here yet; it's still in progress)
SourceRecords sourceRecords = consumeRecordsByTopic(3);
List<SourceRecord> records = sourceRecords.recordsForTopic(TestHelper.SERVER_NAME);
assertThat(records).hasSize(3);

assertSnapshotSchemaChange(records.get(0));
List<Struct> tableChanges = ((Struct) records.get(0).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782A");

assertSnapshotSchemaChange(records.get(1));
tableChanges = ((Struct) records.get(1).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "CREATE", "DEBEZIUM", "DBZ4782B");

assertStreamingSchemaChange(records.get(2));
tableChanges = ((Struct) records.get(2).value()).getArray("tableChanges");
assertThat(tableChanges).hasSize(1);
assertTableChange(tableChanges.get(0), "ALTER", "DEBEZIUM", "DBZ4782A");

// Stop the connector
stopConnector();

// Now commit the in-progress transaction while connector is down
connection2.commit();

// Restart the connector and verify that we do not re-emit the ALTER table, but that we do
// capture the in-progress transaction correctly once it is committed.
start(OracleConnector.class, config);
assertConnectorIsRunning();
waitForStreamingRunning(TestHelper.CONNECTOR_NAME, TestHelper.SERVER_NAME);

connection.execute("INSERT INTO dbz4782a values (1, 'data1', 'data2')");

sourceRecords = consumeRecordsByTopic(2);
sourceRecords.allRecordsInOrder().forEach(System.out::println);
records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782A"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 1);

records = sourceRecords.recordsForTopic(topicName("DEBEZIUM", "DBZ4782B"));
assertThat(records).hasSize(1);
VerifyRecord.isValidInsert(records.get(0), "ID", 2);
}

// There should be no other records to consume
assertNoRecordsToConsume();
}
finally {
TestHelper.dropTable(connection, "dbz4782a");
TestHelper.dropTable(connection, "dbz4782b");
}
}

private static String getTableIdString(String schemaName, String tableName) {
return new TableId(TestHelper.getDatabaseName(), schemaName, tableName).toDoubleQuotedString();
}