Skip to content

Commit

Permalink
[HUDI-2683] Parallelize deleting archived hoodie commits (#3920)
Browse files Browse the repository at this point in the history
Co-authored-by: yuezhang <yuezhang@freewheel.tv>
  • Loading branch information
zhangyue19921010 and yuezhang committed Nov 15, 2021
1 parent 53d2d6a commit 38b6934
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
+ " keep the metadata overhead constant, even as the table size grows."
+ "This config controls the maximum number of instants to retain in the active timeline. ");

// Upper bound on the parallelism used when deleting archived instant files
// from the timeline; read back via getArchiveDeleteParallelism() and passed
// to FSUtils.parallelizeFilesProcess by the archiver.
public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
.key("hoodie.archive.delete.parallelism")
.defaultValue(100)
.withDocumentation("Parallelism for deleting archived hoodie commits.");

public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
.key("hoodie.keep.min.commits")
.defaultValue("20")
Expand Down Expand Up @@ -568,6 +573,11 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBefo
return this;
}

/**
 * Sets the maximum parallelism used when deleting archived instant files.
 *
 * @param archiveDeleteParallelism upper bound on concurrent delete tasks
 * @return this builder, for call chaining
 */
public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
  compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, Integer.toString(archiveDeleteParallelism));
  return this;
}

public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1135,6 +1135,10 @@ public Boolean getCompactionReverseLogReadEnabled() {
return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
}

/**
 * @return the configured parallelism for deleting archived instant files,
 *     i.e. the value of {@link HoodieCompactionConfig#DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE}.
 */
public int getArchiveDeleteParallelism() {
return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
}

/**
 * @return true if inline clustering is enabled, per
 *     {@link HoodieClusteringConfig#INLINE_CLUSTERING}.
 */
public boolean inlineClusteringEnabled() {
return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

package org.apache.hudi.table;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
import org.apache.hudi.client.utils.MetadataConversionUtils;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieArchivedLogFile;
import org.apache.hudi.common.model.HoodieAvroPayload;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
Expand Down Expand Up @@ -127,7 +129,7 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
LOG.info("Archiving instants " + instantsToArchive);
archive(context, instantsToArchive);
LOG.info("Deleting archived instants " + instantsToArchive);
success = deleteArchivedInstants(instantsToArchive);
success = deleteArchivedInstants(instantsToArchive, context);
} else {
LOG.info("No Instants to archive");
}
Expand Down Expand Up @@ -224,19 +226,34 @@ private Stream<HoodieInstant> getInstantsToArchive() {
HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
}

private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
LOG.info("Deleting instants " + archivedInstants);
boolean success = true;
for (HoodieInstant archivedInstant : archivedInstants) {
Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
try {
if (metaClient.getFs().exists(commitFile)) {
success &= metaClient.getFs().delete(commitFile, false);
LOG.info("Archived and deleted instant file " + commitFile);
}
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
}
List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
}).map(Path::toString).collect(Collectors.toList());

context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
metaClient.getFs(),
config.getArchiveDeleteParallelism(),
pairOfSubPathAndConf -> {
Path commitFile = new Path(pairOfSubPathAndConf.getKey());
try {
FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
if (fs.exists(commitFile)) {
return fs.delete(commitFile, false);
}
return true;
} catch (IOException e) {
throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
}
},
instantFiles);

for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
success &= result.getValue();
}

// Remove older meta-data from auxiliary path too
Expand Down
25 changes: 18 additions & 7 deletions hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -670,19 +670,30 @@ public static <T> Map<String, T> parallelizeSubPathProcess(
.filter(subPathPredicate)
.map(fileStatus -> fileStatus.getPath().toString())
.collect(Collectors.toList());
if (subPaths.size() > 0) {
SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
int actualParallelism = Math.min(subPaths.size(), parallelism);
result = hoodieEngineContext.mapToPair(subPaths,
subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
actualParallelism);
}
result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, pairFunction, subPaths);
} catch (IOException ioe) {
throw new HoodieIOException(ioe.getMessage(), ioe);
}
return result;
}

/**
 * Applies {@code pairFunction} to every path in {@code subPaths} through the
 * engine context, running at most {@code parallelism} tasks concurrently
 * (capped by the number of paths).
 *
 * @param hoodieEngineContext engine context used to distribute the work
 * @param fs file system whose configuration is serialized and shipped to each task
 * @param parallelism requested upper bound on concurrent tasks
 * @param pairFunction function applied to each (path, serialized conf) pair
 * @param subPaths paths to process; an empty list yields an empty map
 * @return map from each input path to the function's result for that path
 */
public static <T> Map<String, T> parallelizeFilesProcess(
    HoodieEngineContext hoodieEngineContext,
    FileSystem fs,
    int parallelism,
    SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction,
    List<String> subPaths) {
  if (subPaths.isEmpty()) {
    return new HashMap<>();
  }
  // The Hadoop conf is wrapped so it can be serialized out to remote executors.
  SerializableConfiguration serializedConf = new SerializableConfiguration(fs.getConf());
  int effectiveParallelism = Math.min(subPaths.size(), parallelism);
  return hoodieEngineContext.mapToPair(
      subPaths,
      path -> new ImmutablePair<>(path, pairFunction.apply(new ImmutablePair<>(path, serializedConf))),
      effectiveParallelism);
}

/**
* Deletes a sub-path.
*
Expand Down

0 comments on commit 38b6934

Please sign in to comment.