Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-6828 Ad-hoc blocking snaps trigger emits schema changes of all tables #4824

Merged
merged 2 commits into from Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -189,7 +189,7 @@ public SnapshottingTask getBlockingSnapshottingTask(MongoDbPartition partition,
Map<String, String> filtersByTable = snapshotConfiguration.getAdditionalConditions().stream()
.collect(Collectors.toMap(k -> k.getDataCollection().toString(), AdditionalCondition::getFilter));

return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable);
return new MongoDbSnapshottingTask(replicaSets.all(), snapshotConfiguration.getDataCollections(), filtersByTable, true);
}

@Override
Expand All @@ -200,20 +200,20 @@ public SnapshottingTask getSnapshottingTask(MongoDbPartition partition, MongoDbO
// If no snapshot should occur, return task with no replica sets
if (this.connectorConfig.getSnapshotMode().equals(MongoDbConnectorConfig.SnapshotMode.NEVER)) {
LOGGER.info("According to the connector configuration, no snapshot will occur.");
return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of());
return new MongoDbSnapshottingTask(Collections.emptyList(), dataCollectionsToBeSnapshotted, Map.of(), false);
}

if (offsetContext == null) {
LOGGER.info("No previous offset has been found");
return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
return new MongoDbSnapshottingTask(replicaSets.all(), dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection(), false);
}

// Collect which replica-sets require being snapshotted
final var replicaSetsToSnapshot = replicaSets.all().stream()
.filter(replicaSet -> isSnapshotExpected(partition, replicaSet, offsetContext))
.collect(Collectors.toList());

return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection());
return new MongoDbSnapshottingTask(replicaSetsToSnapshot, dataCollectionsToBeSnapshotted, connectorConfig.getSnapshotFilterQueryByCollection(), false);
}

@Override
Expand Down Expand Up @@ -501,8 +501,8 @@ public static class MongoDbSnapshottingTask extends SnapshottingTask {

private final List<ReplicaSet> replicaSetsToSnapshot;

/**
 * Creates the MongoDB-specific snapshotting task.
 *
 * @param replicaSetsToSnapshot replica sets that still require snapshotting; data is
 *        snapshotted only when this list is non-empty
 * @param dataCollections collections to be snapshotted
 * @param filterQueries snapshot filter query per collection, keyed by collection name
 * @param isBlocking whether this task was triggered by an ad-hoc blocking snapshot signal
 */
public MongoDbSnapshottingTask(List<ReplicaSet> replicaSetsToSnapshot, List<String> dataCollections, Map<String, String> filterQueries, boolean isBlocking) {
    // First argument is always false: MongoDB snapshots never include schema.
    super(false, !replicaSetsToSnapshot.isEmpty(), dataCollections, filterQueries, isBlocking);
    this.replicaSetsToSnapshot = replicaSetsToSnapshot;
}

Expand Down
Expand Up @@ -309,19 +309,34 @@ private void emitChangeEvent(MySqlPartition partition, MySqlOffsetContext offset
((TableAlteredEvent) event).previousTableId());
}
else {
Table table = getTable(tableId, type);
schemaChangeEvent = SchemaChangeEvent.of(
type,
partition,
offset,
sanitizedDbName,
null,
event.statement(),
tableId != null ? tables().forTable(tableId) : null,
table,
snapshot);
}
schemaChangeEvents.add(schemaChangeEvent);
}

/**
 * Resolves the {@link Table} to attach to a schema change event.
 *
 * @param tableId the id of the affected table, possibly {@code null}
 * @param type the type of the schema change event
 * @return {@code null} when no table id is available; for DROP events a bare table
 *         carrying only the id — DROP events have no tableChanges information, and the id
 *         alone lets blocking snapshots filter out drops of non-captured tables;
 *         otherwise the table looked up from the known tables
 */
private Table getTable(TableId tableId, SchemaChangeEventType type) {
    if (tableId == null) {
        return null;
    }
    return SchemaChangeEventType.DROP == type
            ? Table.editor().tableId(tableId).create()
            : tables().forTable(tableId);
}

private boolean acceptableDatabase(final String databaseName) {
return !storeOnlyCapturedTables()
|| filters.databaseFilter().test(databaseName)
Expand Down
Expand Up @@ -5,6 +5,8 @@
*/
package io.debezium.connector.mysql;

import static java.util.function.Predicate.not;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -97,7 +99,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse
if (previousOffset != null && !previousOffset.isSnapshotRunning()) {

LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

LOGGER.info("No previous offset has been found");
Expand All @@ -110,7 +112,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse

return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeSchema(), this.connectorConfig.getSnapshotMode().includeData(),
dataCollectionsToBeSnapshotted,
snapshotSelectOverridesByTable);
snapshotSelectOverridesByTable, false);
}

@Override
Expand Down Expand Up @@ -319,14 +321,19 @@ else if (!connectorConfig.getSnapshotMode().shouldStream()) {

private void addSchemaEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
String database, String ddl) {
schemaEvents.addAll(databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant()));

