diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
index ce74aad6b098..e11d06098fc7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
@@ -119,6 +119,11 @@ public class HoodieCompactionConfig extends HoodieConfig {
           + " keep the metadata overhead constant, even as the table size grows."
           + "This config controls the maximum number of instants to retain in the active timeline. ");
 
+  public static final ConfigProperty<Integer> DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE = ConfigProperty
+      .key("hoodie.archive.delete.parallelism")
+      .defaultValue(100)
+      .withDocumentation("Parallelism for deleting archived hoodie commits.");
+
   public static final ConfigProperty<String> MIN_COMMITS_TO_KEEP = ConfigProperty
       .key("hoodie.keep.min.commits")
       .defaultValue("20")
@@ -568,6 +573,11 @@ public Builder withMaxNumDeltaCommitsBeforeCompaction(int maxNumDeltaCommitsBefo
       return this;
     }
 
+    public Builder withArchiveDeleteParallelism(int archiveDeleteParallelism) {
+      compactionConfig.setValue(DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE, String.valueOf(archiveDeleteParallelism));
+      return this;
+    }
+
     public Builder withMaxDeltaSecondsBeforeCompaction(int maxDeltaSecondsBeforeCompaction) {
       compactionConfig.setValue(INLINE_COMPACT_TIME_DELTA_SECONDS, String.valueOf(maxDeltaSecondsBeforeCompaction));
       return this;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index eb3df38428f5..73629b4c97b8 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -1135,6 +1135,10 @@ public Boolean getCompactionReverseLogReadEnabled() {
     return getBoolean(HoodieCompactionConfig.COMPACTION_REVERSE_LOG_READ_ENABLE);
   }
 
+  public int getArchiveDeleteParallelism() {
+    return getInt(HoodieCompactionConfig.DELETE_ARCHIVED_INSTANT_PARALLELISM_VALUE);
+  }
+
   public boolean inlineClusteringEnabled() {
     return getBoolean(HoodieClusteringConfig.INLINE_CLUSTERING);
   }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
index d492fb6577a9..bff91c3e56c2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTimelineArchiveLog.java
@@ -18,9 +18,11 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.client.utils.MetadataConversionUtils;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
@@ -127,7 +129,7 @@ public boolean archiveIfRequired(HoodieEngineContext context) throws IOException
         LOG.info("Archiving instants " + instantsToArchive);
         archive(context, instantsToArchive);
         LOG.info("Deleting archived instants " + instantsToArchive);
-        success = deleteArchivedInstants(instantsToArchive);
+        success = deleteArchivedInstants(instantsToArchive, context);
       } else {
         LOG.info("No Instants to archive");
       }
@@ -224,19 +226,34 @@ private Stream<HoodieInstant> getInstantsToArchive() {
             HoodieInstant.getComparableAction(hoodieInstant.getAction()))).stream());
   }
 
-  private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants) throws IOException {
+  private boolean deleteArchivedInstants(List<HoodieInstant> archivedInstants, HoodieEngineContext context) throws IOException {
     LOG.info("Deleting instants " + archivedInstants);
     boolean success = true;
-    for (HoodieInstant archivedInstant : archivedInstants) {
-      Path commitFile = new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
-      try {
-        if (metaClient.getFs().exists(commitFile)) {
-          success &= metaClient.getFs().delete(commitFile, false);
-          LOG.info("Archived and deleted instant file " + commitFile);
-        }
-      } catch (IOException e) {
-        throw new HoodieIOException("Failed to delete archived instant " + archivedInstant, e);
-      }
+    List<String> instantFiles = archivedInstants.stream().map(archivedInstant -> {
+      return new Path(metaClient.getMetaPath(), archivedInstant.getFileName());
+    }).map(Path::toString).collect(Collectors.toList());
+
+    context.setJobStatus(this.getClass().getSimpleName(), "Delete archived instants");
+    Map<String, Boolean> resultDeleteInstantFiles = FSUtils.parallelizeFilesProcess(context,
+        metaClient.getFs(),
+        config.getArchiveDeleteParallelism(),
+        pairOfSubPathAndConf -> {
+          Path commitFile = new Path(pairOfSubPathAndConf.getKey());
+          try {
+            FileSystem fs = commitFile.getFileSystem(pairOfSubPathAndConf.getValue().get());
+            if (fs.exists(commitFile)) {
+              return fs.delete(commitFile, false);
+            }
+            return true;
+          } catch (IOException e) {
+            throw new HoodieIOException("Failed to delete archived instant " + commitFile, e);
+          }
+        },
+        instantFiles);
+
+    for (Map.Entry<String, Boolean> result : resultDeleteInstantFiles.entrySet()) {
+      LOG.info("Archived and deleted instant file " + result.getKey() + " : " + result.getValue());
+      success &= result.getValue();
     }
 
     // Remove older meta-data from auxiliary path too
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index dc4df23a4c3d..209e4ae42270 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -670,19 +670,30 @@ public static <T> Map<String, T> parallelizeSubPathProcess(
           .filter(subPathPredicate)
           .map(fileStatus -> fileStatus.getPath().toString())
           .collect(Collectors.toList());
-      if (subPaths.size() > 0) {
-        SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
-        int actualParallelism = Math.min(subPaths.size(), parallelism);
-        result = hoodieEngineContext.mapToPair(subPaths,
-            subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
-            actualParallelism);
-      }
+      result = parallelizeFilesProcess(hoodieEngineContext, fs, parallelism, pairFunction, subPaths);
     } catch (IOException ioe) {
       throw new HoodieIOException(ioe.getMessage(), ioe);
     }
     return result;
   }
 
+  public static <T> Map<String, T> parallelizeFilesProcess(
+      HoodieEngineContext hoodieEngineContext,
+      FileSystem fs,
+      int parallelism,
+      SerializableFunction<Pair<String, SerializableConfiguration>, T> pairFunction,
+      List<String> subPaths) {
+    Map<String, T> result = new HashMap<>();
+    if (subPaths.size() > 0) {
+      SerializableConfiguration conf = new SerializableConfiguration(fs.getConf());
+      int actualParallelism = Math.min(subPaths.size(), parallelism);
+      result = hoodieEngineContext.mapToPair(subPaths,
+          subPath -> new ImmutablePair<>(subPath, pairFunction.apply(new ImmutablePair<>(subPath, conf))),
+          actualParallelism);
+    }
+    return result;
+  }
+
   /**
    * Deletes a sub-path.
    *
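
Usage note (not part of the patch): a minimal sketch of how the new knob could be wired into a writer config. It assumes the pre-existing HoodieWriteConfig.Builder entry points withPath(...) and withCompactionConfig(...); the base path and the value 200 below are illustrative only.

    // Hypothetical example: raise archived-instant deletion parallelism above the default of 100.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")              // illustrative table base path
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withArchiveDeleteParallelism(200)    // sets hoodie.archive.delete.parallelism
            .build())
        .build();

The same effect should be achievable by setting hoodie.archive.delete.parallelism directly in the writer properties. Note that parallelizeFilesProcess caps the effective parallelism at min(number of archived instant files, configured parallelism), so small archive batches are unaffected by a large setting.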