[HUDI-1436]: Provide an option to trigger clean every nth commit (#4385)
- Provides an option to trigger cleaning every Nth commit, with the default number of commits set to 1 so that existing users are unaffected.
Co-authored-by: sivabalan <n.siva.b@gmail.com>
pratyakshsharma committed Mar 22, 2022
1 parent 26e5d2e commit ca0931d
Showing 9 changed files with 521 additions and 384 deletions.
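In user-facing terms, the commit adds two write configs: hoodie.clean.trigger.strategy (default NUM_COMMITS) and hoodie.clean.max.commits (default 1). A minimal sketch of setting them from a Spark writer follows; the table name, threshold value, and surrounding options are illustrative, not part of this commit:

    // Sketch: schedule a clean only once 4 commits have accumulated since the last clean.
    df.write().format("hudi")
        .option("hoodie.table.name", "sample_table")            // illustrative
        .option("hoodie.clean.trigger.strategy", "NUM_COMMITS") // the only strategy so far, and the default
        .option("hoodie.clean.max.commits", "4")
        .mode(SaveMode.Append)
        .save(basePath);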
HoodieCompactionConfig.java
@@ -26,6 +26,7 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
import org.apache.hudi.table.action.compact.strategy.LogFileSizeBasedCompactionStrategy;
@@ -129,6 +130,17 @@ public class HoodieCompactionConfig extends HoodieConfig {
.withDocumentation("Controls how compaction scheduling is triggered, by time or num delta commits or combination of both. "
+ "Valid options: " + Arrays.stream(CompactionTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));

public static final ConfigProperty<String> CLEAN_TRIGGER_STRATEGY = ConfigProperty
.key("hoodie.clean.trigger.strategy")
.defaultValue(CleaningTriggerStrategy.NUM_COMMITS.name())
.withDocumentation("Controls how cleaning is scheduled. Valid options: "
+ Arrays.stream(CleaningTriggerStrategy.values()).map(Enum::name).collect(Collectors.joining(",")));

public static final ConfigProperty<String> CLEAN_MAX_COMMITS = ConfigProperty
.key("hoodie.clean.max.commits")
.defaultValue("1")
.withDocumentation("Number of commits after the last clean operation, before scheduling of a new clean is attempted.");

public static final ConfigProperty<String> CLEANER_FILE_VERSIONS_RETAINED = ConfigProperty
.key("hoodie.cleaner.fileversions.retained")
.defaultValue("3")
@@ -583,6 +595,16 @@ public Builder withInlineCompactionTriggerStrategy(CompactionTriggerStrategy com
return this;
}

public Builder withCleaningTriggerStrategy(String cleaningTriggerStrategy) {
compactionConfig.setValue(CLEAN_TRIGGER_STRATEGY, cleaningTriggerStrategy);
return this;
}

public Builder withMaxCommitsBeforeCleaning(int maxCommitsBeforeCleaning) {
compactionConfig.setValue(CLEAN_MAX_COMMITS, String.valueOf(maxCommitsBeforeCleaning));
return this;
}

public Builder withCleanerPolicy(HoodieCleaningPolicy policy) {
compactionConfig.setValue(CLEANER_POLICY, policy.name());
return this;
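For programmatic clients, the same settings flow through the two builder methods added above. A sketch, assuming the usual HoodieWriteConfig.newBuilder()/withCompactionConfig() wiring (the base path and threshold are illustrative):

    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sample-table") // illustrative
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCleaningTriggerStrategy(CleaningTriggerStrategy.NUM_COMMITS.name())
            .withMaxCommitsBeforeCleaning(4) // attempt a clean only after 4 commits
            .build())
        .build();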
HoodieWriteConfig.java
@@ -61,6 +61,7 @@
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.RandomFileIdPrefixProvider;
import org.apache.hudi.table.action.clean.CleaningTriggerStrategy;
import org.apache.hudi.table.action.cluster.ClusteringPlanPartitionFilterMode;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;
@@ -1153,6 +1154,18 @@ public int getCleanerParallelism() {
return getInt(HoodieCompactionConfig.CLEANER_PARALLELISM_VALUE);
}

public int getCleaningMaxCommits() {
return getInt(HoodieCompactionConfig.CLEAN_MAX_COMMITS);
}

public CleaningTriggerStrategy getCleaningTriggerStrategy() {
return CleaningTriggerStrategy.valueOf(getString(HoodieCompactionConfig.CLEAN_TRIGGER_STRATEGY));
}

public boolean isAutoClean() {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}

public boolean getArchiveMergeEnable() {
return getBoolean(HoodieCompactionConfig.ARCHIVE_MERGE_ENABLE);
}
@@ -1169,10 +1182,6 @@ public boolean isAsyncArchive() {
return getBoolean(HoodieCompactionConfig.ASYNC_ARCHIVE);
}

public boolean isAutoClean() {
return getBoolean(HoodieCompactionConfig.AUTO_CLEAN);
}

public boolean isAsyncClean() {
return getBoolean(HoodieCompactionConfig.ASYNC_CLEAN);
}
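Note that getCleaningTriggerStrategy() round-trips the string config through Enum.valueOf, so an invalid value fails fast when the config is read rather than at planning time. Illustrative values:

    CleaningTriggerStrategy.valueOf("NUM_COMMITS");  // -> NUM_COMMITS
    CleaningTriggerStrategy.valueOf("EVERY_COMMIT"); // throws IllegalArgumentException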
CleanPlanActionExecutor.java
@@ -32,6 +32,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.BaseActionExecutor;
@@ -58,8 +59,30 @@ public CleanPlanActionExecutor(HoodieEngineContext context,
this.extraMetadata = extraMetadata;
}

protected Option<HoodieCleanerPlan> createCleanerPlan() {
return execute();
private int getCommitsSinceLastCleaning() {
Option<HoodieInstant> lastCleanInstant = table.getActiveTimeline().getCleanerTimeline().filterCompletedInstants().lastInstant();
HoodieTimeline commitTimeline = table.getActiveTimeline().getCommitsTimeline().filterCompletedInstants();

String latestCleanTs;
int numCommits = 0;
if (lastCleanInstant.isPresent()) {
latestCleanTs = lastCleanInstant.get().getTimestamp();
numCommits = commitTimeline.findInstantsAfter(latestCleanTs).countInstants();
} else {
numCommits = commitTimeline.countInstants();
}

return numCommits;
}

private boolean needsCleaning(CleaningTriggerStrategy strategy) {
if (strategy == CleaningTriggerStrategy.NUM_COMMITS) {
int numberOfCommits = getCommitsSinceLastCleaning();
int maxInlineCommitsForNextClean = config.getCleaningMaxCommits();
return numberOfCommits >= maxInlineCommitsForNextClean;
} else {
throw new HoodieException("Unsupported cleaning trigger strategy: " + config.getCleaningTriggerStrategy());
}
}

/**
@@ -128,6 +151,9 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {

@Override
public Option<HoodieCleanerPlan> execute() {
if (!needsCleaning(config.getCleaningTriggerStrategy())) {
return Option.empty();
}
// Plan a new clean action
return requestClean(instantTime);
}
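The trigger decision reduces to a threshold check on completed commits since the last completed clean. A standalone paraphrase with a worked trace (names are illustrative, not the Hudi API):

    // Paraphrase of needsCleaning() for the NUM_COMMITS strategy.
    static boolean shouldScheduleClean(int commitsSinceLastClean, int maxCommitsBeforeClean) {
        return commitsSinceLastClean >= maxCommitsBeforeClean;
    }

    // With hoodie.clean.max.commits = 3 and a completed clean at instant t5:
    //   commits t6, t7      -> shouldScheduleClean(2, 3) == false; execute() returns Option.empty()
    //   commits t6, t7, t8  -> shouldScheduleClean(3, 3) == true;  a cleaner plan is requested
    // With the default of 1, every new commit triggers planning, matching the old behavior.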
CleaningTriggerStrategy.java (new file)
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.clean;

public enum CleaningTriggerStrategy {
// trigger cleaning once every N commits
NUM_COMMITS
}
@@ -117,7 +117,7 @@ public void testInsert() throws Exception {
records1.add(new HoodieAvroRecord(new HoodieKey(insertRow2.getRowKey(), insertRow2.getPartitionPath()), insertRow2));

int startInstant = 1;
String firstCommitTime = makeNewCommitTime(startInstant++);
String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
// First insert
writeClient.startCommitWithTime(firstCommitTime);
writeClient.insert(records1, firstCommitTime);
@@ -145,7 +145,7 @@ public void testInsert() throws Exception {
records2.add(new HoodieAvroRecord(new HoodieKey(insertRow1.getRowKey(), insertRow1.getPartitionPath()), insertRow1));
records2.add(new HoodieAvroRecord(new HoodieKey(insertRow2.getRowKey(), insertRow2.getPartitionPath()), insertRow2));

String newCommitTime = makeNewCommitTime(startInstant++);
String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
writeClient.startCommitWithTime(newCommitTime);
// Second insert uses the same _row_key as the first one, to test allowDuplicateInserts
writeClient.insert(records2, newCommitTime);
@@ -183,7 +183,7 @@ public void testInsertWithDataGenerator(boolean mergeAllowDuplicateOnInsertsEnab
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[]{partitionPath});

int startInstant = 1;
String firstCommitTime = makeNewCommitTime(startInstant++);
String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
List<HoodieRecord> records1 = dataGenerator.generateInserts(firstCommitTime, 100);

// First insert
@@ -200,7 +200,7 @@ public void testInsertWithDataGenerator(boolean mergeAllowDuplicateOnInsertsEnab
assertTrue(filter.mightContain(record.getRecordKey()));
}

String newCommitTime = makeNewCommitTime(startInstant++);
String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
List<HoodieRecord> records2 = dataGenerator.generateUpdates(newCommitTime, 100);
writeClient.startCommitWithTime(newCommitTime);
// Second insert uses the same _row_key as the first one, to test allowDuplicateInserts
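The test changes are mechanical: makeNewCommitTime now takes an explicit width format. Assuming the helper boils down to String.format(format, counter) (an assumption about the test utility, which is not shown in this diff), zero-padding keeps instant times fixed-width so they compare correctly as strings, which timeline operations like findInstantsAfter(latestCleanTs) rely on:

    String.format("%09d", 1);  // -> "000000001"
    String.format("%09d", 10); // -> "000000010", lexically after "000000001"
    // Unpadded, "10" sorts before "9" as a string, breaking instant-time comparisons.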
@@ -123,7 +123,7 @@ public void testUpdateRecords() throws Exception {
// Prepare the AvroParquetIO
HoodieWriteConfig config = makeHoodieClientConfig();
int startInstant = 1;
String firstCommitTime = makeNewCommitTime(startInstant++);
String firstCommitTime = makeNewCommitTime(startInstant++, "%09d");
HoodieJavaWriteClient writeClient = getHoodieWriteClient(config);
writeClient.startCommitWithTime(firstCommitTime);
metaClient = HoodieTableMetaClient.reload(metaClient);
@@ -185,7 +185,7 @@ public void testUpdateRecords() throws Exception {

List<HoodieRecord> updatedRecords = Arrays.asList(updatedRecord1, insertedRecord1);

String newCommitTime = makeNewCommitTime(startInstant++);
String newCommitTime = makeNewCommitTime(startInstant++, "%09d");
metaClient = HoodieTableMetaClient.reload(metaClient);
writeClient.startCommitWithTime(newCommitTime);
List<WriteStatus> statuses = writeClient.upsert(updatedRecords, newCommitTime);