List<SchemaChangeEvent> schemaChangeEvents = databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database,
snapshotContext.offset, clock.currentTimeAsInstant());
List<SchemaChangeEvent> missingSchemaChangeEvents = schemaChangeEvents.stream()
.filter(not(schemaEvents::contains))
.collect(Collectors.toList());
Comment on lines +327 to +329

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfvitale Hi here have a performance bottleneck. time complexity is O(databaseSchemaNumber²), We have 140+ databases on single instance, here process need 2 hour+.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @minchowang can you please open a separate issue? Thanks.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

schemaEvents.addAll(missingSchemaChangeEvents);
}

@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
MySqlOffsetContext offsetContext)
MySqlOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws Exception {
Set<TableId> capturedSchemaTables;
if (twoPhaseSchemaSnapshot()) {
Expand All @@ -350,8 +357,10 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
.collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList()));
final Set<String> databases = tablesToRead.keySet();

// Record default charset
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
if (!snapshottingTask.isBlocking()) {
// Record default charset
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpechane I noted that for Charset statement no database name is provided. Is it right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, IIRC it does not depend on the database

}

for (TableId tableId : capturedSchemaTables) {
if (!sourceContext.isRunning()) {
Expand All @@ -374,15 +383,18 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
throw new InterruptedException("Interrupted while reading structure of schema " + databases);
}

LOGGER.info("Reading structure of database '{}'", database);
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database));
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database));
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
if (defaultDatabaseLocales != null) {
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
if (!snapshottingTask.isBlocking()) {
// in case of blocking snapshot we want to read structures only for collections specified in the signal
LOGGER.info("Reading structure of database '{}'", database);
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database));
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database));
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database);
if (defaultDatabaseLocales != null) {
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl);
}
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
addSchemaEvent(snapshotContext, database, "USE " + quote(database));
}
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
addSchemaEvent(snapshotContext, database, "USE " + quote(database));

if (connectorConfig.getSnapshotLockingMode().usesLocking()) {
createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true);
Expand Down Expand Up @@ -469,8 +481,7 @@ private boolean twoPhaseSchemaSnapshot() {

@Override
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
Table table)
throws SQLException {
Table table) {
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table);
}

Expand Down Expand Up @@ -635,6 +646,10 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source
LOGGER.debug("Processing schema event {}", event);

final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id();
if (snapshottingTask.isBlocking() && !snapshotContext.capturedTables.contains(tableId)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpechane Here I need to avoid to resend already processed events and process the only ones requested in bloking snapshot signal. Is there a way to check the same for the DROP statements for which tableId is null?

I mean something more robust then checking the name of the table in the ddl field.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfvitale Under which conditions the DROP even will not contain tableId?

Copy link
Member Author

@mfvitale mfvitale Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpechane It happens for any DROP event, both from snapshot and streaming. Do you know if it's due to some limitation?

image

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jpechane I think I have found an acceptable solution. Please check commit f478c7e

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mfvitale It is a good workaround. But I'd argue that the missing table change is a bug in this case. Would you be able to dig into it a bit deeper? If not, we can merge this PR and create another Jira issue just for that problem.

Copy link
Member Author

@mfvitale mfvitale Sep 19, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have the same feeling but not 100% sure.

I think in this case is better to create a separate Jira issue to investigate it.

LOGGER.debug("Event {} will be skipped since it's not related to blocking snapshot captured table {}", event, snapshotContext.capturedTables);
continue;
}
snapshotContext.offset.event(tableId, getClock().currentTime());
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, snapshotContext.offset, tableId, (receiver) -> receiver.schemaChangeEvent(event));
}
Expand Down
Expand Up @@ -6,14 +6,18 @@

package io.debezium.connector.mysql;

import static org.assertj.core.api.Assertions.assertThat;

import java.sql.SQLException;
import java.util.List;

import org.jetbrains.annotations.NotNull;
import org.junit.After;
import org.junit.Before;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.pipeline.AbstractBlockingSnapshotTest;
import io.debezium.relational.TableId;
Expand All @@ -23,7 +27,9 @@
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest {

protected static final String SERVER_NAME = "is_test";
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH);
public static final int MYSQL8 = 8;
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH);
private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver();

@Before
public void before() throws SQLException {
Expand All @@ -37,6 +43,13 @@ public void before() throws SQLException {
public void after() {
try {
stopConnector();

JdbcConnection connection = databaseConnection();
connection.execute("drop database if exists blocking_snapshot_test_1");

}
catch (SQLException e) {
throw new RuntimeException(e);
}
finally {
Testing.Files.delete(SCHEMA_HISTORY_PATH);
Expand All @@ -56,13 +69,7 @@ protected Configuration.Builder config() {

@Override
protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
final String tableIncludeList;
if (signalTableOnly) {
tableIncludeList = DATABASE.qualifiedTableName("c");
}
else {
tableIncludeList = DATABASE.qualifiedTableName("a") + ", " + DATABASE.qualifiedTableName("c");
}

return DATABASE.defaultConfig()
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true)
.with(MySqlConnectorConfig.USER, "mysqluser")
Expand All @@ -76,6 +83,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO);
}

/**
 * Same as {@link #mutableConfig(boolean, boolean)} but with schema history enabled,
 * so schema change events are emitted to the schema changes topic.
 */
@Override
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) {
    final Configuration.Builder builder = mutableConfig(signalTableOnly, storeOnlyCapturedDdl);
    return builder.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true);
}

