[FLINK-18823][format] Support serialization for debezium-json format #13333
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit f5080f2 (Sun Sep 06 12:11:27 UTC 2020)
✅ no warnings
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
@Override
public SerializationSchema<RowData> createRuntimeEncoder(
DynamicTableSink.Context context, DataType consumedDataType) {
Indent.
private static final StringData OP_INSERT = StringData.fromString("INSERT"); // insert
private static final StringData OP_DELETE = StringData.fromString("DELETE"); // delete
The insert and delete operation strings in Debezium should be "c" and "d".
@Override
public byte[] serialize(RowData rowData) {
    try {
        genericRowData.setField(0, rowData);
This is wrong. Debezium serializes insert data in "after", but delete data in "before".
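To illustrate the comment above, here is a minimal sketch of the two envelope shapes, assuming only the "before", "after", and "op" fields are written. The class and method names are hypothetical and are not part of this PR or of Flink; the row is passed in as an already-encoded JSON string to keep the sketch free of dependencies.

```java
// Hypothetical sketch: names are illustrative, not Flink's or this PR's API.
final class DebeziumEnvelopeSketch {

    /** Inserted row: the payload goes in "after", and the op code is "c" (create). */
    static String insertEnvelope(String rowJson) {
        return "{\"before\":null,\"after\":" + rowJson + ",\"op\":\"c\"}";
    }

    /** Deleted row: the payload goes in "before", and the op code is "d" (delete). */
    static String deleteEnvelope(String rowJson) {
        return "{\"before\":" + rowJson + ",\"after\":null,\"op\":\"d\"}";
    }

    public static void main(String[] args) {
        // Assumed example row; any valid JSON object works here.
        String row = "{\"id\":1,\"name\":\"scooter\"}";
        System.out.println(insertEnvelope(row));
        System.out.println(deleteEnvelope(row));
    }
}
```

A real serializer would build these fields through the row-to-JSON converter rather than by string concatenation; the point here is only which slot carries the row.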
// Debezium JSON contains some other information, e.g. "source", "ts_ms"
// but we don't need them.
return (RowType) DataTypes.ROW(
    DataTypes.FIELD("after", databaseSchema),
This is wrong. Debezium serializes insert data in "after", but delete data in "before".
}

@Override
public byte[] serialize(RowData rowData) {
    try {
        genericRowData.setField(0, rowData);
        genericRowData.setField(1, rowKind2String(rowData.getRowKind()));
        if (RowKind.INSERT == rowData.getRowKind()) {
We should also consider UPDATE_AFTER and UPDATE_BEFORE.
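One way to cover all four row kinds, following the approach stated in the PR description (UPDATE_BEFORE written as a delete, UPDATE_AFTER written as an insert), might look like the sketch below. The `Kind` enum is a stand-in for Flink's `RowKind` so the example compiles on its own, and the class and method names are hypothetical.

```java
// Hypothetical sketch: Kind mirrors the four values of Flink's RowKind,
// but none of these names come from the PR itself.
final class RowKindMappingSketch {

    enum Kind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    /** INSERT and UPDATE_AFTER are both written as Debezium create messages. */
    static boolean writtenAsInsert(Kind kind) {
        return kind == Kind.INSERT || kind == Kind.UPDATE_AFTER;
    }

    /** Debezium op code: "c" for create, "d" for delete. */
    static String op(Kind kind) {
        return writtenAsInsert(kind) ? "c" : "d";
    }

    /** Envelope field that carries the row payload. */
    static String payloadField(Kind kind) {
        return writtenAsInsert(kind) ? "after" : "before";
    }

    public static void main(String[] args) {
        for (Kind kind : Kind.values()) {
            System.out.println(kind + " -> op=" + op(kind) + ", payload in " + payloadField(kind));
        }
    }
}
```

With this mapping, a single logical update reaches the sink as two messages: a "d" message carrying the old row in "before", followed by a "c" message carrying the new row in "after".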
Thanks for the update. LGTM.
What is the purpose of the change
Currently, the debezium-json format only supports deserialization, not serialization, which makes it inconvenient for users to write changelogs to a message queue. Serialization for debezium-json could follow the JSON structure of Debezium, but it has to take into account that Flink currently can't combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. The serializer could therefore encode UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Debezium messages. With that, this change adds serialization support for the debezium-json format.

Brief change log
DebeziumJsonSerializationSchema, a serialization schema from the Flink Table/SQL internal data structure RowData to Debezium JSON.
DebeziumJsonFormatFactory supports createEncodingFormat, which creates an EncodingFormat for DebeziumJsonSerializationSchema.

Verifying this change
DebeziumJsonSerDeSchemaTest adds test cases for DebeziumJsonSerializationSchema to verify serialization to Debezium JSON.
DebeziumJsonFormatFactoryTest adds test cases for creating DebeziumJsonSerializationSchema and DebeziumJsonDeserializationSchema to verify the serialization and deserialization schemas produced by DebeziumJsonFormatFactory.

Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)

Documentation