[HUDI-5926] Improve cleaner parallelism #8171

Merged · 1 commit · Mar 14, 2023
@@ -29,6 +29,7 @@
 import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;

 import javax.annotation.concurrent.Immutable;
+
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
@@ -120,7 +121,17 @@ public class HoodieCleanConfig extends HoodieConfig {
   public static final ConfigProperty<String> CLEANER_PARALLELISM_VALUE = ConfigProperty
       .key("hoodie.cleaner.parallelism")
       .defaultValue("200")
-      .withDocumentation("Parallelism for the cleaning operation. Increase this if cleaning becomes slow.");
+      .withDocumentation("This config controls the behavior of both the cleaning plan and "
+          + "cleaning execution. Deriving the cleaning plan is parallelized at the table "
+          + "partition level, i.e., each table partition is processed by one Spark task to figure "
+          + "out the files to clean. The cleaner picks the configured parallelism if the number "
+          + "of table partitions is larger than this configured value. The parallelism is "
+          + "assigned to the number of table partitions if it is smaller than the configured value. "
+          + "The clean execution, i.e., the file deletion, is parallelized at file level, which "
+          + "is the unit of Spark task distribution. Similarly, the actual parallelism cannot "
+          + "exceed the configured value if the number of files is larger. If the cleaning plan or "
+          + "execution is slow due to limited parallelism, you can increase this to tune the "
+          + "performance.");

public static final ConfigProperty<Boolean> ALLOW_MULTIPLE_CLEANS = ConfigProperty
.key("hoodie.clean.allow.multiple")
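The rewritten documentation boils down to one rule: the effective parallelism is the smaller of the configured value and the number of work units, where a work unit is a table partition during planning and a file during execution. A minimal sketch of that rule, with hypothetical variable names (`numTablePartitions`, `numFilesToDelete`, and `configuredParallelism` are assumptions for illustration, not Hudi APIs):

```java
// Illustrative sketch only; variable names are hypothetical.
// Planning: one Spark task per table partition, capped by the config.
int planParallelism = Math.min(numTablePartitions, configuredParallelism);

// Execution: one work unit per file to delete, capped by the same config.
int executeParallelism = Math.min(numFilesToDelete, configuredParallelism);
```

Raising `hoodie.cleaner.parallelism` beyond the number of partitions (or files) therefore has no effect; it only helps when the work-unit count exceeds the configured value.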
@@ -126,11 +126,11 @@ private static Stream<Pair<String, PartitionCleanStat>> deleteFilesFunc(Iterator
    */
   List<HoodieCleanStat> clean(HoodieEngineContext context, HoodieCleanerPlan cleanerPlan) {
     int cleanerParallelism = Math.min(
-        (int) (cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).count()),
+        cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().mapToInt(List::size).sum(),
         config.getCleanerParallelism());
     LOG.info("Using cleanerParallelism: " + cleanerParallelism);

-    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of partitions: " + config.getTableName());
+    context.setJobStatus(this.getClass().getSimpleName(), "Perform cleaning of table: " + config.getTableName());

     Stream<Pair<String, CleanFileInfo>> filesToBeDeletedPerPartition =
         cleanerPlan.getFilePathsToBeDeletedPerPartition().entrySet().stream()
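The one-line fix above is the heart of the PR: on the mapped `IntStream`, `count()` returns the number of elements, i.e. the number of partition lists, while `sum()` adds up the list sizes, i.e. the total number of files to delete, which is the real number of deletion work units. A self-contained sketch with made-up partition data illustrates the difference:

```java
import java.util.List;
import java.util.Map;

public class CountVsSum {
  public static void main(String[] args) {
    // Hypothetical cleaner plan: 2 partitions, 5 files in total.
    Map<String, List<String>> filesPerPartition = Map.of(
        "2023/03/01", List.of("a.parquet", "b.parquet"),
        "2023/03/02", List.of("c.parquet", "d.parquet", "e.parquet"));

    // Before the fix: counts the lists, i.e. the partitions -> 2
    long before = filesPerPartition.values().stream().mapToInt(List::size).count();

    // After the fix: sums the list sizes, i.e. the files -> 5
    int after = filesPerPartition.values().stream().mapToInt(List::size).sum();

    System.out.println(before + " vs " + after); // prints "2 vs 5"
  }
}
```

With `count()`, a table with few partitions but many files to delete would run the deletion stage with far fewer Spark tasks than configured; `sum()` lets the parallelism scale up to the file count, bounded by `hoodie.cleaner.parallelism`.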