From 9f8a340ef0af1eec5c633cc2be88ad1c36f692fb Mon Sep 17 00:00:00 2001 From: ConradJam Date: Fri, 17 May 2024 17:45:42 +0800 Subject: [PATCH 1/2] [FLINK-35592][cdc-connector][mysql] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp --- .../MysqlDebeziumTimeConverter.java | 112 +++++++++++------- .../MysqlDebeziumTimeConverterITCase.java | 8 +- .../test/resources/ddl/date_convert_test.sql | 3 +- 3 files changed, 73 insertions(+), 50 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index c893d18011e..65114bc5dd0 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -117,54 +117,73 @@ private void registerDateConverter( registration.register( SchemaBuilder.string().name(schemaName).optional(), value -> { - log.debug( - "find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field " - + "default:{}", - field.name(), - columnType, - value == null ? "null" : value, - field.hasDefaultValue() ? field.defaultValue() : "null"); - if (value == null) { - return convertDateDefaultValue(field); - } - switch (columnType.toUpperCase(Locale.ROOT)) { - case DATE: - if (value instanceof Integer) { - return this.convertToDate( - columnType, LocalDate.ofEpochDay((Integer) value)); - } - return this.convertToDate(columnType, value); - case TIME: - if (value instanceof Long) { - long l = - Math.multiplyExact( - (Long) value, TimeUnit.MICROSECONDS.toNanos(1)); - return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); - } - return this.convertToTime(columnType, value); - case DATETIME: - if (value instanceof Long) { - if (getTimePrecision(field) <= 3) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMillis((Long) value)); - } - if (getTimePrecision(field) <= 6) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMicros((Long) value)); - } - } - return this.convertToTimestamp(columnType, value); - case TIMESTAMP: - return this.convertToTimestampWithTimezone(columnType, value); - default: - throw new IllegalArgumentException( - "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + try { + return convertDateObject(field, value, columnType); + } catch (Exception e) { + printConvertDateErrorClassLogs(field, registration, value); + throw new RuntimeException("MysqlDebeziumConverter error", e); } }); } + private void printConvertDateErrorClassLogs( + RelationalColumn field, + ConverterRegistration registration, + Object value) { + boolean useDefaultValueConvert = (value == null); + String fieldName = field.name(); + String fieldType = field.typeName().toUpperCase(); + String defaultValue = "null"; + if (field.hasDefaultValue()) { + if (field.defaultValue() != null) { + defaultValue = field.defaultValue().toString(); + } + } + log.warn( + "Find schema need to change dateType, but failed. Field name:{}, field type:{}, " + + "useDefaultValueConvert:{}, field default value:{}", + fieldName, + fieldType, + useDefaultValueConvert, + defaultValue); + } + + private Object convertDateObject(RelationalColumn field, Object value, String columnType) { + if (value == null) { + return convertDateDefaultValue(field); + } + switch (columnType.toUpperCase(Locale.ROOT)) { + case "DATE": + if (value instanceof Integer) { + return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value)); + } + return this.convertToDate(columnType, value); + case "TIME": + if (value instanceof Long) { + long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1)); + return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); + } + return this.convertToTime(columnType, value); + case "DATETIME": + if (value instanceof Long) { + if (getTimePrecision(field) <= 3) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMillis((Long) value)); + } + if (getTimePrecision(field) <= 6) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMicros((Long) value)); + } + } + return this.convertToTimestamp(columnType, value); + case "TIMESTAMP": + return this.convertToTimestampWithTimezone(columnType, value); + default: + throw new IllegalArgumentException( + "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + } + } + private Object convertToTimestampWithTimezone(String columnType, Object timestamp) { // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually @@ -178,11 +197,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof OffsetDateTime) { - OffsetDateTime value = (OffsetDateTime) timestamp; + OffsetDateTime value = + ((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime(); return ConvertTimeBceUtil.resolveEra( value.toLocalDate(), value.format(timestampFormatter)); } else if (timestamp instanceof ZonedDateTime) { - ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp; + ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra( zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof Instant) { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java index d14f31f9787..8adb92454ae 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/converters/MysqlDebeziumTimeConverterITCase.java @@ -154,7 +154,7 @@ private void testReadDateConvertDataStreamSource(String timezone) throws Excepti private void validTimestampValue(List result) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); - String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"}; + String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"}; for (String after : result) { JsonNode jsonNode = mapper.readTree(after); Assert.assertEquals( @@ -232,7 +232,8 @@ private void checkData(TableResult tableResult) { new String[] { "+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]", "+I[3, 00:00:00, null, null, 00:01:20]", - "+I[2, 00:00:00, null, null, 00:00:00]" + "+I[2, 00:00:00, null, null, 00:00:00]", + "+I[4, 15:04:00, null, null, 00:01:10]" }; List expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable)); @@ -283,7 +284,8 @@ private String buildMySqlConfigWithTimezone(String timezone) { + "binlog_format = row\n" + "log_bin = mysql-bin\n" + "server-id = 223344\n" - + "binlog_row_image = FULL\n"; + + "binlog_row_image = FULL\n" + + "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n"; String timezoneConf = "default-time_zone = '" + timezone + "'\n"; Files.write( cnf, diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql index 262c1ceb19f..ed9aadfdc3a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/date_convert_test.sql @@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test VALUES (1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'), (2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'), -(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120); \ No newline at end of file +(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120), +(4,20240612150400, DEFAULT, NULL ,110); \ No newline at end of file From 3f621bc6c70bf8f0001ef031f0be9f4a5d3f1658 Mon Sep 17 00:00:00 2001 From: Hang Ruan Date: Fri, 8 Nov 2024 15:38:33 +0800 Subject: [PATCH 2/2] fix some parts --- .../MysqlDebeziumTimeConverter.java | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index 65114bc5dd0..753bb8f2d03 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -120,31 +120,25 @@ private void registerDateConverter( try { return convertDateObject(field, value, columnType); } catch (Exception e) { - printConvertDateErrorClassLogs(field, registration, value); + logConvertDateError(field, value); throw new RuntimeException("MysqlDebeziumConverter error", e); } }); } - private void printConvertDateErrorClassLogs( - RelationalColumn field, - ConverterRegistration registration, - Object value) { - boolean useDefaultValueConvert = (value == null); + private void logConvertDateError(RelationalColumn field, Object value) { String fieldName = field.name(); String fieldType = field.typeName().toUpperCase(); String defaultValue = "null"; - if (field.hasDefaultValue()) { - if (field.defaultValue() != null) { - defaultValue = field.defaultValue().toString(); - } + if (field.hasDefaultValue() && field.defaultValue() != null) { + defaultValue = field.defaultValue().toString(); } - log.warn( + log.error( "Find schema need to change dateType, but failed. Field name:{}, field type:{}, " - + "useDefaultValueConvert:{}, field default value:{}", + + "field value:{}, field default value:{}", fieldName, fieldType, - useDefaultValueConvert, + value == null ? "null" : value, defaultValue); }