Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects #23599

Merged
merged 4 commits into from
Nov 7, 2023

Conversation

1996fanrui
Copy link
Member

@1996fanrui 1996fanrui commented Oct 26, 2023

What is the purpose of the change

The background is similar to FLINK-33315.

A hive table with a lot of data, and the HiveSource#partitionBytes is 281MB. When slotPerTM = 4, one TM will run 4 HiveSources at the same time.

How the TaskExecutor to submit a large task?

  1. TaskExecutor#loadBigData will read all bytes from file to SerializedValue
  • The SerializedValue has a byte[]
  • It will cost the heap memory
  • It will be great than 281 MB, because it not only stores HiveSource#partitionBytes, it also stores other information of TaskInformation.
  1. Generate the TaskInformation from SerializedValue
  • TaskExecutor#submitTask calls the tdd.getSerializedTaskInformation()..deserializeValue()
  • tdd.getSerializedTaskInformation() is SerializedValue
  • It will generate the TaskInformation
  • TaskInformation includes the Configuration taskConfiguration
  • The taskConfiguration includes StreamConfig#SERIALIZEDUDF

Based on the above process, TM memory will have 2 big byte array for each task:

  • The SerializedValue
  • The TaskInformation

When one TM runs 4 HiveSources at the same time, it will have 8 big byte array.

In our production environment, this is also a situation that often leads to TM OOM.

Brief change log

  • [FLINK-33354][runtime][refactor] Refactor ShuffleDescriptorsCache into a generic GroupCache
  • [FLINK-33354][runtime][refactor] serializedJobInformation and taskInfo are never null
  • [FLINK-33354][runtime] Cache TaskInformation and JobInformation to avoid deserializing duplicate big objects
  • [FLINK-33354][runtime] Using the InputStream instead of byte array to avoid contiguous huge memory usage

Verifying this change

Improve the old tests:

  • DefaultGroupCacheTest
  • TaskDeploymentDescriptorTest

Add a new test: DefaultGroupCacheTest#testTaskInformationCache

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature?no

@flinkbot
Copy link
Collaborator

flinkbot commented Oct 26, 2023

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@1996fanrui 1996fanrui marked this pull request as draft October 26, 2023 09:17
@1996fanrui 1996fanrui force-pushed the 33354/Reuse-TaskInformation branch 2 times, most recently from dfcfdbe to 3bdc6b8 Compare October 27, 2023 02:24
@1996fanrui 1996fanrui marked this pull request as ready for review October 30, 2023 07:40
@1996fanrui
Copy link
Member Author

Hi @pnowojski @RocMarshal , would you mind helping take a look this PR in your free time? Thanks a lot

@1996fanrui
Copy link
Member Author

Hi @huwh , would you mind helping take a look this PR in your free time as well?

This improvement is totally similar with FLINK-32386 is contributed by you, and this PR refactor the ShuffleDescriptorsCache into a generic GroupCache, so it would be better if you join this review, thanks a lot!

Copy link
Contributor

@RocMarshal RocMarshal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @1996fanrui for the contribution.
I left a few of comments.
Please let me know what's your opinion~ :)

Comment on lines +90 to +99
cachedBlobKeysPerJob.computeIfPresent(
cacheKey.getGroup(),
(group, keys) -> {
keys.remove(cacheKey);
if (keys.isEmpty()) {
return null;
} else {
return keys;
}
});
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be there a risk of memory leakage here?
For example, let's talk about the situation:

  • There are too many groups
  • Perform the following operations on each of these groups one by one
    • Add a set for one group and then remove set for the same one group, but the key has not been removed. Would there be many Entries in the form of Entry-i<Group-i, set-i>(set-i is empty or null) ?

In short, would cachedBlobKeysPerJob degenerate into a collection with too many elements?

Please correct me if needed for my limited read.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I write a demo on My IDEA, it doesn't have the memory leak. When return null, the map will remove the key.

image image

Comment on lines 199 to +205
if (serializedTaskInformation instanceof NonOffloaded) {
NonOffloaded<TaskInformation> taskInformation =
(NonOffloaded<TaskInformation>) serializedTaskInformation;
return taskInformation.serializedValue;
} else {
throw new IllegalStateException(
"Trying to work with offloaded serialized job information.");
return taskInformation.serializedValue.deserializeValue(getClass().getClassLoader());
}
throw new IllegalStateException(
"Trying to work with offloaded serialized task information.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about

        Preconditions.checkState(
                serializedJobInformation instanceof NonOffloaded,
                 "Trying to work with offloaded serialized job information.");
        NonOffloaded<JobInformation> jobInformation =
                (NonOffloaded<JobInformation>) serializedJobInformation;
        return jobInformation.serializedValue.deserializeValue(getClass().getClassLoader());


?

@huwh huwh self-requested a review November 1, 2023 14:30
Copy link
Contributor

@huwh huwh left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @1996fanrui for preparing this PR. LGTM with a minor comment.

Copy link
Contributor

@RocMarshal RocMarshal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update & review.
LGTM +1.

Copy link
Member Author

@1996fanrui 1996fanrui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review, merging~

@1996fanrui 1996fanrui merged commit b759794 into apache:master Nov 7, 2023
@1996fanrui 1996fanrui deleted the 33354/Reuse-TaskInformation branch November 7, 2023 06:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
4 participants