Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.job.cdc.DataSourceConfigKeys;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils;
import org.apache.flink.cdc.debezium.utils.TemporalConversions;
import org.apache.flink.table.data.TimestampData;
Expand Down Expand Up @@ -161,6 +162,7 @@ private String extractAfterRow(Struct value, Schema valueSchema, Set<String> exc
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
field.name(),
field.schema(),
after.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
Expand All @@ -185,6 +187,7 @@ private String extractBeforeRow(Struct value, Schema valueSchema, Set<String> ex
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
field.name(),
field.schema(),
before.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
Expand All @@ -194,7 +197,20 @@ private String extractBeforeRow(Struct value, Schema valueSchema, Set<String> ex
return objectMapper.writeValueAsString(record);
}

private Object convert(Schema fieldSchema, Object dbzObj) {
private Object convert(String fieldName, Schema fieldSchema, Object dbzObj) {
try {
return convertInternal(fieldSchema, dbzObj);
} catch (Exception e) {
String msg =
String.format(
"Failed to convert column '%s' value=%s: %s",
fieldName, dbzObj, ExceptionUtils.getMessage(e));
LOG.error(msg, e);
throw new RuntimeException(msg);
}
}

private Object convertInternal(Schema fieldSchema, Object dbzObj) {
if (dbzObj == null) {
return null;
}
Expand Down Expand Up @@ -307,15 +323,25 @@ private Object convertTimestamp(String typeName, Object dbzObj) {
case Timestamp.SCHEMA_NAME:
return TimestampData.fromEpochMillis((Long) dbzObj).toTimestamp().toString();
case MicroTimestamp.SCHEMA_NAME:
long micro = (long) dbzObj;
return TimestampData.fromEpochMillis(micro / 1000, (int) (micro % 1000 * 1000))
.toTimestamp()
.toString();
{
// floorDiv/floorMod keep nanoOfMillisecond non-negative for pre-1970
// values.
long micro = (long) dbzObj;
long millis = Math.floorDiv(micro, 1000L);
int nanos = (int) Math.floorMod(micro, 1000L) * 1000;
return TimestampData.fromEpochMillis(millis, nanos)
.toTimestamp()
.toString();
}
case NanoTimestamp.SCHEMA_NAME:
long nano = (long) dbzObj;
return TimestampData.fromEpochMillis(nano / 1000_000, (int) (nano % 1000_000))
.toTimestamp()
.toString();
{
long nano = (long) dbzObj;
long millis = Math.floorDiv(nano, 1_000_000L);
int nanos = (int) Math.floorMod(nano, 1_000_000L);
return TimestampData.fromEpochMillis(millis, nanos)
.toTimestamp()
.toString();
}
}
}
LocalDateTime localDateTime = TemporalConversions.toLocalDateTime(dbzObj, serverTimeZone);
Expand Down Expand Up @@ -364,7 +390,7 @@ private Object convertToArray(Schema fieldSchema, Object dbzObj) {
Schema elementSchema = fieldSchema.valueSchema();
List<Object> result = new ArrayList<>();
for (Object element : (List<?>) dbzObj) {
result.add(element == null ? null : convert(elementSchema, element));
result.add(element == null ? null : convertInternal(elementSchema, element));
}
return result;
}
Expand Down
Loading
Loading