[HUDI-6990] Configurable clustering task parallelism #9925
Conversation
@@ -161,6 +161,13 @@ public class HoodieClusteringConfig extends HoodieConfig {
          + "value will let the clustering job run faster, while it will give additional pressure to the "
          + "execution engines to manage more concurrent running jobs.");

  public static final ConfigProperty<Integer> CLUSTERING_READ_RECORDS_PARALLELISM = ConfigProperty
      .key("hoodie.clustering.read.records.parallelism")
      .defaultValue(20)
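For context, a ConfigProperty pairs a key with a default value that is used when the user supplies nothing. The sketch below mimics that resolution logic with a hypothetical stand-in class (this is not Hudi's actual ConfigProperty API, just an illustration of the key/default pattern the diff adds):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed integer config with a default value.
final class IntConfig {
  final String key;
  final int defaultValue;

  IntConfig(String key, int defaultValue) {
    this.key = key;
    this.defaultValue = defaultValue;
  }

  // Resolve from user-supplied properties, falling back to the default.
  int resolve(Map<String, String> props) {
    String v = props.get(key);
    return v == null ? defaultValue : Integer.parseInt(v);
  }
}

public class ClusteringParallelismDemo {
  static final IntConfig READ_PARALLELISM =
      new IntConfig("hoodie.clustering.read.records.parallelism", 20);

  public static void main(String[] args) {
    Map<String, String> props = new HashMap<>();
    System.out.println(READ_PARALLELISM.resolve(props)); // default: 20
    props.put("hoodie.clustering.read.records.parallelism", "40");
    System.out.println(READ_PARALLELISM.resolve(props)); // overridden: 40
  }
}
```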
Maybe just rename to hoodie.clustering.parallelism
changed
I'm wondering where the default of 20 comes from. Should we limit the parallelism by default? Do we have a similar option for compaction?
We verified that 20 is a relatively stable value for running the job. hoodie.clustering.plan.strategy.target.file.max.bytes defaults to 1 GB, which covers many files; reading them with a parallelism of 20 is enough and avoids having to increase executor memory.
We should limit the parallelism by default; this greatly reduces the number of tasks.
Compaction does something similar: it parallelizes the compaction operations by their count.
// HoodieCompactor.java
return context.parallelize(operations)
    .map(operation -> compact(
        compactionHandler, metaClient, config, operation, compactionInstantTime,
        maxInstantTime, instantRange, taskContextSupplier, executionHelper))
    .flatMap(List::iterator);
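The clustering read path could apply the same idea while capping the partition count. A minimal self-contained sketch of that sizing logic (method and class names are hypothetical, not Hudi code): take the smaller of the configured cap and the number of input files, so small plans are not over-partitioned and large plans do not explode into one task per file.

```java
public class ReadParallelism {
  // Sketch: clamp read parallelism to [1, numInputFiles] under a configured cap.
  static int effectiveReadParallelism(int configuredParallelism, int numInputFiles) {
    return Math.max(1, Math.min(configuredParallelism, numInputFiles));
  }

  public static void main(String[] args) {
    System.out.println(effectiveReadParallelism(20, 500)); // capped at 20
    System.out.println(effectiveReadParallelism(20, 3));   // only 3 files -> 3 tasks
  }
}
```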
cc @boneanxs to take a look~
cc @boneanxs Is the config name hoodie.clustering.rdd.read.parallelism OK, or do you have better advice?
What about the sum of clusteringGroup.getNumOutputFileGroups() for all clustering groups, which is actually the final write parallelism?
> What about the sum of clusteringGroup.getNumOutputFileGroups() for all clustering groups, which is actually the final write parallelism?
Not very friendly: the number of clustering groups is controlled by hoodie.clustering.plan.strategy.max.num.groups. If we increase that config, the read parallelism grows with it. In our case we generate 100 groups, so the read parallelism would be 200, which is not helpful for the read tasks.
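The arithmetic behind this objection: a plan-derived parallelism is the sum of output file groups over all clustering groups, so it scales linearly with the group count. A toy computation using the numbers from the comment (the helper name is made up for illustration):

```java
public class PlanDerivedParallelism {
  // Sum of getNumOutputFileGroups() across all clustering groups.
  static int planDerivedParallelism(int[] numOutputFileGroupsPerGroup) {
    int sum = 0;
    for (int n : numOutputFileGroupsPerGroup) {
      sum += n;
    }
    return sum;
  }

  public static void main(String[] args) {
    // 100 groups, each producing 2 output file groups -> parallelism 200,
    // as described in the comment; a fixed config decouples reads from this.
    int[] groups = new int[100];
    java.util.Arrays.fill(groups, 2);
    System.out.println(planDerivedParallelism(groups)); // 200
  }
}
```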
Maybe hoodie.clustering.group.read.parallelism?
changed
@hudi-bot run azure re-run the last Azure build
@hudi-bot run azure
Azure seems to have some problems.
You can rebase with the latest master to re-trigger it.
@hudi-bot run azure
Change Logs
When Spark executes clustering, it generates one task per file in the clustering plan, which can be far too many. This change makes the read parallelism configurable.
[image: screenshot attached to the original PR description]
Impact
None
Risk level (write none, low, medium, or high below)
If medium or high, explain what verification was done to mitigate the risks.
Documentation Update
Describe any necessary documentation update if there is any new feature, config, or user-facing change
ticket number here and follow the instructions to make changes to the website.
Contributor's checklist