Skip to content

Commit

Permalink
[hotfix] Fix MysqlDebeziumTimeConverter miss timezone convert to time…
Browse files Browse the repository at this point in the history
…stamp
  • Loading branch information
czy006 committed May 27, 2024
1 parent 8e8fd30 commit 90679d4
Showing 1 changed file with 66 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,54 +113,73 @@ private void registerDateConverter(
registration.register(
SchemaBuilder.string().name(schemaName).optional(),
value -> {
log.debug(
"find schema need to change dateType, field name:{} ,field type:{} ,field value:{} ,field "
+ "default:{}",
field.name(),
columnType,
value == null ? "null" : value,
field.hasDefaultValue() ? field.defaultValue() : "null");
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(
columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l =
Math.multiplyExact(
(Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType,
Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
try {
return convertDateObject(field, value, columnType);
} catch (Exception e) {
printConvertDateErrorClassLogs(field, registration, value);
throw new RuntimeException("MysqlDebeziumConverter error", e);
}
});
}

private void printConvertDateErrorClassLogs(
RelationalColumn field,
ConverterRegistration<SchemaBuilder> registration,
Object value) {
boolean useDefaultValueConvert = (value == null);
String fieldName = field.name();
String fieldType = field.typeName().toUpperCase();
String defaultValue = "null";
if (field.hasDefaultValue()) {
if (field.defaultValue() != null) {
defaultValue = field.defaultValue().toString();
}
}
log.warn(
"find schema need to change dateType, field name:||{}|| field type:||{}|| is use default "
+ "convert:||{}|| field default value:||{}|| field charge value fail",
fieldName,
fieldType,
useDefaultValueConvert,
defaultValue);
}

private Object convertDateObject(RelationalColumn field, Object value, String columnType) {
if (value == null) {
return convertDateDefaultValue(field);
}
switch (columnType.toUpperCase(Locale.ROOT)) {
case "DATE":
if (value instanceof Integer) {
return this.convertToDate(columnType, LocalDate.ofEpochDay((Integer) value));
}
return this.convertToDate(columnType, value);
case "TIME":
if (value instanceof Long) {
long l = Math.multiplyExact((Long) value, TimeUnit.MICROSECONDS.toNanos(1));
return this.convertToTime(columnType, LocalTime.ofNanoOfDay(l));
}
return this.convertToTime(columnType, value);
case "DATETIME":
if (value instanceof Long) {
if (getTimePrecision(field) <= 3) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMillis((Long) value));
}
if (getTimePrecision(field) <= 6) {
return this.convertToTimestamp(
columnType, Conversions.toInstantFromMicros((Long) value));
}
}
return this.convertToTimestamp(columnType, value);
case "TIMESTAMP":
return this.convertToTimestampWithTimezone(columnType, value);
default:
throw new IllegalArgumentException(
"Unknown field type " + columnType.toUpperCase(Locale.ROOT));
}
}

private Object convertToTimestampWithTimezone(String columnType, Object timestamp) {
// In snapshot mode, debezium produces a java.sql.Timestamp object for the TIMESTAMPTZ type.
// Conceptually, a timestamp with timezone is an Instant. But t.toInstant() actually
Expand All @@ -174,11 +193,12 @@ private Object convertToTimestampWithTimezone(String columnType, Object timestam
ZonedDateTime zonedDateTime = value.toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(value, zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof OffsetDateTime) {
OffsetDateTime value = (OffsetDateTime) timestamp;
OffsetDateTime value =
((OffsetDateTime) timestamp).toInstant().atZone(zoneId).toOffsetDateTime();
return ConvertTimeBceUtil.resolveEra(
value.toLocalDate(), value.format(timestampFormatter));
} else if (timestamp instanceof ZonedDateTime) {
ZonedDateTime zonedDateTime = (ZonedDateTime) timestamp;
ZonedDateTime zonedDateTime = ((ZonedDateTime) timestamp).toInstant().atZone(zoneId);
return ConvertTimeBceUtil.resolveEra(
zonedDateTime.toLocalDate(), zonedDateTime.format(timestampFormatter));
} else if (timestamp instanceof Instant) {
Expand Down

0 comments on commit 90679d4

Please sign in to comment.