Skip to content

Commit

Permalink
DBZ-6828 Process drop table events during blocking snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
mfvitale authored and jpechane committed Sep 20, 2023
1 parent ae199d2 commit 68eb4b2
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 19 deletions.
Expand Up @@ -309,19 +309,34 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset
((TableAlteredEvent) event).previousTableId());
}
else {
Table table = getTable(tableId, type);
schemaChangeEvent = SchemaChangeEvent.of(
type,
partition,
offset,
sanitizedDbName,
null,
event.statement(),
tableId != null ? tables().forTable(tableId) : null,
table,
snapshot);
}
schemaChangeEvents.add(schemaChangeEvent);
}

private Table getTable(TableId tableId, SchemaChangeEventType type) {

if (tableId == null) {
return null;
}

if (SchemaChangeEventType.DROP == type) {
// DROP events don't have information about tableChanges, so we are creating a Table object with just the tableId to be use
// during blocking snapshot to filter out drop events not related to table to be snapshotted.
return Table.editor().tableId(tableId).create();
}
return tables().forTable(tableId);
}

private boolean acceptableDatabase(final String databaseName) {
return !storeOnlyCapturedTables()
|| filters.databaseFilter().test(databaseName)
Expand Down
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.mysql;

import static java.util.function.Predicate.not;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -319,8 +321,13 @@ else if (!connectorConfig.getSnapshotMode().shouldStream()) {

private void addSchemaEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
String database, String ddl) {
schemaEvents.addAll(databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant()));

List<SchemaChangeEvent> schemaChangeEvents = databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant());
List<SchemaChangeEvent> missingSchemaChangeEvents = schemaChangeEvents.stream()
.filter(not(schemaEvents::contains))
.collect(Collectors.toList());
schemaEvents.addAll(missingSchemaChangeEvents);
}

@Override
Expand Down
Expand Up @@ -11,11 +11,13 @@
import java.sql.SQLException;
import java.util.List;

import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.relational.TableId;
Expand All @@ -25,7 +27,9 @@
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {

protected static final String SERVER_NAME = "is_test";
public static final int MYSQL8 = 8;
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH);
private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver();

@Before
public void before() throws SQLException {
Expand Down Expand Up @@ -71,14 +75,21 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(MySqlConnectorConfig.USER, "mysqluser")
.with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue())
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
.with(MySqlConnectorConfig.SNAPSHOT_MODE_TABLES, DATABASE.qualifiedTableName("a"))
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
}

@Override
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {

return mutableConfig(signalTableOnly, storeOnlyCapturedDdl)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
}

@Override
protected String connector() {
return "mysql";
Expand Down Expand Up @@ -158,11 +169,24 @@ protected int expectedDdlsCount() {
protected void assertDdl(List<String> schemaChangesDdls) {

assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`");
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo("\"CREATE TABLE `b` (\n" +
" `pk` int NOT NULL AUTO_INCREMENT,\n" +
" `aa` int DEFAULT NULL,\n" +

assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo(getDdlString(databaseVersionResolver));

}

@NotNull
private static String getDdlString(MySqlDatabaseVersionResolver databaseVersionResolver) {

return databaseVersionResolver.getVersion().getMajor() < MYSQL8 ? "CREATE TABLE `b` (\n" +
" `pk` int(11) NOT NULL AUTO_INCREMENT,\n" +
" `aa` int(11) DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci\"");
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=latin1"

: "CREATE TABLE `b` (\n" +
" `pk` int NOT NULL AUTO_INCREMENT,\n" +
" `aa` int DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
}
}
Expand Up @@ -127,15 +127,21 @@ protected Configuration.Builder config() {
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false);
}

@Override
protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return config();
}

@Override
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return config()
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}

@Override
protected String valueFieldName() {
return "AA";
Expand Down
Expand Up @@ -151,7 +151,7 @@ public SnapshotResult<O> doExecute(ChangeEventSourceContext context, O previousO
if (snapshottingTask.snapshotSchema()) {
LOGGER.info("Snapshot step 6 - Persisting schema history");

createSchemaChangeEventsForTables(context, ctx, snapshottingTask); // TODO check
createSchemaChangeEventsForTables(context, ctx, snapshottingTask);

// if we've been interrupted before, the TX rollback will cause any locks to be released
releaseSchemaSnapshotLocks(ctx);
Expand Down
Expand Up @@ -125,6 +125,24 @@ public TableChanges getTableChanges() {
return tableChanges;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
SchemaChangeEvent that = (SchemaChangeEvent) o;
return Objects.equals(database, that.database) && Objects.equals(schema, that.schema) && Objects.equals(ddl,
that.ddl) && type == that.type;
}

@Override
public int hashCode() {
return Objects.hash(database, schema, ddl, type);
}

@Override
public String toString() {
return "SchemaChangeEvent [database=" + database + ", schema=" + schema + ", ddl=" + ddl + ", tables=" + tables
Expand Down
Expand Up @@ -40,7 +40,6 @@
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource;
import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest;
import io.debezium.util.Testing;

public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest {
private int signalingRecords;
Expand All @@ -63,6 +62,10 @@ public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest
@Override
protected abstract String server();

protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
return mutableConfig(signalTableOnly, storeOnlyCapturedDdl);
}

@Test
public void executeBlockingSnapshot() throws Exception {
// Testing.Print.enable();
Expand All @@ -75,7 +78,8 @@ public void executeBlockingSnapshot() throws Exception {

insertRecords(ROW_COUNT, ROW_COUNT);

assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumeRecordsByTopic(ROW_COUNT * 2, 10));
SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT * 2, 10);
assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumedRecordsByTopic);

sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId());

Expand Down Expand Up @@ -119,8 +123,9 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception {

signalingRecords = 1; // from streaming

SourceRecords consumeRecordsByTopic = consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10);
assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords),
getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10));
getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic);
}

@Test
Expand All @@ -141,8 +146,9 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {

signalingRecords = 1; // from streaming

SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(500 + signalingRecords, 10);
assertRecordsWithValuesPresent(500, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(),
consumeRecordsByTopic(500 + signalingRecords, 10));
consumedRecordsByTopic);

}

Expand All @@ -151,11 +157,11 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception {
@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.SQL_SERVER)
@SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.DB2)
public void readsSchemaOnlyForSignaledTables() throws Exception {
Testing.Print.enable();
// Testing.Print.enable();

populateTable(tableNames().get(1).toString());

startConnectorWithSnapshot(x -> mutableConfig(false, false));
startConnectorWithSnapshot(x -> historizedMutableConfig(false, false));

waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task());

Expand All @@ -176,8 +182,6 @@ public void readsSchemaOnlyForSignaledTables() throws Exception {
.map(sourceRecord -> ((Struct) sourceRecord.value()).getString("ddl"))
.collect(Collectors.toList());

Testing.print(ddls);

assertDdl(ddls);
}

Expand Down

0 comments on commit 68eb4b2

Please sign in to comment.