Skip to content

Commit

Permalink
[hotfix][cdc-connector][mysql] Skip SchemaChangeEvents that were not …
Browse files Browse the repository at this point in the history
…included in capturedTableFilter (#2986)

This closes #2981
  • Loading branch information
lvyanquan committed Jan 15, 2024
1 parent 44692aa commit 9abf2cf
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -522,6 +522,12 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
tableId,
Collections.singletonList(
Column.physicalColumn("DESC3", DataTypes.BIGINT()))));

// Should not catch SchemaChangeEvent of tables other than `products`
statement.execute(
String.format(
"ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;",
inventoryDatabase.getDatabaseName()));
return expected;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStructContainsChunkKey;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isDataChangeRecord;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isSchemaChangeEvent;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.isTableChangeRecord;

/**
* A Debezium binlog reader implementation that also support reads binlog and filter overlapping
Expand Down Expand Up @@ -245,9 +247,15 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
}
// not in the monitored splits scope, do not emit
return false;
} else if (isSchemaChangeEvent(sourceRecord)) {
if (isTableChangeRecord(sourceRecord)) {
TableId tableId = getTableId(sourceRecord);
return capturedTableFilter.isIncluded(tableId);
} else {
// Not related to changes in table structure, like `CREATE/DROP DATABASE`, skip it
return false;
}
}
// always send the schema change event and signal event
// we need record them to state of Flink
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.flink.table.types.logical.RowType;

import com.ververica.cdc.common.utils.StringUtils;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.WatermarkKind;
import com.ververica.cdc.connectors.mysql.debezium.reader.DebeziumReader;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
Expand Down Expand Up @@ -387,6 +388,13 @@ public static TableId getTableId(SourceRecord dataRecord) {
return new TableId(dbName, null, tableName);
}

public static boolean isTableChangeRecord(SourceRecord dataRecord) {
Struct value = (Struct) dataRecord.value();
Struct source = value.getStruct(Envelope.FieldName.SOURCE);
String tableName = source.getString(TABLE_NAME_KEY);
return !StringUtils.isNullOrWhitespaceOnly(tableName);
}

public static Object[] getSplitKey(
RowType splitBoundaryType, SchemaNameAdjuster nameAdjuster, Struct target) {
// the split key field contains single field now
Expand Down

0 comments on commit 9abf2cf

Please sign in to comment.