Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ public final class RowDataDebeziumDeserializeSchema

private static final DateTimeFormatter timeFormatter = DateTimeFormatter.ISO_TIME;

private static final DateTimeFormatter timestampFormatter = DateTimeFormatter.ofPattern(
"yyyy-MM-dd HH:mm:ss");
private static final ZoneId ZONE_UTC = ZoneId.of("UTC");

/**
* TypeInformation of the produced {@link RowData}. *
Expand Down Expand Up @@ -691,7 +690,7 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
if (schemaName != null) {
// normal type doesn't have schema name
// schema names are time schemas
fieldValue = getTimeValue(fieldValue, schemaName);
fieldValue = getValueWithSchema(fieldValue, schemaName);
}
data.put(fieldName, fieldValue);
}
Expand All @@ -714,6 +713,7 @@ public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) thr
String fieldName = field.name();
Object fieldValue = struct.getWithoutDefault(fieldName);
Schema fieldSchema = schema.field(fieldName).schema();
String schemaName = fieldSchema.name();

// struct type convert normal type
if (fieldValue instanceof Struct) {
Expand All @@ -732,6 +732,9 @@ public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) thr
fieldValue = ((TimestampData) fieldValue).toTimestamp();
}
}
if (schemaName != null) {
fieldValue = getValueWithSchema(fieldValue, schemaName);
}
if (fieldValue instanceof ByteBuffer) {
fieldValue = new String(((ByteBuffer) fieldValue).array());
}
Expand All @@ -748,33 +751,28 @@ public Object convert(Object dbzObj, Schema schema, TableChange tableSchema) thr
}

/**
* transform debezium time format to database format
* extract the data with the format provided by debezium
*
* @param fieldValue
* @param schemaName
* @return
* @return the extracted data with schema
*/
private Object getTimeValue(Object fieldValue, String schemaName) {
switch (schemaName) {
case MicroTime.SCHEMA_NAME:
Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, serverTimeZone));
break;
case Date.SCHEMA_NAME:
fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
break;
case ZonedTimestamp.SCHEMA_NAME:
ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
fieldValue = timestampFormatter.format(zonedDateTime
.withZoneSameInstant(serverTimeZone).toLocalDateTime());
break;
case Timestamp.SCHEMA_NAME:
Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
fieldValue = timestampFormatter.format(LocalDateTime.ofInstant(instantTime,
serverTimeZone));
break;
default:
LOG.error("parse schema {} error", schemaName);
private Object getValueWithSchema(Object fieldValue, String schemaName) {
if (fieldValue == null) {
return null;
}
if (MicroTime.SCHEMA_NAME.equals(schemaName)) {
Instant instant = Instant.ofEpochMilli((Long) fieldValue / 1000);
fieldValue = timeFormatter.format(LocalDateTime.ofInstant(instant, ZONE_UTC));
} else if (Date.SCHEMA_NAME.equals(schemaName)) {
fieldValue = dateFormatter.format(LocalDate.ofEpochDay((Integer) fieldValue));
} else if (ZonedTimestamp.SCHEMA_NAME.equals(schemaName)) {
ZonedDateTime zonedDateTime = ZonedDateTime.parse((CharSequence) fieldValue);
fieldValue = zonedDateTime.withZoneSameInstant(serverTimeZone).toLocalDateTime()
.atZone(ZONE_UTC).format(DateTimeFormatter.ISO_INSTANT);
} else if (Timestamp.SCHEMA_NAME.equals(schemaName)) {
Instant instantTime = Instant.ofEpochMilli((Long) fieldValue);
fieldValue = LocalDateTime.ofInstant(instantTime, ZONE_UTC).toString();
}
return fieldValue;
}
Expand Down