[HUDI-5954] Infer cleaning policy based on clean configs #8238

Merged 2 commits on Apr 1, 2023
@@ -42,10 +42,14 @@
*/
@Immutable
@ConfigClassProperty(name = "Clean Configs",
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Cleaning (reclamation of older/unused file groups/slices).")
groupName = ConfigGroups.Names.WRITE_CLIENT,
description = "Cleaning (reclamation of older/unused file groups/slices).")
public class HoodieCleanConfig extends HoodieConfig {

private static final String CLEANER_COMMITS_RETAINED_KEY = "hoodie.cleaner.commits.retained";
private static final String CLEANER_HOURS_RETAINED_KEY = "hoodie.cleaner.hours.retained";
private static final String CLEANER_FILE_VERSIONS_RETAINED_KEY = "hoodie.cleaner.fileversions.retained";

public static final ConfigProperty<String> AUTO_CLEAN = ConfigProperty
.key("hoodie.clean.automatic")
.defaultValue("true")
@@ -59,25 +63,67 @@ public class HoodieCleanConfig extends HoodieConfig {
.withDocumentation("Only applies when " + AUTO_CLEAN.key() + " is turned on. "
+ "When turned on runs cleaner async with writing, which can speed up overall write performance.");

// The cleaner policy config definition has to be before the following configs for inference:
// CLEANER_COMMITS_RETAINED, CLEANER_HOURS_RETAINED, CLEANER_FILE_VERSIONS_RETAINED
public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withInferFunction(cfg -> {
boolean isCommitsRetainedConfigured = cfg.contains(CLEANER_COMMITS_RETAINED_KEY);
boolean isHoursRetainedConfigured = cfg.contains(CLEANER_HOURS_RETAINED_KEY);
Contributor

I'm confused by these options. Does the option hoodie.cleaner.policy make any sense here? If each of the specific cleaning parameters (hoodie.cleaner.commits.retained, hoodie.cleaner.hours.retained, hoodie.cleaner.fileversions.retained) maps to a deterministic policy, then this option should be eliminated.

For example, can we use a combination such as the HoodieCleaningPolicy.KEEP_LATEST_COMMITS policy together with hoodie.cleaner.fileversions.retained? If not, the redundant option key hoodie.cleaner.policy is unnecessary.

Contributor Author

As discussed, I marked it as deprecated.

boolean isFileVersionsRetainedConfigured = cfg.contains(CLEANER_FILE_VERSIONS_RETAINED_KEY);

// If the cleaner policy is not configured, the cleaner policy is inferred only when exactly
// one of the following configs is explicitly configured by the user:
// "hoodie.cleaner.commits.retained" (inferred as KEEP_LATEST_COMMITS)
// "hoodie.cleaner.hours.retained" (inferred as KEEP_LATEST_BY_HOURS)
// "hoodie.cleaner.fileversions.retained" (inferred as KEEP_LATEST_FILE_VERSIONS)
if (isCommitsRetainedConfigured && !isHoursRetainedConfigured && !isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name());
}
if (!isCommitsRetainedConfigured && isHoursRetainedConfigured && !isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS.name());
}
if (!isCommitsRetainedConfigured && !isHoursRetainedConfigured && isFileVersionsRetainedConfigured) {
return Option.of(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name());
}
return Option.empty();
})
.withDocumentation("Cleaning policy to be used. The cleaner service deletes older file "
+ "slices to reclaim space. Long-running query plans may often refer to older "
+ "file slices and will break if those are cleaned before the query has had a chance "
+ "to run. So, it is good to make sure that the data is retained for more than the "
+ "maximum query execution time. "
+ "By default, the cleaning policy is determined based on one of the following configs "
+ "explicitly set by the user (at most one of them can be set; otherwise, "
+ "KEEP_LATEST_COMMITS cleaning policy is used): "
+ "(1) \"hoodie.cleaner.commits.retained\": the KEEP_LATEST_COMMITS cleaning policy is "
+ "used, which keeps the file slices written by the last N commits, determined by "
+ "\"hoodie.cleaner.commits.retained\"; "
+ "(2) \"hoodie.cleaner.hours.retained\": the KEEP_LATEST_BY_HOURS cleaning policy is "
+ "used, which keeps the file slices written in the last N hours based on the commit "
+ "time, determined by \"hoodie.cleaner.hours.retained\"; "
+ "(3) \"hoodie.cleaner.fileversions.retained\": the KEEP_LATEST_FILE_VERSIONS cleaning "
+ "policy is used, which keeps the last N versions of the file slices written, "
+ "determined by \"hoodie.cleaner.fileversions.retained\".");
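The inference rule in the CLEANER_POLICY definition above can be sketched on its own, outside Hudi. This is a minimal illustration, not the project's code: the `Policy` enum and the `infer` helper are local stand-ins (assumptions), and a plain `Set<String>` of configured keys replaces the `cfg.contains(...)` calls in the diff.

```java
import java.util.Optional;
import java.util.Set;

class CleaningPolicyInference {
  // Local stand-in for the three policy names in the diff (not Hudi's HoodieCleaningPolicy).
  enum Policy { KEEP_LATEST_COMMITS, KEEP_LATEST_BY_HOURS, KEEP_LATEST_FILE_VERSIONS }

  static final String COMMITS_KEY = "hoodie.cleaner.commits.retained";
  static final String HOURS_KEY = "hoodie.cleaner.hours.retained";
  static final String FILE_VERSIONS_KEY = "hoodie.cleaner.fileversions.retained";

  // Returns a policy only when exactly one of the three retention keys is configured.
  static Optional<Policy> infer(Set<String> configuredKeys) {
    boolean commits = configuredKeys.contains(COMMITS_KEY);
    boolean hours = configuredKeys.contains(HOURS_KEY);
    boolean versions = configuredKeys.contains(FILE_VERSIONS_KEY);
    int configured = (commits ? 1 : 0) + (hours ? 1 : 0) + (versions ? 1 : 0);
    if (configured != 1) {
      return Optional.empty(); // none or several set: no inference
    }
    if (commits) {
      return Optional.of(Policy.KEEP_LATEST_COMMITS);
    }
    if (hours) {
      return Optional.of(Policy.KEEP_LATEST_BY_HOURS);
    }
    return Optional.of(Policy.KEEP_LATEST_FILE_VERSIONS);
  }

  public static void main(String[] args) {
    System.out.println(infer(Set.of(HOURS_KEY)));                    // one key set
    System.out.println(infer(Set.of(HOURS_KEY, FILE_VERSIONS_KEY))); // two keys set
    System.out.println(infer(Set.of()));                             // nothing set
  }
}
```

Counting the configured keys gives the same result as the three mutually exclusive `if` conditions in the diff, and makes the "exactly one" requirement explicit.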

public static final ConfigProperty<String> CLEANER_COMMITS_RETAINED = ConfigProperty
.key("hoodie.cleaner.commits.retained")
.key(CLEANER_COMMITS_RETAINED_KEY)
.defaultValue("10")
.withDocumentation("Number of commits to retain, without cleaning. This will be retained for num_of_commits * time_between_commits "
+ "(scheduled). This also directly translates into how much data retention the table supports for incremental queries.");
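The documentation string above gives the retention window as num_of_commits * time_between_commits. A small worked example of that arithmetic follows. The `retentionWindow` helper and the 30-minute commit cadence are assumptions made up for illustration; only the default of 10 commits comes from the diff.

```java
import java.time.Duration;

class CommitRetentionWindow {
  // Approximate retention = commits retained * interval between scheduled commits.
  static Duration retentionWindow(int commitsRetained, Duration timeBetweenCommits) {
    return timeBetweenCommits.multipliedBy(commitsRetained);
  }

  public static void main(String[] args) {
    // 10 is the default in the diff; the 30-minute cadence is a hypothetical schedule.
    Duration window = retentionWindow(10, Duration.ofMinutes(30));
    System.out.println(window.toHours() + " hours"); // prints "5 hours"
  }
}
```

Under these assumed numbers, an incremental query could look back roughly five hours before the slices it needs become eligible for cleaning.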

public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key("hoodie.cleaner.hours.retained")
public static final ConfigProperty<String> CLEANER_HOURS_RETAINED = ConfigProperty.key(CLEANER_HOURS_RETAINED_KEY)
.defaultValue("24")
.withDocumentation("Number of hours for which commits need to be retained. This config provides a more flexible option as "
+ "compared to number of commits retained for cleaning service. Setting this property ensures all the files, but the latest in a file group,"
+ " corresponding to commits with commit times older than the configured number of hours to be retained are cleaned.");
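The hours-based rule in the documentation string above (clean slices older than the configured number of hours, except the latest slice in a file group) can be sketched for a single file group. This follows only that one sentence and is an assumption-laden simplification: `slicesToClean` is a hypothetical helper, slices are reduced to their commit times, and the actual cleaner may keep additional slices that this sketch ignores.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class HoursRetainedSketch {
  // Returns the commit times of slices in ONE file group that would be cleaned:
  // those older than (now - hoursRetained), never including the latest slice.
  static List<Instant> slicesToClean(List<Instant> sliceCommitTimes, Instant now, int hoursRetained) {
    Instant cutoff = now.minus(Duration.ofHours(hoursRetained));
    List<Instant> sorted = new ArrayList<>(sliceCommitTimes);
    Collections.sort(sorted); // oldest first
    List<Instant> toClean = new ArrayList<>();
    // Stop before the last element so the latest slice is always kept.
    for (int i = 0; i < sorted.size() - 1; i++) {
      if (sorted.get(i).isBefore(cutoff)) {
        toClean.add(sorted.get(i));
      }
    }
    return toClean;
  }

  public static void main(String[] args) {
    Instant now = Instant.parse("2023-04-01T00:00:00Z");
    List<Instant> slices = List.of(
        now.minus(Duration.ofHours(72)),
        now.minus(Duration.ofHours(48)),
        now.minus(Duration.ofHours(12)));
    // With 24 hours retained, the 72h-old and 48h-old slices are returned.
    System.out.println(slicesToClean(slices, now, 24));
  }
}
```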

public static final ConfigProperty<String> CLEANER_POLICY = ConfigProperty
.key("hoodie.cleaner.policy")
.defaultValue(HoodieCleaningPolicy.KEEP_LATEST_COMMITS.name())
.withDocumentation("Cleaning policy to be used. The cleaner service deletes older file slices files to re-claim space."
+ " By default, cleaner spares the file slices written by the last N commits, determined by " + CLEANER_COMMITS_RETAINED.key()
+ " Long running query plans may often refer to older file slices and will break if those are cleaned, before the query has had"
+ " a chance to run. So, it is good to make sure that the data is retained for more than the maximum query execution time");
public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
.key(CLEANER_FILE_VERSIONS_RETAINED_KEY)
.defaultValue("3")
.withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ " the minimum number of file slices to retain in each file group, during cleaning.");
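For KEEP_LATEST_FILE_VERSIONS, the documentation string above describes keeping a minimum number of file slices in each file group. A rough sketch of that selection follows, assuming slices are identified by lexicographically sortable timestamp strings. The file group ids, the timestamps, and the `slicesToClean` helper are all hypothetical and not taken from Hudi.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FileVersionsRetainedSketch {
  // For each file group, keep the newest `versionsRetained` slices and
  // return the remaining (older) ones as cleaning candidates.
  static Map<String, List<String>> slicesToClean(
      Map<String, List<String>> slicesByFileGroup, int versionsRetained) {
    Map<String, List<String>> result = new TreeMap<>();
    for (Map.Entry<String, List<String>> entry : slicesByFileGroup.entrySet()) {
      List<String> newestFirst = new ArrayList<>(entry.getValue());
      newestFirst.sort(Comparator.reverseOrder());
      List<String> old = newestFirst.size() > versionsRetained
          ? new ArrayList<>(newestFirst.subList(versionsRetained, newestFirst.size()))
          : new ArrayList<>();
      result.put(entry.getKey(), old);
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, List<String>> slices = Map.of(
        "fg1", List.of("001", "002", "003", "004"),
        "fg2", List.of("002", "004"));
    // 3 is the default shown in the diff.
    System.out.println(slicesToClean(slices, 3)); // prints {fg1=[001], fg2=[]}
  }
}
```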

public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
.key("hoodie.clean.trigger.strategy")
@@ -90,12 +136,6 @@ public class HoodieCleanConfig extends HoodieConfig {
.defaultValue("1")
.withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");

public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
.key("hoodie.cleaner.fileversions.retained")
.defaultValue("3")
.withDocumentation("When " + HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS.name() + " cleaning policy is used, "
+ " the minimum number of file slices to retain in each file group, during cleaning.");

public static final ConfigProperty<String> CLEANER_INCREMENTAL_MODE_ENABLE = ConfigProperty
.key("hoodie.cleaner.incremental.mode")
.defaultValue("true")
@@ -47,6 +47,9 @@
import java.util.Properties;
import java.util.function.Function;

import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS;
import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_COMMITS;
import static org.apache.hudi.common.model.HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS;
import static org.apache.hudi.config.HoodieArchivalConfig.ASYNC_ARCHIVE;
import static org.apache.hudi.config.HoodieCleanConfig.ASYNC_CLEAN;
import static org.apache.hudi.config.HoodieCleanConfig.AUTO_CLEAN;
@@ -124,6 +127,80 @@ public void testDefaultMarkersTypeAccordingToEngineType() {
EngineType.JAVA, MarkerType.DIRECT));
}

@Test
public void testInferCleaningPolicy() {
// If no clean configs are set,
// use KEEP_LATEST_COMMITS cleaning policy
HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.commits.retained" is set only,
// use KEEP_LATEST_COMMITS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.retainCommits(10)
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.hours.retained" is set only,
// use KEEP_LATEST_BY_HOURS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.cleanerNumHoursRetained(96)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());

// If "hoodie.cleaner.fileversions.retained" is set only,
// use KEEP_LATEST_FILE_VERSIONS cleaning policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_FILE_VERSIONS, writeConfig.getCleanerPolicy());

// If multiple clean configs are set and the cleaning policy is not set,
// use KEEP_LATEST_COMMITS cleaning policy (no inference)
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.cleanerNumHoursRetained(96)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_COMMITS, writeConfig.getCleanerPolicy());

// If the cleaning policy is explicitly set, use the configured policy
writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(KEEP_LATEST_BY_HOURS)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());

writeConfig = HoodieWriteConfig.newBuilder()
.withPath("/tmp")
.withCleanConfig(HoodieCleanConfig.newBuilder()
.withCleanerPolicy(KEEP_LATEST_BY_HOURS)
.retainCommits(10)
.cleanerNumHoursRetained(96)
.retainFileVersions(2)
.build())
.build();
assertEquals(KEEP_LATEST_BY_HOURS, writeConfig.getCleanerPolicy());
}
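The precedence this test exercises (explicit policy first, then inference from exactly one retention config, then the KEEP_LATEST_COMMITS default) can be summarised in a standalone sketch. It uses a plain `Map<String, String>` of user-set properties rather than HoodieWriteConfig, and the `resolve` helper is hypothetical, so treat it as an illustration of the expected outcomes and not as Hudi's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CleanerPolicyResolution {
  static final String POLICY_KEY = "hoodie.cleaner.policy";
  static final String DEFAULT_POLICY = "KEEP_LATEST_COMMITS";

  // Retention key -> policy inferred when that key is the only one set.
  static final Map<String, String> KEY_TO_POLICY = Map.of(
      "hoodie.cleaner.commits.retained", "KEEP_LATEST_COMMITS",
      "hoodie.cleaner.hours.retained", "KEEP_LATEST_BY_HOURS",
      "hoodie.cleaner.fileversions.retained", "KEEP_LATEST_FILE_VERSIONS");

  static String resolve(Map<String, String> userProps) {
    // 1. An explicitly configured policy always wins.
    if (userProps.containsKey(POLICY_KEY)) {
      return userProps.get(POLICY_KEY);
    }
    // 2. Otherwise infer, but only if exactly one retention key is set.
    List<String> candidates = new ArrayList<>();
    for (Map.Entry<String, String> entry : KEY_TO_POLICY.entrySet()) {
      if (userProps.containsKey(entry.getKey())) {
        candidates.add(entry.getValue());
      }
    }
    // 3. Fall back to the default when nothing or several keys are set.
    return candidates.size() == 1 ? candidates.get(0) : DEFAULT_POLICY;
  }

  public static void main(String[] args) {
    System.out.println(resolve(Map.of()));
    System.out.println(resolve(Map.of("hoodie.cleaner.hours.retained", "96")));
    System.out.println(resolve(Map.of(
        "hoodie.cleaner.hours.retained", "96",
        "hoodie.cleaner.fileversions.retained", "2")));
    System.out.println(resolve(Map.of(
        POLICY_KEY, "KEEP_LATEST_BY_HOURS",
        "hoodie.cleaner.fileversions.retained", "2")));
  }
}
```

The four calls in `main` correspond to the no-config, hours-only, multiple-configs, and explicit-policy cases asserted in the test.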

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testAutoConcurrencyConfigAdjustmentWithTableServices(HoodieTableType tableType) {