From 68eb4b2df2afc3d62e2a0c171a55fed431dec05a Mon Sep 17 00:00:00 2001 From: mfvitale Date: Mon, 18 Sep 2023 16:03:37 +0200 Subject: [PATCH] DBZ-6828 Process drop table events during blocking snapshot --- .../connector/mysql/MySqlDatabaseSchema.java | 17 +++++++++- .../mysql/MySqlSnapshotChangeEventSource.java | 11 ++++-- .../connector/mysql/BlockingSnapshotIT.java | 34 ++++++++++++++++--- .../connector/oracle/BlockingSnapshotIT.java | 10 ++++-- .../RelationalSnapshotChangeEventSource.java | 2 +- .../io/debezium/schema/SchemaChangeEvent.java | 18 ++++++++++ .../AbstractBlockingSnapshotTest.java | 20 ++++++----- 7 files changed, 93 insertions(+), 19 deletions(-) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java index 80d3bfa104f..1f4ae1a5408 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlDatabaseSchema.java @@ -309,6 +309,7 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset ((TableAlteredEvent) event).previousTableId()); } else { + Table table = getTable(tableId, type); schemaChangeEvent = SchemaChangeEvent.of( type, partition, @@ -316,12 +317,26 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset sanitizedDbName, null, event.statement(), - tableId != null ? tables().forTable(tableId) : null, + table, snapshot); } schemaChangeEvents.add(schemaChangeEvent); } + private Table getTable(TableId tableId, SchemaChangeEventType type) { + + if (tableId == null) { + return null; + } + + if (SchemaChangeEventType.DROP == type) { + // DROP events don't have information about tableChanges, so we are creating a Table object with just the tableId to be use + // during blocking snapshot to filter out drop events not related to table to be snapshotted. + return Table.editor().tableId(tableId).create(); + } + return tables().forTable(tableId); + } + private boolean acceptableDatabase(final String databaseName) { return !storeOnlyCapturedTables() || filters.databaseFilter().test(databaseName) diff --git a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java index 235212a940f..430bfcf0498 100644 --- a/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java +++ b/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java @@ -5,6 +5,8 @@ */ package io.debezium.connector.mysql; +import static java.util.function.Predicate.not; + import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; @@ -319,8 +321,13 @@ else if (!connectorConfig.getSnapshotMode().shouldStream()) { private void addSchemaEvent(RelationalSnapshotContext snapshotContext, String database, String ddl) { - schemaEvents.addAll(databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database, - snapshotContext.offset, clock.currentTimeAsInstant())); + + List schemaChangeEvents = databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database, + snapshotContext.offset, clock.currentTimeAsInstant()); + List missingSchemaChangeEvents = schemaChangeEvents.stream() + .filter(not(schemaEvents::contains)) + .collect(Collectors.toList()); + schemaEvents.addAll(missingSchemaChangeEvents); } @Override diff --git a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java index c0f4c4bcac0..e4f161bca07 100644 --- a/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java +++ b/debezium-connector-mysql/src/test/java/io/debezium/connector/mysql/BlockingSnapshotIT.java @@ -11,11 +11,13 @@ import java.sql.SQLException; import java.util.List; +import org.jetbrains.annotations.NotNull; import org.junit.After; import org.junit.Before; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver; import io.debezium.jdbc.JdbcConnection; import io.debezium.pipeline.AbstractBlockingSnapshotTest; import io.debezium.relational.TableId; @@ -25,7 +27,9 @@ public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { protected static final String SERVER_NAME = "is_test"; + public static final int MYSQL8 = 8; protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH); + private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver(); @Before public void before() throws SQLException { @@ -71,7 +75,7 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(MySqlConnectorConfig.USER, "mysqluser") .with(MySqlConnectorConfig.PASSWORD, "mysqlpw") .with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue()) - .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false) .with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal")) .with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5) .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl) @@ -79,6 +83,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s .with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO); } + @Override + protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { + + return mutableConfig(signalTableOnly, storeOnlyCapturedDdl) + .with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true); + } + @Override protected String connector() { return "mysql"; @@ -158,11 +169,24 @@ protected int expectedDdlsCount() { protected void assertDdl(List schemaChangesDdls) { assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`"); - assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo("\"CREATE TABLE `b` (\n" + - " `pk` int NOT NULL AUTO_INCREMENT,\n" + - " `aa` int DEFAULT NULL,\n" + + + assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo(getDdlString(databaseVersionResolver)); + + } + + @NotNull + private static String getDdlString(MySqlDatabaseVersionResolver databaseVersionResolver) { + + return databaseVersionResolver.getVersion().getMajor() < MYSQL8 ? "CREATE TABLE `b` (\n" + + " `pk` int(11) NOT NULL AUTO_INCREMENT,\n" + + " `aa` int(11) DEFAULT NULL,\n" + " PRIMARY KEY (`pk`)\n" + - ") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci\""); + ") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=latin1" + : "CREATE TABLE `b` (\n" + + " `pk` int NOT NULL AUTO_INCREMENT,\n" + + " `aa` int DEFAULT NULL,\n" + + " PRIMARY KEY (`pk`)\n" + + ") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci"; } } diff --git a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java index d0b0f50cde6..818d290f428 100644 --- a/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java +++ b/debezium-connector-oracle/src/test/java/io/debezium/connector/oracle/BlockingSnapshotIT.java @@ -127,8 +127,7 @@ protected Configuration.Builder config() { .with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL") .with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM") .with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A") - .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) - .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true); + .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, false); } @Override @@ -136,6 +135,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s return config(); } + @Override + protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { + return config() + .with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true) + .with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true); + } + @Override protected String valueFieldName() { return "AA"; diff --git a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java index 72ea133fce6..eb64a57fa8f 100644 --- a/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java +++ b/debezium-core/src/main/java/io/debezium/relational/RelationalSnapshotChangeEventSource.java @@ -151,7 +151,7 @@ public SnapshotResult doExecute(ChangeEventSourceContext context, O previousO if (snapshottingTask.snapshotSchema()) { LOGGER.info("Snapshot step 6 - Persisting schema history"); - createSchemaChangeEventsForTables(context, ctx, snapshottingTask); // TODO check + createSchemaChangeEventsForTables(context, ctx, snapshottingTask); // if we've been interrupted before, the TX rollback will cause any locks to be released releaseSchemaSnapshotLocks(ctx); diff --git a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java index a7b4064687a..dc822b4d855 100644 --- a/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java +++ b/debezium-core/src/main/java/io/debezium/schema/SchemaChangeEvent.java @@ -125,6 +125,24 @@ public TableChanges getTableChanges() { return tableChanges; } + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SchemaChangeEvent that = (SchemaChangeEvent) o; + return Objects.equals(database, that.database) && Objects.equals(schema, that.schema) && Objects.equals(ddl, + that.ddl) && type == that.type; + } + + @Override + public int hashCode() { + return Objects.hash(database, schema, ddl, type); + } + @Override public String toString() { return "SchemaChangeEvent [database=" + database + ", schema=" + schema + ", ddl=" + ddl + ", tables=" + tables diff --git a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java index ab382b193eb..7f4bffaa072 100644 --- a/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java +++ b/debezium-embedded/src/test/java/io/debezium/pipeline/AbstractBlockingSnapshotTest.java @@ -40,7 +40,6 @@ import io.debezium.junit.logging.LogInterceptor; import io.debezium.pipeline.source.AbstractSnapshotChangeEventSource; import io.debezium.pipeline.source.snapshot.incremental.AbstractSnapshotTest; -import io.debezium.util.Testing; public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest { private int signalingRecords; @@ -63,6 +62,10 @@ public abstract class AbstractBlockingSnapshotTest extends AbstractSnapshotTest @Override protected abstract String server(); + protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { + return mutableConfig(signalTableOnly, storeOnlyCapturedDdl); + } + @Test public void executeBlockingSnapshot() throws Exception { // Testing.Print.enable(); @@ -75,7 +78,8 @@ public void executeBlockingSnapshot() throws Exception { insertRecords(ROW_COUNT, ROW_COUNT); - assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumeRecordsByTopic(ROW_COUNT * 2, 10)); + SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(ROW_COUNT * 2, 10); + assertRecordsFromSnapshotAndStreamingArePresent(ROW_COUNT * 2, consumedRecordsByTopic); sendAdHocSnapshotSignalWithAdditionalConditionWithSurrogateKey("", "", BLOCKING, tableDataCollectionId()); @@ -119,8 +123,9 @@ public void executeBlockingSnapshotWhileStreaming() throws Exception { signalingRecords = 1; // from streaming + SourceRecords consumeRecordsByTopic = consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10); assertRecordsWithValuesPresent((int) ((ROW_COUNT * 3) + totalSnapshotRecords), - getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic((int) ((ROW_COUNT * 3) + totalSnapshotRecords + signalingRecords), 10)); + getExpectedValues(totalSnapshotRecords), topicName(), consumeRecordsByTopic); } @Test @@ -141,8 +146,9 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception { signalingRecords = 1; // from streaming + SourceRecords consumedRecordsByTopic = consumeRecordsByTopic(500 + signalingRecords, 10); assertRecordsWithValuesPresent(500, IntStream.rangeClosed(0, 499).boxed().collect(Collectors.toList()), topicNames().get(1).toString(), - consumeRecordsByTopic(500 + signalingRecords, 10)); + consumedRecordsByTopic); } @@ -151,11 +157,11 @@ public void executeBlockingSnapshotWithAdditionalCondition() throws Exception { @SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.SQL_SERVER) @SkipWhenConnectorUnderTest(check = EqualityCheck.EQUAL, value = SkipWhenConnectorUnderTest.Connector.DB2) public void readsSchemaOnlyForSignaledTables() throws Exception { - Testing.Print.enable(); + // Testing.Print.enable(); populateTable(tableNames().get(1).toString()); - startConnectorWithSnapshot(x -> mutableConfig(false, false)); + startConnectorWithSnapshot(x -> historizedMutableConfig(false, false)); waitForStreamingRunning(connector(), server(), getStreamingNamespace(), task()); @@ -176,8 +182,6 @@ public void readsSchemaOnlyForSignaledTables() throws Exception { .map(sourceRecord -> ((Struct) sourceRecord.value()).getString("ddl")) .collect(Collectors.toList()); - Testing.print(ddls); - assertDdl(ddls); }