Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-8354] Add KafkaDeserializationSchema that directly uses ConsumerRecord #7781

Closed

Conversation

aljoscha
Copy link
Contributor

This is a rebased and cleaned-up version of #6615 that only has the Consumer changes.

What is the purpose of the change

  • Add a new deserialization schema that directly uses ConsumerRecord, thereby exposing all available Kafka functionality.

Brief change log

  • First we do some cleanup on the Kafka code
  • Then we introduce the new schema, the existing schema now implements the new schema, for backwards compatibility
  • Consumers and tests are adapted to use the new schema

Verifying this change

  • This is covered by existing tests

@flinkbot
Copy link
Collaborator

flinkbot commented Feb 20, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into to Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@aljoscha
Copy link
Contributor Author

cc @alexeyt820
cc @azagrebin

@aljoscha aljoscha force-pushed the pr/6615-rebased-consumer-only branch from 476322f to 529ba9e Compare February 21, 2019 14:07
@aljoscha aljoscha force-pushed the pr/6615-rebased-consumer-only branch 4 times, most recently from 5f13be0 to 363aa94 Compare February 21, 2019 15:22
We now directly use the ConsumerRecord from the Kafka API instead of
trying to forward what we need to the deserialization schema ourselves.

This makes it more future-proof, if Kafka adds new fields to the
ConsumerRecord.

The previously used KeyedDeserializationSchema now extends
KafkaDeserializationSchema and has a default method to bridge the
interface. This way existing uses of KeyedDeserializationSchema still
work.
@aljoscha aljoscha force-pushed the pr/6615-rebased-consumer-only branch from 363aa94 to d39c6c0 Compare February 21, 2019 15:27
@aljoscha
Copy link
Contributor Author

Merged!

@lamberken
Copy link
Member

@aljoscha hi, the java file was placed at wrong package
KafkaDeserializationSchemaWrapper.java

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 5, 2019

@lamber-ken How do you mean?

@lamberken
Copy link
Member

@aljoscha, hi, the mismatch between KafkaDeserializationSchemaWrapper class's package name and file's location. #7902

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 5, 2019

Now I see, yes, we need to fix this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants