New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DBZ-6828 Ad-hoc blocking snaps trigger emits schema changes of all tables #4824
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ | |
*/ | ||
package io.debezium.connector.mysql; | ||
|
||
import static java.util.function.Predicate.not; | ||
|
||
import java.sql.Connection; | ||
import java.sql.ResultSet; | ||
import java.sql.SQLException; | ||
|
@@ -97,7 +99,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse | |
if (previousOffset != null && !previousOffset.isSnapshotRunning()) { | ||
|
||
LOGGER.info("A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted."); | ||
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable); | ||
return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false); | ||
} | ||
|
||
LOGGER.info("No previous offset has been found"); | ||
|
@@ -110,7 +112,7 @@ public SnapshottingTask getSnapshottingTask(MySqlPartition partition, MySqlOffse | |
|
||
return new SnapshottingTask(this.connectorConfig.getSnapshotMode().includeSchema(), this.connectorConfig.getSnapshotMode().includeData(), | ||
dataCollectionsToBeSnapshotted, | ||
snapshotSelectOverridesByTable); | ||
snapshotSelectOverridesByTable, false); | ||
} | ||
|
||
@Override | ||
|
@@ -319,14 +321,19 @@ else if (!connectorConfig.getSnapshotMode().shouldStream()) { | |
|
||
private void addSchemaEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, | ||
String database, String ddl) { | ||
schemaEvents.addAll(databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database, | ||
snapshotContext.offset, clock.currentTimeAsInstant())); | ||
|
||
List<SchemaChangeEvent> schemaChangeEvents = databaseSchema.parseSnapshotDdl(snapshotContext.partition, ddl, database, | ||
snapshotContext.offset, clock.currentTimeAsInstant()); | ||
List<SchemaChangeEvent> missingSchemaChangeEvents = schemaChangeEvents.stream() | ||
.filter(not(schemaEvents::contains)) | ||
.collect(Collectors.toList()); | ||
schemaEvents.addAll(missingSchemaChangeEvents); | ||
} | ||
|
||
@Override | ||
protected void readTableStructure(ChangeEventSourceContext sourceContext, | ||
RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, | ||
MySqlOffsetContext offsetContext) | ||
MySqlOffsetContext offsetContext, SnapshottingTask snapshottingTask) | ||
throws Exception { | ||
Set<TableId> capturedSchemaTables; | ||
if (twoPhaseSchemaSnapshot()) { | ||
|
@@ -350,8 +357,10 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, | |
.collect(Collectors.groupingBy(TableId::catalog, LinkedHashMap::new, Collectors.toList())); | ||
final Set<String> databases = tablesToRead.keySet(); | ||
|
||
// Record default charset | ||
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables())); | ||
if (!snapshottingTask.isBlocking()) { | ||
// Record default charset | ||
addSchemaEvent(snapshotContext, "", connection.setStatementFor(connection.readMySqlCharsetSystemVariables())); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. @jpechane I noted that for the Charset statement no database name is provided. Is that right? There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. Yes, IIRC it does not depend on the database |
||
} | ||
|
||
for (TableId tableId : capturedSchemaTables) { | ||
if (!sourceContext.isRunning()) { | ||
|
@@ -374,15 +383,18 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, | |
throw new InterruptedException("Interrupted while reading structure of schema " + databases); | ||
} | ||
|
||
LOGGER.info("Reading structure of database '{}'", database); | ||
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database)); | ||
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database)); | ||
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database); | ||
if (defaultDatabaseLocales != null) { | ||
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl); | ||
if (!snapshottingTask.isBlocking()) { | ||
// in case of blocking snapshot we want to read structures only for collections specified in the signal | ||
LOGGER.info("Reading structure of database '{}'", database); | ||
addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database)); | ||
final StringBuilder createDatabaseDdl = new StringBuilder("CREATE DATABASE " + quote(database)); | ||
final DatabaseLocales defaultDatabaseLocales = databaseCharsets.get(database); | ||
if (defaultDatabaseLocales != null) { | ||
defaultDatabaseLocales.appendToDdlStatement(database, createDatabaseDdl); | ||
} | ||
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString()); | ||
addSchemaEvent(snapshotContext, database, "USE " + quote(database)); | ||
} | ||
addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString()); | ||
addSchemaEvent(snapshotContext, database, "USE " + quote(database)); | ||
|
||
if (connectorConfig.getSnapshotLockingMode().usesLocking()) { | ||
createSchemaEventsForTables(snapshotContext, tablesToRead.get(database), true); | ||
|
@@ -469,8 +481,7 @@ private boolean twoPhaseSchemaSnapshot() { | |
|
||
@Override | ||
protected SchemaChangeEvent getCreateTableEvent(RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext, | ||
Table table) | ||
throws SQLException { | ||
Table table) { | ||
return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); | ||
} | ||
|
||
|
@@ -635,6 +646,10 @@ protected void createSchemaChangeEventsForTables(ChangeEventSourceContext source | |
LOGGER.debug("Processing schema event {}", event); | ||
|
||
final TableId tableId = event.getTables().isEmpty() ? null : event.getTables().iterator().next().id(); | ||
if (snapshottingTask.isBlocking() && !snapshotContext.capturedTables.contains(tableId)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. @jpechane Here I need to avoid resending already processed events and process only the ones requested in the blocking snapshot signal. Is there a way to check the same for the I mean something more robust than checking the name of the table in the There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. @mfvitale Under which conditions the There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. @jpechane Any DROP events, both from snapshot and streaming. Do you know if it's due to some limitation? There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. @mfvitale It is a good workaround. But I'd argue that the missing table change is a bug in this case. Would you be able to dig into it a bit deeper? If not, we can merge this PR and create another Jira just for that problem. There was a problem hiding this comment. Choose a reason for hiding this comment The reason will be displayed to describe this comment to others. Learn more. I have the same feeling but am not 100% sure. I think in this case it is better to create a separate Jira issue to investigate it. |
||
LOGGER.debug("Event {} will be skipped since it's not related to blocking snapshot captured table {}", event, snapshotContext.capturedTables); | ||
continue; | ||
} | ||
snapshotContext.offset.event(tableId, getClock().currentTime()); | ||
dispatcher.dispatchSchemaChangeEvent(snapshotContext.partition, snapshotContext.offset, tableId, (receiver) -> receiver.schemaChangeEvent(event)); | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,14 +6,18 @@ | |
|
||
package io.debezium.connector.mysql; | ||
|
||
import static org.assertj.core.api.Assertions.assertThat; | ||
|
||
import java.sql.SQLException; | ||
import java.util.List; | ||
|
||
import org.jetbrains.annotations.NotNull; | ||
import org.junit.After; | ||
import org.junit.Before; | ||
|
||
import io.debezium.config.CommonConnectorConfig; | ||
import io.debezium.config.Configuration; | ||
import io.debezium.connector.mysql.junit.MySqlDatabaseVersionResolver; | ||
import io.debezium.jdbc.JdbcConnection; | ||
import io.debezium.pipeline.AbstractBlockingSnapshotTest; | ||
import io.debezium.relational.TableId; | ||
|
@@ -23,7 +27,9 @@ | |
public class BlockingSnapshotIT extends AbstractBlockingSnapshotTest { | ||
|
||
protected static final String SERVER_NAME = "is_test"; | ||
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot-test").withDbHistoryPath(SCHEMA_HISTORY_PATH); | ||
public static final int MYSQL8 = 8; | ||
protected final UniqueDatabase DATABASE = new UniqueDatabase(SERVER_NAME, "blocking_snapshot_test", "1", null).withDbHistoryPath(SCHEMA_HISTORY_PATH); | ||
private final MySqlDatabaseVersionResolver databaseVersionResolver = new MySqlDatabaseVersionResolver(); | ||
|
||
@Before | ||
public void before() throws SQLException { | ||
|
@@ -37,6 +43,13 @@ public void before() throws SQLException { | |
public void after() { | ||
try { | ||
stopConnector(); | ||
|
||
JdbcConnection connection = databaseConnection(); | ||
connection.execute("drop database if exists blocking_snapshot_test_1"); | ||
|
||
} | ||
catch (SQLException e) { | ||
throw new RuntimeException(e); | ||
} | ||
finally { | ||
Testing.Files.delete(SCHEMA_HISTORY_PATH); | ||
|
@@ -56,13 +69,7 @@ protected Configuration.Builder config() { | |
|
||
@Override | ||
protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { | ||
final String tableIncludeList; | ||
if (signalTableOnly) { | ||
tableIncludeList = DATABASE.qualifiedTableName("c"); | ||
} | ||
else { | ||
tableIncludeList = DATABASE.qualifiedTableName("a") + ", " + DATABASE.qualifiedTableName("c"); | ||
} | ||
|
||
return DATABASE.defaultConfig() | ||
.with(MySqlConnectorConfig.INCLUDE_SQL_QUERY, true) | ||
.with(MySqlConnectorConfig.USER, "mysqluser") | ||
|
@@ -76,6 +83,13 @@ protected Configuration.Builder mutableConfig(boolean signalTableOnly, boolean s | |
.with(CommonConnectorConfig.SCHEMA_NAME_ADJUSTMENT_MODE, CommonConnectorConfig.SchemaNameAdjustmentMode.AVRO); | ||
} | ||
|
||
@Override | ||
protected Configuration.Builder historizedMutableConfig(boolean signalTableOnly, boolean storeOnlyCapturedDdl) { | ||
|
||
return mutableConfig(signalTableOnly, storeOnlyCapturedDdl) | ||
.with(MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES, true); | ||
} | ||
|
||
@Override | ||
protected String connector() { | ||
return "mysql"; | ||
|
@@ -146,4 +160,33 @@ private TableId tableNameId(String table) { | |
return TableId.parse(DATABASE.qualifiedTableName(table)); | ||
} | ||
|
||
@Override | ||
protected int expectedDdlsCount() { | ||
return 12; | ||
} | ||
|
||
@Override | ||
protected void assertDdl(List<String> schemaChangesDdls) { | ||
|
||
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 2)).isEqualTo("DROP TABLE IF EXISTS `blocking_snapshot_test_1`.`b`"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Currently this will fail because for the |
||
|
||
assertThat(schemaChangesDdls.get(schemaChangesDdls.size() - 1)).isEqualTo(getDdlString(databaseVersionResolver)); | ||
|
||
} | ||
|
||
@NotNull | ||
private static String getDdlString(MySqlDatabaseVersionResolver databaseVersionResolver) { | ||
|
||
return databaseVersionResolver.getVersion().getMajor() < MYSQL8 ? "CREATE TABLE `b` (\n" + | ||
" `pk` int(11) NOT NULL AUTO_INCREMENT,\n" + | ||
" `aa` int(11) DEFAULT NULL,\n" + | ||
" PRIMARY KEY (`pk`)\n" + | ||
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=latin1" | ||
|
||
: "CREATE TABLE `b` (\n" + | ||
" `pk` int NOT NULL AUTO_INCREMENT,\n" + | ||
" `aa` int DEFAULT NULL,\n" + | ||
" PRIMARY KEY (`pk`)\n" + | ||
") ENGINE=InnoDB AUTO_INCREMENT=1001 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci"; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mfvitale Hi, there is a performance bottleneck here. The time complexity is O(databaseSchemaNumber²). We have 140+ databases on a single instance, and this processing takes 2+ hours.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @minchowang can you please open a separate issue? Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://issues.redhat.com/browse/DBZ-7608