@Override
protected String connector() {
return "mysql";
Expand Down Expand Up @@ -146,4 +160,33 @@ private TableId tableNameId(String table) {
return TableId.parse(DATABASE.qualifiedTableName(table));
}

/**
 * Number of schema change DDL statements this test setup expects the blocking
 * snapshot to capture.
 */
@Override
protected int expectedDdlsCount() {
    final int expectedDdls = 12;
    return expectedDdls;
}

@Override
protected void assertDdl(List<String> schemaChangesDdls) {

assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`");
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently this will fail because for the DROP events we don't have information about the table. See https://github.com/debezium/debezium/pull/4824/files#diff-63840528f25c3335e78d8b91c2ea67602750af012c1620d9981f11cb476b480aR642


assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo(getDdlString(databaseVersionResolver));

}

/**
 * Expected {@code CREATE TABLE} DDL for table {@code b}. The output differs between
 * MySQL versions before 8 (integer display widths, latin1 charset) and 8+ (no display
 * widths, utf8mb4 with explicit collation).
 */
@NotNull
private static String getDdlString(MySqlDatabaseVersionResolver databaseVersionResolver) {
    if (databaseVersionResolver.getVersion().getMajor() < MYSQL8) {
        return "CREATE TABLE `b` (\n" +
                " `pk` int(11) NOT NULL AUTO_INCREMENT,\n" +
                " `aa` int(11) DEFAULT NULL,\n" +
                " PRIMARY KEY (`pk`)\n" +
                ") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=latin1";
    }
    return "CREATE TABLE `b` (\n" +
            " `pk` int NOT NULL AUTO_INCREMENT,\n" +
            " `aa` int DEFAULT NULL,\n" +
            " PRIMARY KEY (`pk`)\n" +
            ") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci";
}
}
Expand Up @@ -92,7 +92,7 @@ else if (snapshotSchema) {
LOGGER.info("According to the connector configuration only schema will be snapshot.");
}

return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

@Override
Expand Down Expand Up @@ -171,7 +171,7 @@ protected void determineSnapshotOffset(RelationalSnapshotContext<OraclePartition
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
OracleOffsetContext offsetContext)
OracleOffsetContext offsetContext, SnapshottingTask snapshottingTask)
throws SQLException, InterruptedException {
Set<TableId> capturedSchemaTables;
if (databaseSchema.storeOnlyCapturedTables()) {
Expand All @@ -185,11 +185,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,

Set<String> schemas = capturedSchemaTables.stream().map(TableId::schema).collect(Collectors.toSet());

// reading info only for the schemas we're interested in as per the set of captured tables;
// while the passed table name filter alone would skip all non-included tables, reading the schema
// would take much longer that way
// however, for users interested only in captured tables, we need to pass also table filter
final Tables.TableFilter tableFilter = connectorConfig.storeOnlyCapturedTables() ? connectorConfig.getTableFilters().dataCollectionFilter() : null;
final Tables.TableFilter tableFilter = getTableFilter(snapshottingTask, snapshotContext);
for (String schema : schemas) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while reading structure of schema " + schema);
Expand All @@ -204,6 +200,19 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
}
}

/**
 * Chooses the table filter used when reading table structures.
 *
 * @param snapshottingTask the current snapshotting task
 * @param snapshotContext the current snapshot context
 * @return for blocking snapshots, a filter accepting only the tables requested in the
 *         signal; otherwise the configured data-collection filter when only captured
 *         tables are stored, or {@code null} for no table-level filtering
 */
private Tables.TableFilter getTableFilter(SnapshottingTask snapshottingTask, RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext) {
    // A blocking snapshot must read structure only for the collections named in the signal.
    if (snapshottingTask.isBlocking()) {
        return Tables.TableFilter.fromPredicate(snapshotContext.capturedTables::contains);
    }
    // Schema info is read only for the schemas of captured tables; the name filter alone
    // would skip non-included tables but reading every schema would take much longer.
    // Users storing only captured tables additionally need the table filter itself.
    final boolean onlyCaptured = connectorConfig.storeOnlyCapturedTables();
    return onlyCaptured ? connectorConfig.getTableFilters().dataCollectionFilter() : null;
}

@Override
protected String enhanceOverriddenSelect(RelationalSnapshotContext<OraclePartition, OracleOffsetContext> snapshotContext,
String overriddenSelect, TableId tableId) {
Expand Down