From 9a91737c70fed2d962fb6e4b18897bb0cab5279d Mon Sep 17 00:00:00 2001 From: yunqingmoswu Date: Wed, 1 Mar 2023 17:15:58 +0800 Subject: [PATCH 1/3] [INLONG-7477][Sort] Fix the metadata of table write error for canal-json --- .../CanalJsonEnhancedSerializationSchema.java | 23 ++++++++++++++++--- 1 file changed, 20 insertions(+), 3 deletions(-) diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java index 754f4de96f5..a0230e20841 100644 --- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java +++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java @@ -65,6 +65,10 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema */ private final RowType jsonRowType; private transient GenericRowData reuse; + /** + * The index in writeableMetadata of {@link WriteableMetadata#TYPE} + */ + private final int typeIndex; /** * Constructor of CanalJsonEnhancedSerializationSchema. @@ -78,6 +82,7 @@ public CanalJsonEnhancedSerializationSchema( boolean encodeDecimalAsPlainNumber) { final List physicalChildren = physicalDataType.getLogicalType().getChildren(); this.jsonRowType = createJsonRowType(physicalDataType, writeableMetadata); + typeIndex = writeableMetadata.indexOf(WriteableMetadata.TYPE); this.physicalFieldGetter = IntStream.range(0, physicalChildren.size()) .mapToObj(targetField -> RowData.createFieldGetter(physicalChildren.get(targetField), targetField)) .toArray(RowData.FieldGetter[]::new); @@ -122,9 +127,16 @@ private static RowType createJsonRowType(DataType physicalDataType, List reuse.setField(2 + targetField, - wirteableMetadataFieldGetter[targetField].getFieldOrNull(row))); + .forEach(metaIndex -> { + if (metaIndex < typeIndex) { + reuse.setField(metaIndex + 2, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } else if (metaIndex > typeIndex) { + reuse.setField(metaIndex + 1, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } + }); return jsonSerializer.serialize(reuse); } catch (Throwable t) { throw new RuntimeException("Could not serialize row '" + row + "'.", t); From dac2c9ff1069ad815d277cecbaded2cfec974405 Mon Sep 17 00:00:00 2001 From: yunqingmoswu Date: Wed, 1 Mar 2023 18:43:09 +0800 Subject: [PATCH 2/3] [INLONG-7477][Sort] Fix unit test error --- .../CanalJsonEnhancedSerializationSchema.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java index a0230e20841..6a228a5cb21 100644 --- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java +++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java @@ -64,11 +64,11 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema * row schema that json serializer can parse output row to json format */ private final RowType jsonRowType; - private transient GenericRowData reuse; /** * The index in writeableMetadata of {@link WriteableMetadata#TYPE} */ private final int typeIndex; + private transient GenericRowData reuse; /** * Constructor of CanalJsonEnhancedSerializationSchema. @@ -151,14 +151,22 @@ public byte[] serialize(RowData row) { // mete data injection StringData opType = rowKind2String(row.getRowKind()); reuse.setField(1, opType); - IntStream.range(0, wirteableMetadataFieldGetter.length) - .forEach(metaIndex -> { - if (metaIndex < typeIndex) { - reuse.setField(metaIndex + 2, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); - } else if (metaIndex > typeIndex) { - reuse.setField(metaIndex + 1, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); - } - }); + if (typeIndex != -1) { + IntStream.range(0, wirteableMetadataFieldGetter.length) + .forEach(metaIndex -> { + if (metaIndex < typeIndex) { + reuse.setField(metaIndex + 2, + wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } else if (metaIndex > typeIndex) { + reuse.setField(metaIndex + 1, + wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)); + } + }); + } else { + IntStream.range(0, wirteableMetadataFieldGetter.length) + .forEach(metaIndex -> reuse + .setField(metaIndex + 2, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row))); + } return jsonSerializer.serialize(reuse); } catch (Throwable t) { throw new RuntimeException("Could not serialize row '" + row + "'.", t); From a997b2f288be703ca4958ee335be1424dd69af70 Mon Sep 17 00:00:00 2001 From: yunqingmoswu Date: Thu, 2 Mar 2023 10:44:34 +0800 Subject: [PATCH 3/3] [INLONG-7477][Sort] Add some java docs for this --- .../canal/CanalJsonEnhancedSerializationSchema.java | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java index 6a228a5cb21..aa1bc7a75b0 100644 --- a/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java +++ b/inlong-sort/sort-formats/format-json/src/main/java/org/apache/inlong/sort/formats/json/canal/CanalJsonEnhancedSerializationSchema.java @@ -125,6 +125,17 @@ private static RowType createJsonRowType(DataType physicalDataType, List