Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[hotfix][cdc-connector][mysql] skip SchemaChangeEvents that were not included in capturedTableFilter. #2986

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
tableId,
Collections.singletonList(
Column.physicalColumn("DESC3", DataTypes.BIGINT()))));

// Should not catch SchemaChangeEvent of tables other than `products`
statement.execute(
String.format(
"ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;",
inventoryDatabase.getDatabaseName()));
return expected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isTableChangeRecord;

/**
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
Expand Down Expand Up @@ -252,9 +254,15 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
}
// not in the monitored splits scope, do not emit
return false;
} else if (isSchemaChangeEvent(sourceRecord)) {
if (isTableChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
return capturedTableFilter.isIncluded(tableId);
} else {
// Not related to changes in table structure, like `CREATE/DROP DATABASE`, skip it
return false;
}
}
// always send the schema change event and signal event
// we need record them to state of Flink
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.common.utils.StringUtils;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
Expand Down Expand Up @@ -387,6 +388,13 @@ public static TableId getTableId(SourceRecord dataRecord) {
return new TableId(dbName, null, tableName);
}

public static boolean isTableChangeRecord(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String tableName = source.getString(TABLE_NAME_KEY);
return !StringUtils.isNullOrWhitespaceOnly(tableName);
}

public static Object[] getSplitKey(
RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
// the split key field contains single field now
Expand Down