Skip to content

Commit

Permalink
DBZ-6828 Blocking snapshot will generate schema events only for signa…
Browse files Browse the repository at this point in the history
…led tables
  • Loading branch information
mfvitale committed Sep 1, 2023
1 parent c2b4831 commit ddf876e
Show file tree
Hide file tree
Showing 15 changed files with 208 additions and 78 deletions.
Expand Up @@ -189,7 +189,7 @@ public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition,
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));

return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable);
return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable, true);
}

@Override
Expand All @@ -200,20 +200,20 @@ public SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbO
// If no snapshot should occur, return task with no replica sets
if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
LOGGER.info("According to the connector configuration, no snapshot will occur.");
return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of());
return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of(), false);
}

if (offsetContext == null) {
LOGGER.info("No previous offset has been found");
return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection(), false);
}

// Collect which replica-sets require being snapshotted
final var replicaSetsToSnapshot = replicaSets.all().stream()
.filter(replicaSet -> isSnapshotExpected(partition, replicaSet, offsetContext))
.collect(Collectors.toList());

return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection(), false);
}

@Override
Expand Down Expand Up @@ -501,8 +501,8 @@ public static class MongoDbSnapshottingTask extends SnapshottingTask {

private final List<ReplicaSet> replicaSetsToSnapshot;

public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot, List<String> dataCollections, Map<String, String> filterQueries) {
super(false, !replicaSetsToSnapshot.isEmpty(), dataCollections, filterQueries);
public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot, List<String> dataCollections, Map<String, String> filterQueries, boolean isBlocking) {
super(false, !replicaSetsToSnapshot.isEmpty(), dataCollections, filterQueries, isBlocking);
this.replicaSetsToSnapshot = replicaSetsToSnapshot;
}

Expand Down
Expand Up @@ -97,7 +97,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {

LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

LOGGER.info("No previous offset has been found");
Expand All @@ -110,7 +110,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse

return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeSchema(), this.connectorConfig.getSnapshotMode().includeData(),
dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable);
snapshotSelectOverridesByTable, false);
}

@Override
Expand Down Expand Up @@ -326,7 +326,7 @@ private void addSchemaEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffse
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
MySqlOffsetContext offsetContext)
MySqlOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws Exception {
Set<TableId> capturedSchemaTables;
if (twoPhaseSchemaSnapshot()) {
Expand All @@ -350,8 +350,10 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
.collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList()));
final Set<String> databases = tablesToRead.keySet();

// Record default charset
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
if (!snapshottingTask.isBlocking()) {
// Record default charset
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
}

for (TableId tableId : capturedSchemaTables) {
if (!sourceContext.isRunning()) {
Expand All @@ -374,15 +376,18 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
throw new InterruptedException("Interrupted while reading structure of schema " + databases);
}

LOGGER.info("Reading structure of database '{}'", database);
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database));
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database));
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
if (defaultDatabaseLocales != null) {
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
if (!snapshottingTask.isBlocking()) {
// in case of blocking snapshot we want to read structures only for collections specified in the signal
LOGGER.info("Reading structure of database '{}'", database);
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database));
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database));
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
if (defaultDatabaseLocales != null) {
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
}
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
addSchemaEvent(snapshotContext, database, "USE " + quote(database));
}
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
addSchemaEvent(snapshotContext, database, "USE " + quote(database));

if (connectorConfig.getSnapshotLockingMode().usesLocking()) {
createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true);
Expand Down Expand Up @@ -469,8 +474,7 @@ private boolean twoPhaseSchemaSnapshot() {

@Override
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
Table table)
throws SQLException {
Table table) {
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}

Expand Down Expand Up @@ -635,6 +639,10 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
LOGGER.debug("Processing schema event {}", event);

final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id();
if (snapshottingTask.isBlocking() && !snapshotContext.capturedTables.contains(tableId)) {
LOGGER.debug("Event {} will be skipped since it's not related to blocking snapshot captured table {}", event, snapshotContext.capturedTables);
continue;
}
snapshotContext.offset.event(tableId, getClock().currentTime());
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, snapshotContext.offset, tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
Expand Down
Expand Up @@ -6,6 +6,8 @@

package io.debezium.connector.mysql;

import static org.assertj.core.api.Assertions.assertThat;

import java.sql.SQLException;
import java.util.List;

Expand All @@ -23,7 +25,7 @@
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {

protected static final String SERVER_NAME = "is_test";
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH);

