diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java index 754f4de96f5..aa1bc7a75b0 100644 --- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java +++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java @@ -64,6 +64,10 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema * row schema that json serializer can parse output row to json format */ private final RowType jsonRowType; + /** + * The index in writeableMetadata of {@link WriteableMetadata#TYPE} + */ + private final int typeIndex; private transient GenericRowData reuse; /** @@ -78,6 +82,7 @@ public CanalJsonEnhancedSerializationSchema( boolean encodeDecimalAsPlainNumber) { final List physicalChildren = physicalDataType.getLogicalType().getChildren(); this.jsonRowType = createJsonRowType(physicalDataType, writeableMetadata); + typeIndex = writeableMetadata.indexOf(WriteableMetadata.TYPE); this.physicalFieldGetter = IntStream.range(0, physicalChildren.size()) .mapToObj(targetField -> RowData.createFieldGetter(physicalChildren.get(targetField), targetField)) .toArray(RowData.FieldGetter[]::new); @@ -120,11 +125,29 @@ private static RowType createJsonRowType(DataType physicalDataType, List reuse.setField(2 + targetField, - wirteableMetadataFieldGetter[targetField].getFieldOrNull(row))); + if (typeIndex != -1) { + IntStream.range(0, wirteableMetadataFieldGetter.length) + .forEach(metaIndex -> { + if (metaIndex < typeIndex) { + reuse.setField(metaIndex + 2, + wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } else if (metaIndex > typeIndex) { + reuse.setField(metaIndex + 1, + wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } + }); + } else { + IntStream.range(0, wirteableMetadataFieldGetter.length) + .forEach(metaIndex -> reuse + .setField(metaIndex + 2, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row))); + } return jsonSerializer.serialize(reuse); } catch (Throwable t) { throw new RuntimeException("Could not serialize row '" + row + "'.", t);