-
Notifications
You must be signed in to change notification settings - Fork 13.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-20385][canal][json] Allow to read metadata for canal-json format #14464
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit d6ec831 (Fri May 28 07:01:42 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
Hi @SteNicholas , I would like to discuss the metadata keys first. What do you think think just use the keys and types?
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good in general. Could you add a IT case in KafkaChangelogTableITCase
to test kafka+canal-json with metadata accessing?
...ts/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
Outdated
Show resolved
Hide resolved
...ts/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDecodingFormat.java
Outdated
Show resolved
Hide resolved
...k-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonDeserializationSchema.java
Outdated
Show resolved
Hide resolved
@wuchong I have added a IT case in
The name of keys are a little different from yours. What do you think about the above names of keys? |
I'm also fine with that. Will review it tomorrow. |
Btw, could you add documentation for this feature? |
@wuchong I have added the document about the metadata for Canal JSON format. Please help to review the document together. |
...c/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
Outdated
Show resolved
Hide resolved
@wuchong I have kept code spotless with Maven for conflicts resolution and modified the document. Please help to review again. |
...c/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java
Outdated
Show resolved
Hide resolved
I helped to beautify the format. Will merge this once Azure is passed. |
@wuchong Thanks for helping to beautify the format. I have set the time zone for the Canal JSON format metadata |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
Nicholas Jiang Jark Wu I found a bug in the canal code. 'canal-json.table.include' does not filter out the binlog of the specified table correctly, which will cause an error in the parsing section. For example, if I want to read the binlog of canal-json.table.include = 'a' table, there is a source field of int in table a, but at this time if table b also has a source field of string, An error will be reported. |
@wangfeigithub thanks for reporting this. Could you create an JIRA issue for this? |
What is the purpose of the change
Currently FLIP-107 supports reading meta from the Debezium format. According to FLIP-107, metadata should support to be exposed for the Canal JSON format.
Brief change log
CanalJsonDeserializationSchema
access and convert those additional fields to metadata columns.Verifying this change
CanalJsonSerDeSchemaTest
addstestDeserializationWithMetadata
to whether deserialization ofCanalJsonDeserializationSchema
could read metadata for the Canal JSON format.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation