From 0f8303403f051575a03e6ebac105e2cf446c57e5 Mon Sep 17 00:00:00 2001 From: "guiyuan.hx" Date: Tue, 7 May 2024 09:25:45 +0800 Subject: [PATCH] [FLINK-35300][cdc][mysql] Improve MySqlStreamingChangeEventSource to skip null events in event deserializer --- .../mysql/MySqlStreamingChangeEventSource.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index 4f1c708994..52ec217567 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -87,10 +87,13 @@ * Copied from Debezium project(1.9.8.Final) to fix * https://github.com/ververica/flink-cdc-connectors/issues/1944. * - *

Line 1427-1433 : Adjust GTID merging logic to support recovering from job which previously + *

Line 263-265: Skip null events in event deserializer to compatible with {@link + * com.github.shyiko.mysql.binlog.BinaryLogFileReader} which reads local binlog files. + * + *

Line 1433-1439 : Adjust GTID merging logic to support recovering from job which previously * specifying starting offset on start. * - *

Line 1485 : Add more error details for some exceptions. + *

Line 1491 : Add more error details for some exceptions. */ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { @@ -257,6 +260,9 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException { try { // Delegate to the superclass ... Event event = super.nextEvent(inputStream); + if (event == null) { + return null; + } // We have to record the most recent TableMapEventData for each table // number for our custom deserializers ...