diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 4f1c708994..52ec217567 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,10 +87,13 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously + *

Line 263-265: Skip null events in event deserializer to compatible with {@link + * com.github.shyiko.mysql.binlog.BinaryLogFileReader} which reads local binlog files. + * + *

Line 1433-1439 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1491 : Add more error details for some exceptions. */ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { @@ -257,6 +260,9 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { try { // Delegate to the superclass ... Event event = super.nextEvent(inputStream); + if (event == null) { + return null; + } // We have to record the most recent TableMapEventData for each table // number for our custom deserializers ...