[pipeline-connector][kafka] add kafka pipeline data sink connector. #2938
Conversation
Force-pushed 7e5fd66 to e571c1a.
Force-pushed e571c1a to ac6d68e.
@Shawn-Hx PTAL.
Force-pushed ac6d68e to 933f2a7.
Thanks for the great contribution. Just left some small comments.
And can log4j2-test.properties be added?
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/pom.xml (resolved review thread)
Thanks for the advice, addressed it.
LGTM
Force-pushed b5b18e7 to aa6210d.
I tested it but did not get the output I expected: the column names were changed to f1, f2, and I noticed that the schema change event is skipped in the code. I think this should be configurable. I am trying to compile and modify it myself, and I hope to get your help.
@svea-vip Hi, can you share more about your situation, and what output result did you expect?
@lvyanquan
@lvyanquan I would like to know your pipeline design for Kafka, which does not have independent metadata like a database. How do you plan to implement MetadataAccessor and MetadataApplier for Kafka?
Can we use Schema Registry as the metadata provider?
As Kafka does not have independent metadata like a database, I actually do nothing in MetadataApplier, and skip processing SchemaChangeEvent. |
"The written topic of Kafka will be namespace. SchemaName. TableName string of TableId, this can be changed using route function of pipeline." If there are many tables and one topic is written to each table, too many topics may cause kafka to write randomly. Specifies whether to write a topic, kakfka header record database & table name |
Force-pushed 6368ab3 to 17fd5b9.
I've added two options to specify this, PTAL. @melin
Yes, but I am still considering whether, and how, to output table structure changes.
Force-pushed 17fd5b9 to 07a0e17.
Force-pushed 07a0e17 to 39e9d8b.
Rebased to master.
@lvyanquan Thanks for the PR! I left some comments
...fka/src/main/java/org/apache/flink/cdc/connectors/kakfa/json/ChangeLogJsonFormatFactory.java (resolved review thread)
...tor-kafka/src/main/java/org/apache/flink/cdc/connectors/kakfa/sink/KafkaDataSinkOptions.java (resolved review thread)
Force-pushed 440479e to b3164ad.
Support adding a custom key and value to the Kafka header, where the value is a constant, for example: region = hangzhou.
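As a rough sketch of what such a configuration could look like (the option name `sink.custom-header` and the key=value syntax are assumptions for illustration, not a confirmed API; only the `region = hangzhou` example comes from the comment above):

```yaml
sink:
  type: kafka
  properties.bootstrap.servers: localhost:9092
  # Assumed option name and syntax: attach a constant "region" header
  # with value "hangzhou" to every record written to Kafka.
  sink.custom-header: region=hangzhou
```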
Force-pushed b3164ad to 25037b1.
Force-pushed 25037b1 to bf51e0c.
@lvyanquan Thanks for the update! LGTM
This closes #2691.
Supported formats are debezium-json and canal-json. The written Kafka topic will be the namespace.schemaName.tableName string of the TableId; this can be changed using the route function of the pipeline.
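Putting that description together, a hedged sketch of a pipeline sink block (the option names `properties.bootstrap.servers`, `value.format`, and `topic` are assumptions for illustration; only the `debezium-json`/`canal-json` format names and the default `namespace.schemaName.tableName` topic naming come from the text above):

```yaml
sink:
  type: kafka
  properties.bootstrap.servers: localhost:9092
  # Assumed option name; one of the two formats mentioned above.
  value.format: debezium-json
  # Assumed option name; without it, each TableId would be written to a
  # topic named namespace.schemaName.tableName as described above.
  topic: mydb_events
```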