diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java index 493fd682c2..1eb98947df 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/converters/MysqlDebeziumTimeConverter.java @@ -113,54 +113,73 @@ private void registerDateConverter( registration.register( SchemaBuilder.string().name(schemaName).optional(), value -> { - log.debug( - "find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field " - + "default:{}", - field.name(), - columnType, - value == null ? "null" : value, - field.hasDefaultValue() ? field.defaultValue() : "null"); - if (value == null) { - return convertDateDefaultValue(field); - } - switch (columnType.toUpperCase(Locale.ROOT)) { - case "DATE": - if (value instanceof Integer) { - return this.convertToDate( - columnType, LocalDate.ofEpochDay((Integer) value)); - } - return this.convertToDate(columnType, value); - case "TIME": - if (value instanceof Long) { - long l = - Math.multiplyExact( - (Long) value, TimeUnit.MICROSECONDS.toNanos(1)); - return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); - } - return this.convertToTime(columnType, value); - case "DATETIME": - if (value instanceof Long) { - if (getTimePrecision(field) <= 3) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMillis((Long) value)); - } - if (getTimePrecision(field) <= 6) { - return this.convertToTimestamp( - columnType, - Conversions.toInstantFromMicros((Long) value)); - } - } - return this.convertToTimestamp(columnType, value); - case "TIMESTAMP": - return this.convertToTimestampWithTimezone(columnType, value); - default: - throw new IllegalArgumentException( - "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + try { + return convertDateObject(field, value, columnType); + } catch (Exception e) { + printConvertDateErrorClassLogs(field, registration, value); + throw new RuntimeException("MysqlDebeziumConverter error", e); } }); } + private void printConvertDateErrorClassLogs( + RelationalColumn field, + ConverterRegistration registration, + Object value) { + boolean useDefaultValueConvert = (value == null); + String fieldName = field.name(); + String fieldType = field.typeName().toUpperCase(); + String defaultValue = "null"; + if (field.hasDefaultValue()) { + if (field.defaultValue() != null) { + defaultValue = field.defaultValue().toString(); + } + } + log.warn( + "find schema need to change dateType, field name:||{}|| field type:||{}|| is use default " + + "convert:||{}|| field default value:||{}|| field charge value fail", + fieldName, + fieldType, + useDefaultValueConvert, + defaultValue); + } + + private Object convertDateObject(RelationalColumn field, Object value, String columnType) { + if (value == null) { + return convertDateDefaultValue(field); + } + switch (columnType.toUpperCase(Locale.ROOT)) { + case "DATE": + if (value instanceof Integer) { + return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value)); + } + return this.convertToDate(columnType, value); + case "TIME": + if (value instanceof Long) { + long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1)); + return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l)); + } + return this.convertToTime(columnType, value); + case "DATETIME": + if (value instanceof Long) { + if (getTimePrecision(field) <= 3) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMillis((Long) value)); + } + if (getTimePrecision(field) <= 6) { + return this.convertToTimestamp( + columnType, Conversions.toInstantFromMicros((Long) value)); + } + } + return this.convertToTimestamp(columnType, value); + case "TIMESTAMP": + return this.convertToTimestampWithTimezone(columnType, value); + default: + throw new IllegalArgumentException( + "Unknown field type " + columnType.toUpperCase(Locale.ROOT)); + } + } + private Object convertToTimestampWithTimezone(String columnType, Object timestamp) { // In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type. // Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually @@ -174,11 +193,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof OffsetDateTime) { - OffsetDateTime value = (OffsetDateTime) timestamp; + OffsetDateTime value = + ((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime(); return ConvertTimeBceUtil.resolveEra( value.toLocalDate(), value.format(timestampFormatter)); } else if (timestamp instanceof ZonedDateTime) { - ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp; + ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId); return ConvertTimeBceUtil.resolveEra( zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter)); } else if (timestamp instanceof Instant) {