@Before
public void before() throws SQLException {
Expand All @@ -37,6 +39,13 @@ public void before() throws SQLException {
public void after() {
try {
stopConnector();

JdbcConnection connection = databaseConnection();
connection.execute("drop database if exists blocking_snapshot_test_1");

}
catch (SQLException e) {
throw new RuntimeException(e);
}
finally {
Testing.Files.delete(SCHEMA_HISTORY_PATH);
Expand All @@ -56,19 +65,13 @@ protected Configuration.Builder config() {

@Override
protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
final String tableIncludeList;
if (signalTableOnly) {
tableIncludeList = DATABASE.qualifiedTableName("c");
}
else {
tableIncludeList = DATABASE.qualifiedTableName("a") + ", " + DATABASE.qualifiedTableName("c");
}

return DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.with(MySqlConnectorConfig.USER, "mysqluser")
.with(MySqlConnectorConfig.PASSWORD, "mysqlpw")
.with(MySqlConnectorConfig.SNAPSHOT_MODE, MySqlConnectorConfig.SnapshotMode.INITIAL.getValue())
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, false)
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(MySqlConnectorConfig.SIGNAL_DATA_COLLECTION, DATABASE.qualifiedTableName("debezium_signal"))
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, storeOnlyCapturedDdl)
Expand Down Expand Up @@ -146,4 +149,20 @@ private TableId tableNameId(String table) {
return TableId.parse(DATABASE.qualifiedTableName(table));
}

@Override
protected int expectedDdlsCount() {
return 12;
}

@Override
protected void assertDdl(List<String> schemaChangesDdls) {

assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`");
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo("\"CREATE TABLE `b` (\n" +
" `pk` int NOT NULL AUTO_INCREMENT,\n" +
" `aa` int DEFAULT NULL,\n" +
" PRIMARY KEY (`pk`)\n" +
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci\"");

}
}
Expand Up @@ -92,7 +92,7 @@ else if (snapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}

return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

@Override
Expand Down Expand Up @@ -171,7 +171,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext<OraclePartition
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
OracleOffsetContext offsetContext)
OracleOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws SQLException, InterruptedException {
Set<TableId> capturedSchemaTables;
if (databaseSchema.storeOnlyCapturedTables()) {
Expand All @@ -185,11 +185,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,

Set<String> schemas = capturedSchemaTables.stream().map(TableId::schema).collect(Collectors.toSet());

// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
// however, for users interested only in captured tables, we need to pass also table filter
final Tables.TableFilter tableFilter = connectorConfig.storeOnlyCapturedTables() ? connectorConfig.getTableFilters().dataCollectionFilter() : null;
final Tables.TableFilter tableFilter = getTableFilter(snapshottingTask, snapshotContext);
for (String schema : schemas) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while reading structure of schema " + schema);
Expand All @@ -204,6 +200,19 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
}
}

private Tables.TableFilter getTableFilter(SnapshottingTask snapshottingTask, RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {

if (snapshottingTask.isBlocking()) {
return Tables.TableFilter.fromPredicate(snapshotContext.capturedTables::contains);
}

// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
// however, for users interested only in captured tables, we need to pass also table filter
return connectorConfig.storeOnlyCapturedTables() ? connectorConfig.getTableFilters().dataCollectionFilter() : null;
}

@Override
protected String enhanceOverriddenSelect(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
String overriddenSelect, TableId tableId) {
Expand Down
Expand Up @@ -6,6 +6,8 @@

package io.debezium.connector.oracle;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.List;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -125,6 +127,7 @@ protected Configuration.Builder config() {
.with(OracleConnectorConfig.SIGNAL_DATA_COLLECTION, TestHelper.getDatabaseName() + ".DEBEZIUM.DEBEZIUM_SIGNAL")
.with(OracleConnectorConfig.SCHEMA_INCLUDE_LIST, "DEBEZIUM")
.with(OracleConnectorConfig.SNAPSHOT_MODE_TABLES, TestHelper.getDatabaseName() + ".DEBEZIUM.A")
.with(OracleConnectorConfig.INCLUDE_SCHEMA_CHANGES, true)
.with(SchemaHistory.STORE_ONLY_CAPTURED_TABLES_DDL, true);
}

Expand Down Expand Up @@ -158,4 +161,20 @@ protected String server() {
return TestHelper.SERVER_NAME;
}

@Override
protected int expectedDdlsCount() {
return 4;
}

@Override
protected void assertDdl(List<String> schemaChangesDdls) {
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo("\n" +
" CREATE TABLE \"DEBEZIUM\".\"B\" \n" +
" (\t\"PK\" NUMBER(9,0), \n" +
"\t\"AA\" NUMBER(9,0), \n" +
"\t PRIMARY KEY (\"PK\")\n" +
" USING INDEX ENABLE, \n" +
"\t SUPPLEMENTAL LOG DATA (ALL) COLUMNS\n" +
" ) ;");
}
}
Expand Up @@ -30,6 +30,7 @@
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;

Expand Down Expand Up @@ -76,7 +77,7 @@ public SnapshottingTask getSnapshottingTask(PostgresPartition partition, Postgre
snapshotSchema = false;
}

return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

@Override
Expand Down Expand Up @@ -183,7 +184,7 @@ else if (!snapshotter.shouldStreamEventsStartingFromSnapshot() && startingSlotIn
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<PostgresPartition, PostgresOffsetContext> snapshotContext,
PostgresOffsetContext offsetContext)
PostgresOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws SQLException, InterruptedException {
Set<String> schemas = snapshotContext.capturedTables.stream()
.map(TableId::schema)
Expand All @@ -198,11 +199,15 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
}

LOGGER.info("Reading structure of schema '{}' of catalog '{}'", schema, snapshotContext.catalogName);

Tables.TableFilter tableFilter = snapshottingTask.isBlocking() ? Tables.TableFilter.fromPredicate(snapshotContext.capturedTables::contains)
: connectorConfig.getTableFilters().dataCollectionFilter();

jdbcConnection.readSchema(
snapshotContext.tables,
snapshotContext.catalogName,
schema,
connectorConfig.getTableFilters().dataCollectionFilter(),
tableFilter,
null,
false);
}
Expand Down
Expand Up @@ -71,7 +71,8 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(CommonConnectorConfig.SIGNAL_POLL_INTERVAL_MS, 5)
.with(PostgresConnectorConfig.INCREMENTAL_SNAPSHOT_CHUNK_SIZE, 10)
.with(PostgresConnectorConfig.SCHEMA_INCLUDE_LIST, "s1")
.with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a");
.with(PostgresConnectorConfig.SNAPSHOT_MODE_TABLES, "s1.a")
.with(PostgresConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
}

@Override
Expand Down

0 comments on commit ddf876e

Please sign in to comment.