Skip to content

Commit

Permalink
[HUDI-5954] Infer cleaning policy based on clean configs (#8238)
Browse files Browse the repository at this point in the history
This commit adds the logic of inferring the cleaning policy ("hoodie.cleaner.policy") based on clean configs. By default, the cleaning policy is determined based on one of the following configs explicitly set by the user (at most one of them can be set; otherwise, KEEP_LATEST_COMMITS cleaning policy is used):

- "hoodie.cleaner.commits.retained": the KEEP_LATEST_COMMITS cleaning policy is used;
- "hoodie.cleaner.hours.retained": the KEEP_LATEST_BY_HOURS cleaning policy is used;
- "hoodie.cleaner.fileversions.retained": the KEEP_LATEST_FILE_VERSIONS cleaning policy is used.

Now setting only one of the configs above automatically switches the cleaning policy. Setting "hoodie.cleaner.policy" is deprecated.
  • Loading branch information
yihua committed Apr 1, 2023
1 parent 6fd885f commit c739701
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@
*/
@Immutable
@ConfigClassProperty(name = "Clean Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Cleaning (reclamation of older/unused file groups/slices).")
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Cleaning (reclamation of older/unused file groups/slices).")
public class HoodieCleanConfig extends HoodieConfig {

private static final String CLEANER_COMMITS_RETAINED_KEY = "hoodie.cleaner.commits.retained";
private static final String CLEANER_HOURS_RETAINED_KEY = "hoodie.cleaner.hours.retained";
private static final String CLEANER_FILE_VERSIONS_RETAINED_KEY = "hoodie.cleaner.fileversions.retained";

public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
.key("hoodie.clean.automatic")
.defaultValue("true")
Expand All @@ -59,25 +63,68 @@ public class HoodieCleanConfig extends HoodieConfig {
.withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
+ "When turned on runs cleaner async with writing, which can speed up overall write performance.");

// The cleaner policy config definition has to be before the following configs for inference:
// CLEANER_COMMITS_RETAINED, CLEANER_HOURS_RETAINED, CLEANER_FILE_VERSIONS_RETAINED
@Deprecated
public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withInferFunction(cfg -> {
boolean isCommitsRetainedConfigured = cfg.contains(CLEANER_COMMITS_RETAINED_KEY);
boolean isHoursRetainedConfigured = cfg.contains(CLEANER_HOURS_RETAINED_KEY);
boolean isFileVersionsRetainedConfigured = cfg.contains(CLEANER_FILE_VERSIONS_RETAINED_KEY);

// If the cleaner policy is not configured, the cleaner policy is inferred only when one
// of the following configs are explicitly configured by the user:
// "hoodie.cleaner.commits.retained" (inferred as KEEP_LATEST_COMMITS)
// "hoodie.cleaner.hours.retained" (inferred as KEEP_LATEST_BY_HOURS)
// "hoodie.cleaner.fileversions.retained" (inferred as KEEP_LATEST_FILE_VERSIONS)
if (isCommitsRetainedConfigured && !isHoursRetainedConfigured && !isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name());
}
if (!isCommitsRetainedConfigured && isHoursRetainedConfigured && !isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
}
if (!isCommitsRetainedConfigured && !isHoursRetainedConfigured && isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
}
return Option.empty();
})
.withDocumentation("Cleaning policy to be used. The cleaner service deletes older file "
+ "slices files to re-claim space. Long running query plans may often refer to older "
+ "file slices and will break if those are cleaned, before the query has had a chance "
+ "to run. So, it is good to make sure that the data is retained for more than the "
+ "maximum query execution time. "
+ "By default, the cleaning policy is determined based on one of the following configs "
+ "explicitly set by the user (at most one of them can be set; otherwise, "
+ "KEEP_LATEST_COMMITS cleaning policy is used): "
+ "(1) \"hoodie.cleaner.commits.retained\": the KEEP_LATEST_COMMITS cleaning policy is "
+ "used, which keeps the file slices written by the last N commits, determined by "
+ "\"hoodie.cleaner.commits.retained\"; "
+ "(2) \"hoodie.cleaner.hours.retained\": the KEEP_LATEST_BY_HOURS cleaning policy is "
+ "used, which keeps the file slices written in the last N hours based on the commit "
+ "time, determined by \"hoodie.cleaner.hours.retained\"; "
+ "(3) \"hoodie.cleaner.fileversions.retained\": the KEEP_LATEST_FILE_VERSIONS cleaning "
+ "policy is used, which keeps the last N versions of the file slices written, "
+ "determined by \"hoodie.cleaner.fileversions.retained\".");

public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
.key("hoodie.cleaner.commits.retained")
.key(CLEANER_COMMITS_RETAINED_KEY)
.defaultValue("10")
.withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
+ "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");

public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key(CLEANER_HOURS_RETAINED_KEY)
.defaultValue("24")
.withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as"
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");

public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
+ " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED.key()
+ " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
+ " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
.key(CLEANER_FILE_VERSIONS_RETAINED_KEY)
.defaultValue("3")
.withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ " the minimum number of file slices to retain in each file group, during cleaning.");

public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
.key("hoodie.clean.trigger.strategy")
Expand All @@ -90,12 +137,6 @@ public class HoodieCleanConfig extends HoodieConfig {
.defaultValue("1")
.withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");

public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
.key("hoodie.cleaner.fileversions.retained")
.defaultValue("3")
.withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ " the minimum number of file slices to retain in each file group, during cleaning.");

public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
.key("hoodie.cleaner.incremental.mode")
.defaultValue("true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
import java.util.Properties;
import java.util.function.Function;

import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS;
import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS;
import static org.apache.hudi.config.HoodieArchivalConfig.ASYNC_ARCHIVE;
import static org.apache.hudi.config.HoodieCleanConfig.ASYNC_CLEAN;
import static org.apache.hudi.config.HoodieCleanConfig.AUTO_CLEAN;
Expand Down Expand Up @@ -124,6 +127,80 @@ public void testDefaultMarkersTypeAccordingToEngineType() {
EngineType.JAVA, MarkerType.DIRECT));
}

@Test
public void testInferCleaningPolicy() {
// If no clean configs are set,
// use KEEP_LATEST_COMMITS cleaning policy
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.commits.retained" is set only,
// use KEEP_LATEST_COMMITS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.retainCommits(10)
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.hours.retained" is set only,
// use KEEP_LATEST_BY_HOURS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.cleanerNumHoursRetained(96)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.fileversions.retained" is set only,
// use KEEP_LATEST_FILE_VERSIONS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_FILE_VERSIONS, writeConfig.getCleanerPolicy());

// If multiple clean configs are set and the cleaning policy is not set,
// use KEEP_LATEST_COMMITS cleaning policy (no inference)
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.cleanerNumHoursRetained(96)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If the cleaning policy is explicitly set, use the configured policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(KEEP_LATEST_BY_HOURS)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());

writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(KEEP_LATEST_BY_HOURS)
.retainCommits(10)
.cleanerNumHoursRetained(96)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType tableType) {
Expand Down

0 comments on commit c739701

Please sign in to comment.