
[FLINK-20379][Connector/Kafka] Improve the KafkaRecordDeserializer interface #14303

Closed
wants to merge 6 commits

Conversation

becketqin
Contributor

What is the purpose of the change

The current KafkaRecordDeserializer has the following problems:

  1. The existing DeserializationSchema and legacy KafkaDeserializationSchema implementations cannot be reused.
  2. It is missing an open() method that provides context for serialization and deserialization.

This patch fixes the above issues by doing the following:

  • Add a getUserCodeClassLoader() method to SourceReaderContext so the SourceReader implementation can construct the SerializationDeserializationContext.
  • Add bridge methods to facilitate reuse of the DeserializationSchema and legacy KafkaDeserializationSchema implementations.
  • Rename KafkaRecordDeserializer to KafkaRecordDeserializationSchema to follow the naming convention (a sketch of the resulting interface follows this list).
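For context, a minimal sketch of what the renamed interface and its bridge methods could look like. The factory method names (`of`, `valueOnly`) and exact signatures are illustrative and may differ from the final patch; the wrapper classes are the ones that appear in this PR's diff.

```java
import java.io.IOException;
import java.io.Serializable;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public interface KafkaRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {

    // Called once before any records are deserialized; the context exposes
    // the user code class loader, among other things.
    default void open(DeserializationSchema.InitializationContext context) throws Exception {}

    void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;

    // Bridge method: reuse a legacy KafkaDeserializationSchema.
    static <V> KafkaRecordDeserializationSchema<V> of(KafkaDeserializationSchema<V> schema) {
        return new KafkaDeserializationSchemaWrapper<>(schema);
    }

    // Bridge method: reuse a DeserializationSchema on the record value only.
    static <V> KafkaRecordDeserializationSchema<V> valueOnly(DeserializationSchema<V> schema) {
        return new KafkaValueOnlyDeserializationSchemaWrapper<>(schema);
    }
}
```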

Brief change log

  • Add a getUserCodeClassLoader() method to SourceReaderContext so the SourceReader implementation can construct the SerializationDeserializationContext.
  • Add bridge methods to facilitate reuse of the DeserializationSchema and legacy KafkaDeserializationSchema implementations.

Verifying this change

Unit tests have been added in KafkaRecordDeserializationSchemaTest to verify the changes to KafkaRecordDeserializationSchema.
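A sketch of the shape such a test might take (the actual test class and assertions in the patch may differ; the `valueOnly` bridge is the hypothetical factory sketched above):

```java
import static org.junit.Assert.assertEquals;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.junit.Test;

public class KafkaRecordDeserializationSchemaSketchTest {

    @Test
    public void wrappedDeserializationSchemaDeserializesValue() throws Exception {
        KafkaRecordDeserializationSchema<String> schema =
                KafkaRecordDeserializationSchema.valueOnly(new SimpleStringSchema());

        List<String> out = new ArrayList<>();
        ConsumerRecord<byte[], byte[]> record = new ConsumerRecord<>(
                "topic", 0, 0L, null, "hello".getBytes(StandardCharsets.UTF_8));

        // Collect the deserialized elements and check that the value round-trips.
        schema.deserialize(record, new Collector<String>() {
            @Override
            public void collect(String element) {
                out.add(element);
            }

            @Override
            public void close() {}
        });

        assertEquals(Collections.singletonList("hello"), out);
    }
}
```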

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (yes)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (Java doc)

@flinkbot
Collaborator

flinkbot commented Dec 3, 2020

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 2d0641f (Fri May 28 08:59:08 UTC 2021)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!
  • This pull request references an unassigned Jira ticket. According to the code contribution guide, tickets need to be assigned before starting with the implementation work.

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • ❓ 1. The [description] looks good.
  • ❓ 2. There is [consensus] that the contribution should go into Flink.
  • ❓ 3. Needs [attention] from.
  • ❓ 4. The change fits into the overall [architecture].
  • ❓ 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands
The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier

@flinkbot
Collaborator

flinkbot commented Dec 3, 2020

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run travis re-run the last Travis build
  • @flinkbot run azure re-run the last Azure build

Contributor

@StephanEwen left a comment


Except for one issue (an incompatible cast), this looks okay.
There are two thoughts on improvements below. I think the first one is easy; the second is more of a follow-up:

Discoverability

For discoverability for users, should we add methods like KafkaSourceBuilder.setDeserializer(KafkaDeserializationSchema) and KafkaSourceBuilder.setValueOnlyDeserializer(DeserializationSchema)? That might be easier to find than understanding how to wrap one schema into another. It also keeps the fact that this is done by wrapping an internal detail, so we could, for example, later change which one wraps which.
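Hypothetically, usage could then look like this (builder method names follow the suggestion above and are not confirmed by this patch):

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;

// A value-only schema is set directly on the builder; wrapping it into a
// KafkaRecordDeserializationSchema happens internally.
KafkaSource<String> source = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();
```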

Move Deserialization From SplitReader to RecordEmitter

As a general design question for the Kafka Source: why are the deserializers running in the fetcher and not in the RecordEmitter?
It looks as if everything would become much simpler by putting the deserialization logic into the RecordEmitter (see the sketch after this list):

  • You don't need to create the Tuple3 and extra Longs for offset and timestamp (fewer objects).
  • You don't need a collector that first collects the elements and then flattens them into the RecordsWithSplitIds.
  • You can wrap the ConsumerRecords as a RecordsWithSplitIds object. You could save all the code and cycles that move records from one map (in ConsumerRecords) into another map (in KafkaPartitionSplitRecords).
  • We could then also open up the KafkaRecordDeserializer to accept a SourceOutput instead of a Collector if we want to give users more flexibility.
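A rough sketch of what emitter-side deserialization could look like. This is illustrative, not code from this patch: `KafkaPartitionSplitState`, its `setCurrentOffset` method, and the collector adapter are assumptions about the surrounding wiring.

```java
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

class DeserializingRecordEmitter<T>
        implements RecordEmitter<ConsumerRecord<byte[], byte[]>, T, KafkaPartitionSplitState> {

    private final KafkaRecordDeserializationSchema<T> deserializationSchema;

    DeserializingRecordEmitter(KafkaRecordDeserializationSchema<T> deserializationSchema) {
        this.deserializationSchema = deserializationSchema;
    }

    @Override
    public void emitRecord(
            ConsumerRecord<byte[], byte[]> record,
            SourceOutput<T> output,
            KafkaPartitionSplitState splitState) throws Exception {
        // Deserialization happens here, in the mailbox thread, so the fetcher
        // can hand over the raw ConsumerRecords without re-packaging them.
        deserializationSchema.deserialize(record, new Collector<T>() {
            @Override
            public void collect(T element) {
                output.collect(element, record.timestamp());
            }

            @Override
            public void close() {}
        });
        splitState.setCurrentOffset(record.offset() + 1);
    }
}
```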

```java
 * @param <T> the type of the deserialized records.
 */
class KafkaDeserializationSchemaWrapper<T> implements KafkaRecordDeserializer<T> {
    private static final long serialVersionUID = 3239435655135705790L;
```
Contributor


Per the style guide, please start the serialVersionUID at 1L.
https://flink.apache.org/contributing/code-style-and-quality-java.html#java-serialization
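That is, the declaration would simply become:

```java
private static final long serialVersionUID = 1L;
```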

```diff
 if (deserializer == null) {
-    deserializer = (Deserializer<T>) InstantiationUtil.instantiate(
+    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) {
```
Contributor


Could this initialization be moved into the open() method?
If not, it could be good to move the init logic into a separate method, to keep the main method's code paths small for JIT friendliness.
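A sketch of the suggested extraction, assuming the initialization cannot move into open(). Field names mirror the snippet above; this is illustrative, not the patch itself.

```java
import org.apache.flink.util.Collector;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TemporaryClassLoaderContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

private final Class<? extends Deserializer<T>> deserializerClass; // set in the constructor
private transient ClassLoader userCodeClassLoader;                // set in open()
private transient Deserializer<T> deserializer;

// The hot path stays small so the JIT can inline it; the rarely-taken
// lazy initialization lives in its own method.
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> collector) throws Exception {
    if (deserializer == null) {
        initializeDeserializer();
    }
    collector.collect(deserializer.deserialize(record.topic(), record.value()));
}

private void initializeDeserializer() throws Exception {
    // Instantiate the Kafka Deserializer with the user code class loader
    // installed as the thread context class loader.
    try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(userCodeClassLoader)) {
        deserializer = InstantiationUtil.instantiate(deserializerClass);
    }
}
```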

```diff
 import java.util.Map;

 /**
  * A package private class to wrap {@link Deserializer}.
  */
-class ValueDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
+class KafkaValueOnlyDeserializerWrapper<T> implements KafkaRecordDeserializer<T> {
     private static final long serialVersionUID = 5409547407386004054L;
```
Contributor


serialVersionUID should start at 1L, see above.

```java
 * @param <T> the return type of the deserialization.
 */
class KafkaValueOnlyDeserializationSchemaWrapper<T> implements KafkaRecordDeserializer<T> {
    private static final long serialVersionUID = -3962448817248263667L;
```
Contributor


serialVersionUID should start at 1L, see above.


```java
@Override
public UserCodeClassLoader getUserCodeClassLoader() {
    return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();
```
Contributor


This looks like a cast between incompatible types.

Contributor


You can probably reuse some parts from RuntimeContextInitializationContextAdapters here.

Contributor Author


This was fixed in a later commit, but the change should have been put here. I'll rebase the commits.

I did look into RuntimeContextInitializationContextAdapters. Please see the other reply for the reason why it was not used here.

```diff
@@ -197,7 +195,20 @@ public void sendSourceEventToCoordinator(SourceEvent event) {

 @Override
 public UserCodeClassLoader getUserCodeClassLoader() {
-    return (UserCodeClassLoader) getRuntimeContext().getUserCodeClassLoader();
+    return new UserCodeClassLoader() {
```
Contributor


Is it possible to reuse code from RuntimeContextInitializationContextAdapters here?

Contributor Author


That was my first attempt as well. But it seems a little ugly for two reasons:

  1. RuntimeContextInitializationContextAdapters is itself a class that converts a RuntimeContext to an InitializationContext, and its scope and usage are quite clear. RuntimeContextUserCodeClassLoaderAdapter is a nested private class, and making only that class public seems a little weird.
  2. RuntimeContextInitializationContextAdapters is affiliated with DeserializationSchema, which is something the SourceOperator should not be aware of.

Two other options are:

  1. Move the RuntimeContextUserCodeClassLoaderAdapter to a separate class.
  2. Add a new method to RuntimeContext that returns an actual UserCodeClassLoader instead of letting callers construct one themselves.

Both approaches seem a little unnecessary here because the inline class is quite simple (see the sketch below).
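For reference, a sketch of what that simple inline class might look like; the release-hook delegation is an assumption about the final code, not taken from this diff.

```java
import org.apache.flink.util.UserCodeClassLoader;

@Override
public UserCodeClassLoader getUserCodeClassLoader() {
    return new UserCodeClassLoader() {
        @Override
        public ClassLoader asClassLoader() {
            // Delegate to the runtime context instead of casting its loader.
            return getRuntimeContext().getUserCodeClassLoader();
        }

        @Override
        public void registerReleaseHookIfAbsent(String releaseHookName, Runnable releaseHook) {
            getRuntimeContext().registerUserCodeClassLoaderReleaseHookIfAbsent(releaseHookName, releaseHook);
        }
    };
}
```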

Contributor Author

@becketqin left a comment


@StephanEwen Thanks for the review and suggestions. Please see the replies below.

Discoverability.

Having a setValueOnlyDeserializer(DeserializationSchema) makes sense. I'll update the patch to add that.

Move Deserialization From SplitReader to RecordEmitter.

The most important concern about moving the deserialization to the RecordEmitter is the performance impact. Deserialization can be quite expensive in some cases. The RecordEmitter runs in the mailbox main thread; if we move the deserialization there, the throughput may drop significantly because a single thread would then handle all the deserialization.

The "N consuming-and-deserializing threads + 1 shared queue" pattern is proven in many Kafka applications (sketched below). While the KafkaSource only has one SplitFetcher thread at this point, we will likely support multiple fetchers in the next release. This lets users address a deserialization bottleneck in a more lightweight manner than increasing the parallelism of the Source.



@StephanEwen
Contributor

On the "Move Deserialization From SplitReader to RecordEmitter" discussion:

I would be interested in seeing how this impacts performance. The current Kafka connector runs deserialization in the processing thread, so we could use this as a baseline for performance comparison.

Right now, the new Kafka connector does strictly more work (in CPU cycles), with its wrapping and re-packaging of records, than either the old connector or an approach that deserializes in the RecordEmitter.
The assumption that this is nevertheless faster can only hold if the actual processing is not sufficiently parallelized: there are unused cores, so it is better to do something less efficient that can spread across more threads.

To my knowledge, any setup with sufficient parallelization across partitions wins over setups that try to parallelize across pipeline stages: data parallelism beats pipeline parallelism on modern hardware. So if you have spare CPU cores, increasing the parallelism of the operator (and the slots per TM) is more efficient than trying to spread work across more threads and stages. Otherwise it would not be a good idea to chain operators into tasks; we would instead want a thread per operator.

@becketqin
Contributor Author

I think you are right. At a high level, performance is best when all the CPU cores are busy and none of them are doing unnecessary work.

In the ideal case there are N dedicated main threads, where N equals the number of CPU cores, so no computing resource is idle. These main threads are only "interrupted" by IO, i.e. when more records are handed over to them for processing. Async IO is beneficial because that handover can then happen in the main thread without any interruption or context switch. We can achieve this in the KafkaSource because KafkaConsumer is designed to be non-blocking.

The only potential problem I can think of is the overhead of increasing the parallelism, e.g. a larger memory footprint, more IO buffers, etc.

And I think you are also right about the assumption under which more deserialization threads help: most streaming systems in production actually do have spare CPU resources, and increasing the parallelism is usually done by adding a new JVM instance, which can be expensive. So adding more deserialization threads helps.

Commits:
  • Add bridge methods to the existing KafkaDeserializationSchema implementations.
  • …aRecordDeserializationSchema to follow the naming convention.
  • …serializer(DeserializationSchema) to KafkaSourceBuilder.