Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-35592] Fix MysqlDebeziumTimeConverter miss timezone convert to timestamp #3332

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -113,54 +113,73 @@ private void registerDateConverter(
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
try {
return convertDateObject(field, value, columnType);
} catch (Exception e) {
printConvertDateErrorClassLogs(field, registration, value);
throw new RuntimeException("MysqlDebeziumConverter error", e);
}
});
}

private void printConvertDateErrorClassLogs(
RelationalColumn field,
ConverterRegistration<SchemaBuilder> registration,
Object value) {
boolean useDefaultValueConvert = (value == null);
String fieldName = field.name();
String fieldType = field.typeName().toUpperCase();
String defaultValue = "null";
if (field.hasDefaultValue()) {
if (field.defaultValue() != null) {
defaultValue = field.defaultValue().toString();
}
}
log.warn(
"find schema need to change dateType, field name:||{}|| field type:||{}|| is use default "
+ "convert:||{}|| field default value:||{}|| field charge value fail",
fieldName,
fieldType,
useDefaultValueConvert,
defaultValue);
}

private Object convertDateObject(RelationalColumn field, Object value, String columnType) {
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
}

private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
Expand All @@ -174,11 +193,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
OffsetDateTime value =
((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime();
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ private void testReadDateConvertDataStreamSource(String timezone) throws Excepti

private void validTimestampValue(List<String> result) throws JsonProcessingException {
ObjectMapper mapper = new ObjectMapper();
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00"};
String[] timestampValues = new String[] {"14:23:00", "00:00:00", "00:00:00", "15:04:00"};
for (String after : result) {
JsonNode jsonNode = mapper.readTree(after);
Assert.assertEquals(
Expand Down Expand Up @@ -232,7 +232,8 @@ private void checkData(TableResult tableResult) {
new String[] {
"+I[1, 14:23:00, 2023-04-01 14:24:00, 2023-04-01, 14:25:00]",
"+I[3, 00:00:00, null, null, 00:01:20]",
"+I[2, 00:00:00, null, null, 00:00:00]"
"+I[2, 00:00:00, null, null, 00:00:00]",
"+I[4, 15:04:00, null, null, 00:01:10]"
};

List<String> expectedSnapshotData = new ArrayList<>(Arrays.asList(snapshotForSingleTable));
Expand Down Expand Up @@ -283,7 +284,8 @@ private String buildMySqlConfigWithTimezone(String timezone) {
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
+ "binlog_row_image = FULL\n";
+ "binlog_row_image = FULL\n"
+ "sql_mode = ALLOW_INVALID_DATES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION\n";
String timezoneConf = "default-time_zone = '" + timezone + "'\n";
Files.write(
cnf,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ INSERT INTO date_convert_test (id,test_timestamp, test_datetime, test_date, test
VALUES
(1,'2023-04-01 14:23:00', '2023-04-01 14:24:00', '2023-04-01', '14:25:00'),
(2,'2024-04-23 00:00:00', DEFAULT, NULL ,'00:00:00'),
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120);
(3,'2024-04-23 00:00:00', DEFAULT, NULL ,120),
(4,20240612150400, DEFAULT, NULL ,110);
Loading