Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-31049] [flink-connector-kafka]Add support for Kafka record headers to KafkaSink #22228

Conversation

AlexAxeman
Copy link

What is the purpose of the change

The default org.apache.flink.connector.kafka.sink.KafkaSink does not support adding Kafka record headers when using KafkaRecordSerializationSchemaBuilder, which is the most convenient way to create a Kafka sink. This PR adds support for Kafka headers to KafkaRecordSerializationSchemaBuilder.

Brief change log

  • Implemented a HeaderProducer that allows creating Headers from the input element
  • Added setters to KafkaRecordSerializationSchemaBuilder to allow setting a HeaderProducer
  • Added an optional HeaderProducer constructor argument to KafkaRecordSerializationSchemaWrapper that now uses a ProducerRecord constructor that includes the headers.

Verifying this change

Please make sure both new and modified tests in this PR follows the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing

This change added tests and can be verified as follows:

  • Added tests to KafkaRecordSerializationSchemaBuilderTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes?
  • If yes, how is the feature documented? JavaDocs

@flinkbot
Copy link
Collaborator

flinkbot commented Mar 20, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@MartijnVisser
Copy link
Contributor

@AlexAxeman Given that https://issues.apache.org/jira/browse/FLINK-30859 will be worked on shortly, could you open this PR towards apache/flink-connector-kafka:main

@tzulitai
Copy link
Contributor

Thanks for opening the PR @AlexAxeman! After you reopen the PR against apache/flink-connector-kafka:main, please feel free to ping me for a review on the change.

@davidradl
Copy link

@tzulitai @MartijnVisser I see #22797 , this pr is no longer relevant to this repository, I suggest closing this pr and porting the change to the new Flink connector kafka repo if it is still relevant.

@AlexAxeman
Copy link
Author

It was implemented in the new repo in this PR.

@AlexAxeman AlexAxeman closed this Oct 4, 2023
@AlexAxeman AlexAxeman deleted the add-recordheader-support-to-KafkaRecordSerializationSchemaBuilder branch October 4, 2023 14:03
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants