[FLINK-35791][kafka] add database and table info of canal/debezium json format for kafka sink. #3461
Conversation
Thanks for @lvyanquan's contribution! I just left some minor comments about the JavaDocs.
...main/java/org/apache/flink/cdc/connectors/kafka/json/canal/CanalJsonSerializationSchema.java (review comment on an outdated diff, resolved)
...ava/org/apache/flink/cdc/connectors/kafka/json/debezium/DebeziumJsonSerializationSchema.java (review comment on an outdated diff, resolved)
@@ -132,6 +132,17 @@ public byte[] serialize(Event event) {
        }

        DataChangeEvent dataChangeEvent = (DataChangeEvent) event;
        reuseGenericRowData.setField(
                3, StringData.fromString(dataChangeEvent.tableId().getSchemaName()));
org.apache.flink.table.data.StringData#fromString is used to build the binary string data here, but I noticed that org.apache.flink.cdc.common.data.binary.BinaryStringData#fromString is more frequently used in the CDC code base. Though they're basically the same (the CDC version was copied from Flink), would it be better to stick to one consistent binary encoding implementation?
reuseGenericRowData will be serialized by the SerializationSchema, so Flink types are passed here.
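For illustration, here is a minimal, self-contained sketch of that point; the five-field row layout, the field indices, and the literal database/table values are assumptions for the example, not the PR's actual schema:

```java
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;

public class ReuseRowSketch {
    public static void main(String[] args) {
        // The reuse row is a Flink GenericRowData that a wrapped JSON
        // SerializationSchema consumes, so its string fields must be Flink
        // runtime StringData rather than CDC's BinaryStringData.
        GenericRowData reuseGenericRowData = new GenericRowData(5); // assumed 5-field layout
        reuseGenericRowData.setField(
                3, StringData.fromString("inventory"));  // database / schema name (made-up value)
        reuseGenericRowData.setField(
                4, StringData.fromString("customers"));  // table name (made-up value)
        System.out.println(reuseGenericRowData);
    }
}
```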
Got it, thanks for the clarification.
MySQL -> Kafka: if a transform is added, the sink schemaName and tableName are obtained, which is not as expected.
@lvyanquan Thanks for the PR! LGTM.
Could you rebase onto the latest master branch and run CI again? I'll merge after CI passes.
Rebase done.
…json format for Kafka sink (apache#3461)
Refer to https://debezium.io/documentation/reference/1.9/connectors/mysql.html.
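For context, a simplified sketch of a Debezium-style JSON record carrying the database and table info this PR adds; the field names follow the public Debezium envelope described in the linked docs, while the payload values are made up:

```json
{
  "before": null,
  "after": { "id": 1, "name": "Alice" },
  "op": "c",
  "source": { "db": "inventory", "table": "customers" }
}
```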