diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index 493fd682c20..c40b8b45dd4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -174,11 +174,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof OffsetDateTime) { - OffsetDateTime value = (OffsetDateTime) timestamp; + OffsetDateTime value = + ((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime(); return ConvertTimeBceUtil.resolveEra( value.toLocalDate(), value.format(timestampFormatter)); } else if (timestamp instanceof ZonedDateTime) { - ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp; + ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra( zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof Instant) {