Commit

Merge pull request #84 from hchiorean/DBZ-92-97
DBZ-92, DBZ-97 Makes logging more verbose and changes the snapshot reader to produce separate events for each DDL change
rhauch committed Aug 10, 2016
2 parents 1e2cadf + 008263e commit bcaacbd
Showing 3 changed files with 26 additions and 32 deletions.
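In outline, the change below replaces the snapshot reader's old batching approach, which collected every generated DDL statement into a ddlStatements list, joined them with ";" and a line separator, and pushed the whole string through a single schema.applyDdl(...) call (yielding one combined schema-change event), with per-statement applyDdl calls, so each DROP, CREATE, and USE statement is offered to enqueueSchemaChanges on its own and can become its own schema-change record. The sketch below illustrates the two patterns side by side; it is a simplified stand-in, not Debezium's actual API: SchemaModel, applyBatched, applyPerStatement, and the BiConsumer listener are illustrative names, and only the applyDdl(source, dbName, ddl, callback) call shape mirrors the real calls visible in the diff.

import java.util.List;
import java.util.function.BiConsumer;

// Minimal sketch of batched vs. per-statement DDL handling; all type and method
// names here are illustrative stand-ins, not Debezium's API.
public class PerStatementDdlSketch {

    // Simplified stand-in for the schema model's applyDdl(source, dbName, ddl, listener) call.
    interface SchemaModel {
        void applyDdl(String dbName, String ddl, BiConsumer<String, String> schemaChangeListener);
    }

    // Old approach (removed by this commit): join all statements and apply them once,
    // so the listener is invoked a single time with one combined payload.
    static void applyBatched(SchemaModel schema, String dbName, List<String> ddlStatements,
                             BiConsumer<String, String> listener) {
        String joined = String.join(";" + System.lineSeparator(), ddlStatements);
        schema.applyDdl(dbName, joined, listener);
    }

    // New approach (introduced by this commit): apply each statement separately, so the
    // listener, and with it the schema-change topic, sees one event per DDL statement.
    static void applyPerStatement(SchemaModel schema, String dbName, List<String> ddlStatements,
                                  BiConsumer<String, String> listener) {
        for (String ddl : ddlStatements) {
            schema.applyDdl(dbName, ddl, listener);
        }
    }

    public static void main(String[] args) {
        // A trivial schema model that simply forwards every applied statement to the listener.
        SchemaModel schema = (dbName, ddl, listener) -> listener.accept(dbName, ddl);
        List<String> ddl = List.of("DROP TABLE IF EXISTS connector_test.products",
                                   "CREATE DATABASE connector_test",
                                   "USE connector_test");
        // Prints one line per statement, mirroring one schema-change record per DDL change.
        applyPerStatement(schema, "connector_test", ddl,
                          (db, stmt) -> System.out.println(db + ": " + stmt));
    }
}

With the per-statement form, enqueueSchemaChanges can also skip empty statements and log each DDL statement as it is recorded, which is what the reworked method at the end of the first file below does.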
@@ -19,7 +19,6 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.connector.mysql.RecordMakers.RecordsForTable;
@@ -205,10 +204,10 @@ protected void execute() {
// This column exists only in MySQL 5.6.5 or later ...
String gtidSet = rs.getString(5);// GTID set, may be null, blank, or contain a GTID set
source.setGtidSet(gtidSet);
logger.debug("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
logger.info("\t using binlog '{}' at position '{}' and gtid '{}'", binlogFilename, binlogPosition,
gtidSet);
} else {
logger.debug("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
logger.info("\t using binlog '{}' at position '{}'", binlogFilename, binlogPosition);
}
source.startSnapshot();
}
@@ -229,7 +228,7 @@ protected void execute() {
databaseNames.add(rs.getString(1));
}
});
logger.debug("\t list of available databases is: {}", databaseNames);
logger.info("\t list of available databases is: {}", databaseNames);

// ------
// STEP 5
@@ -248,9 +247,9 @@ protected void execute() {
if (filters.tableFilter().test(id)) {
tableIds.add(id);
tableIdsByDbName.computeIfAbsent(dbName, k -> new ArrayList<>()).add(id);
logger.debug("\t including '{}'", id);
logger.info("\t including '{}'", id);
} else {
logger.debug("\t '{}' is filtered out, discarding", id);
logger.info("\t '{}' is filtered out, discarding", id);
}
}
});
@@ -261,38 +260,31 @@ protected void execute() {
// ------
// Transform the current schema so that it reflects the *current* state of the MySQL server's contents.
// First, get the DROP TABLE and CREATE TABLE statement (with keys and constraint definitions) for our tables ...
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas");
final List<String> ddlStatements = new ArrayList<>();
logger.info("Step 6: generating DROP and CREATE statements to reflect current database schemas:");
// Add DROP TABLE statements for all tables that we knew about AND those tables found in the databases ...
Set<TableId> allTableIds = new HashSet<>(schema.tables().tableIds());
allTableIds.addAll(tableIds);
- allTableIds.forEach(tableId -> {
- ddlStatements.add("DROP TABLE IF EXISTS " + tableId);
- });
+ allTableIds.forEach(tableId -> schema.applyDdl(source, tableId.schema(), "DROP TABLE IF EXISTS " + tableId, this::enqueueSchemaChanges));
// Add a DROP DATABASE statement for each database that we no longer know about ...
schema.tables().tableIds().stream().map(TableId::catalog)
.filter(Predicates.not(databaseNames::contains))
- .forEach(missingDbName -> {
- ddlStatements.add("DROP DATABASE IF EXISTS " + missingDbName);
- });
+ .forEach(missingDbName -> schema.applyDdl(source, missingDbName, "DROP DATABASE IF EXISTS " + missingDbName, this::enqueueSchemaChanges));
// Now process all of our tables for each database ...
for (Map.Entry<String, List<TableId>> entry : tableIdsByDbName.entrySet()) {
String dbName = entry.getKey();
// First drop, create, and then use the named database ...
ddlStatements.add("DROP DATABASE IF EXISTS " + dbName);
ddlStatements.add("CREATE DATABASE " + dbName);
ddlStatements.add("USE " + dbName);
schema.applyDdl(source, dbName, "DROP DATABASE IF EXISTS " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "CREATE DATABASE " + dbName, this::enqueueSchemaChanges);
schema.applyDdl(source, dbName, "USE " + dbName, this::enqueueSchemaChanges);
for (TableId tableId : entry.getValue()) {
sql.set("SHOW CREATE TABLE " + tableId);
mysql.query(sql.get(), rs -> {
- if (rs.next()) ddlStatements.add(rs.getString(2)); // CREATE TABLE statement
+ if (rs.next()) {
+ schema.applyDdl(source, dbName, rs.getString(2), this::enqueueSchemaChanges);
+ }
});
}
}
- // Finally, apply the DDL statements to the schema and then update the record maker...
- logger.debug("Step 6b: applying DROP and CREATE statements to connector's table model");
- String ddlStatementsStr = String.join(";" + System.lineSeparator(), ddlStatements);
- schema.applyDdl(source, null, ddlStatementsStr, this::enqueueSchemaChanges);
context.makeRecord().regenerate();

// ------
@@ -348,7 +340,7 @@ protected void execute() {

// Scan the rows in the table ...
long start = clock.currentTimeInMillis();
logger.debug("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
logger.info("Step 8: - scanning table '{}' ({} of {} tables)", tableId, ++counter, tableIds.size());
sql.set("SELECT * FROM " + tableId);
mysql.query(sql.get(), statementFactory, rs -> {
long rowNum = 0;
@@ -531,10 +523,12 @@ protected SourceRecord replaceOffset(SourceRecord record) {
record.value());
}

- protected void enqueueSchemaChanges(String dbName, String ddlStatements) {
- if (context.includeSchemaChangeRecords() &&
- context.makeRecord().schemaChanges(dbName, ddlStatements, super::enqueueRecord) > 0) {
- logger.debug("Recorded DDL statements for database '{}': {}", dbName, ddlStatements);
+ protected void enqueueSchemaChanges(String dbName, String ddlStatement) {
+ if (!context.includeSchemaChangeRecords() || ddlStatement.length() == 0) {
+ return;
+ }
+ if (context.makeRecord().schemaChanges(dbName, ddlStatement, super::enqueueRecord) > 0) {
+ logger.info("\t{}", ddlStatement);
}
}

@@ -221,15 +221,15 @@ public void shouldConsumeAllEventsFromDatabaseUsingSnapshot() throws SQLException
// ---------------------------------------------------------------------------------------------------------------
// Consume all of the events due to startup and initialization of the database
// ---------------------------------------------------------------------------------------------------------------
- SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 1); // 1 schema change record
- assertThat(records.recordsForTopic("myServer").size()).isEqualTo(1);
+ SourceRecords records = consumeRecordsByTopic(5 + 9 + 9 + 4 + 11); // 11 schema change records
+ assertThat(records.recordsForTopic("myServer").size()).isEqualTo(11);
assertThat(records.recordsForTopic("myServer.connector_test.products").size()).isEqualTo(9);
assertThat(records.recordsForTopic("myServer.connector_test.products_on_hand").size()).isEqualTo(9);
assertThat(records.recordsForTopic("myServer.connector_test.customers").size()).isEqualTo(4);
assertThat(records.recordsForTopic("myServer.connector_test.orders").size()).isEqualTo(5);
assertThat(records.topics().size()).isEqualTo(5);
assertThat(records.databaseNames().size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(1);
assertThat(records.ddlRecordsForDatabase("connector_test").size()).isEqualTo(11);
assertThat(records.ddlRecordsForDatabase("readbinlog_test")).isNull();
records.ddlRecordsForDatabase("connector_test").forEach(this::print);

@@ -267,8 +267,8 @@ public void shouldCreateSnapshotOfSingleDatabaseWithSchemaChanges() throws Exception
// The last poll should always return null ...
assertThat(records).isNull();

- // There should be 1 schema change ...
- assertThat(schemaChanges.recordCount()).isEqualTo(1);
+ // There should be 11 schema changes ...
+ assertThat(schemaChanges.recordCount()).isEqualTo(11);
assertThat(schemaChanges.databaseCount()).isEqualTo(1);
assertThat(schemaChanges.databases()).containsOnly(DB_NAME);

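Why both tests now expect 11 schema-change records rather than 1: since the snapshot reader applies every DDL statement through schema.applyDdl(...) individually, each statement that enqueueSchemaChanges turns into a record counts separately. Assuming the connector_test database used by these tests contains exactly the four tables asserted above (products, products_on_hand, customers, orders) and the connector starts with no previously known tables, the Step 6 logic in the first file generates:

4 x DROP TABLE IF EXISTS (one per table)
1 x DROP DATABASE IF EXISTS connector_test
1 x CREATE DATABASE connector_test
1 x USE connector_test
4 x CREATE TABLE (one per table, from SHOW CREATE TABLE)

for a total of 4 + 3 + 4 = 11 schema-change records, matching the updated assertions in both test files.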
