Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ public class CanalJsonEnhancedSerializationSchema implements SerializationSchema
* row schema that json serializer can parse output row to json format
*/
private final RowType jsonRowType;
/**
* The index in writeableMetadata of {@link WriteableMetadata#TYPE}
*/
private final int typeIndex;
private transient GenericRowData reuse;

/**
Expand All @@ -78,6 +82,7 @@ public CanalJsonEnhancedSerializationSchema(
boolean encodeDecimalAsPlainNumber) {
final List<LogicalType> physicalChildren = physicalDataType.getLogicalType().getChildren();
this.jsonRowType = createJsonRowType(physicalDataType, writeableMetadata);
typeIndex = writeableMetadata.indexOf(WriteableMetadata.TYPE);
this.physicalFieldGetter = IntStream.range(0, physicalChildren.size())
.mapToObj(targetField -> RowData.createFieldGetter(physicalChildren.get(targetField), targetField))
.toArray(RowData.FieldGetter[]::new);
Expand Down Expand Up @@ -120,11 +125,29 @@ private static RowType createJsonRowType(DataType physicalDataType, List<Writeab
return (RowType) DataTypeUtils.appendRowFields(root, metadataFields).getLogicalType();
}

/**
* Init for this serialization
* In this method, it initializes {@link this#reuse}, the size of the {@link this#reuse} will be
* length of physicalFields add the length of metadata fields.Here we put the physical field into a array whose key
* is 'data', and put it in the zeroth element of the {@link this#reuse}, and put the {@link WriteableMetadata#TYPE}
* in the first element of the {@link this#reuse},so when the metadata field does not contain
* {@link WriteableMetadata#TYPE}, it's size is two + the number of metadata fields, when included, it's size is
* one + the number of metadata fields
*
* @param context The context used for initialization
*/
@Override
public void open(InitializationContext context) {
reuse = new GenericRowData(2 + wirteableMetadataFieldGetter.length);
int size = 2 + wirteableMetadataFieldGetter.length;
if (typeIndex != -1) {
size--;
}
reuse = new GenericRowData(size);
}

/**
* Serialize the row with ignore the {@link WriteableMetadata#TYPE}
*/
@Override
public byte[] serialize(RowData row) {
try {
Expand All @@ -139,9 +162,22 @@ public byte[] serialize(RowData row) {
// mete data injection
StringData opType = rowKind2String(row.getRowKind());
reuse.setField(1, opType);
IntStream.range(0, wirteableMetadataFieldGetter.length)
.forEach(targetField -> reuse.setField(2 + targetField,
wirteableMetadataFieldGetter[targetField].getFieldOrNull(row)));
if (typeIndex != -1) {
IntStream.range(0, wirteableMetadataFieldGetter.length)
.forEach(metaIndex -> {
if (metaIndex < typeIndex) {
reuse.setField(metaIndex + 2,
wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row));
} else if (metaIndex > typeIndex) {
reuse.setField(metaIndex + 1,
wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row));
}
});
} else {
IntStream.range(0, wirteableMetadataFieldGetter.length)
.forEach(metaIndex -> reuse
.setField(metaIndex + 2, wirteableMetadataFieldGetter[metaIndex].getFieldOrNull(row)));
}
return jsonSerializer.serialize(reuse);
} catch (Throwable t) {
throw new RuntimeException("Could not serialize row '" + row + "'.", t);
Expand Down