From a3f6a5bfdfdb395a27b59d3fe0aa6453d3586987 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 27 Sep 2022 15:18:53 -0700 Subject: [PATCH 1/3] Revert "[HUDI-4792] Batch clean files to delete (#6580)" This reverts commit cbf9b83ca6d3dada14eea551a5bae25144ca0459. --- .../action/clean/CleanPlanActionExecutor.java | 11 +- .../hudi/table/action/clean/CleanPlanner.java | 202 +++++++++--------- ...arkCopyOnWriteTableArchiveWithReplace.java | 4 +- .../view/AbstractTableFileSystemView.java | 16 +- .../view/PriorityBasedFileSystemView.java | 5 - .../view/RemoteHoodieTableFileSystemView.java | 12 -- .../table/view/TableFileSystemView.java | 14 +- 7 files changed, 114 insertions(+), 150 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java index bd7ec798ed1a..7f3b437178fd 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java @@ -42,7 +42,6 @@ import org.apache.log4j.Logger; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.stream.Collectors; @@ -117,15 +116,9 @@ HoodieCleanerPlan requestClean(HoodieEngineContext context) { context.setJobStatus(this.getClass().getSimpleName(), "Generating list of file slices to be cleaned: " + config.getTableName()); Map>> cleanOpsWithPartitionMeta = context - .parallelize(partitionsToClean, cleanerParallelism) - .mapPartitions(partitionIterator -> { - List partitionList = new ArrayList<>(); - partitionIterator.forEachRemaining(partitionList::add); - Map>> cleanResult = planner.getDeletePaths(partitionList); - return cleanResult.entrySet().iterator(); - }, false).collectAsList() + .map(partitionsToClean, partitionPathToClean -> Pair.of(partitionPathToClean, planner.getDeletePaths(partitionPathToClean)), cleanerParallelism) .stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + .collect(Collectors.toMap(Pair::getKey, Pair::getValue)); Map> cleanOps = cleanOpsWithPartitionMeta.entrySet().stream() .collect(Collectors.toMap(Map.Entry::getKey, diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index e5d90b5e9d37..476940ab8e10 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -60,7 +60,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Date; -import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -226,10 +225,10 @@ private List getPartitionPathsForFullCleaning() { * policy is useful, if you are simply interested in querying the table, and you don't want too many versions for a * single file (i.e run it with versionsRetained = 1) */ - private Map>> getFilesToCleanKeepingLatestVersions(List partitionPaths) { - LOG.info("Cleaning " + partitionPaths + ", retaining latest " + config.getCleanerFileVersionsRetained() + private Pair> getFilesToCleanKeepingLatestVersions(String partitionPath) { + LOG.info("Cleaning " + partitionPath + ", retaining latest " + config.getCleanerFileVersionsRetained() + " file versions. "); - Map>> map = new HashMap<>(); + List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List savepointedFiles = hoodieTable.getSavepointTimestamps().stream() .flatMap(this::getSavepointedDataFiles) @@ -237,48 +236,43 @@ private Map>> getFilesToCleanKeepingLa // In this scenario, we will assume that once replaced a file group automatically becomes eligible for cleaning completely // In other words, the file versions only apply to the active file groups. - List>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList()); - for (Pair> partitionFileGroupList : fileGroupsPerPartition) { - List deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), Option.empty())); - boolean toDeletePartition = false; - for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) { - int keepVersions = config.getCleanerFileVersionsRetained(); - // do not cleanup slice required for pending compaction - Iterator fileSliceIterator = - fileGroup.getAllFileSlices() - .filter(fs -> !isFileSliceNeededForPendingCompaction(fs)) - .iterator(); - if (isFileGroupInPendingCompaction(fileGroup)) { - // We have already saved the last version of file-groups for pending compaction Id - keepVersions--; - } + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, Option.empty())); + boolean toDeletePartition = false; + List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + for (HoodieFileGroup fileGroup : fileGroups) { + int keepVersions = config.getCleanerFileVersionsRetained(); + // do not cleanup slice required for pending compaction + Iterator fileSliceIterator = + fileGroup.getAllFileSlices().filter(fs -> !isFileSliceNeededForPendingCompaction(fs)).iterator(); + if (isFileGroupInPendingCompaction(fileGroup)) { + // We have already saved the last version of file-groups for pending compaction Id + keepVersions--; + } - while (fileSliceIterator.hasNext() && keepVersions > 0) { - // Skip this most recent version - fileSliceIterator.next(); - keepVersions--; - } - // Delete the remaining files - while (fileSliceIterator.hasNext()) { - FileSlice nextSlice = fileSliceIterator.next(); - Option dataFile = nextSlice.getBaseFile(); - if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) { - // do not clean up a savepoint data file - continue; - } - deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); - } + while (fileSliceIterator.hasNext() && keepVersions > 0) { + // Skip this most recent version + fileSliceIterator.next(); + keepVersions--; } - // if there are no valid file groups for the partition, mark it to be deleted - if (partitionFileGroupList.getValue().isEmpty()) { - toDeletePartition = true; + // Delete the remaining files + while (fileSliceIterator.hasNext()) { + FileSlice nextSlice = fileSliceIterator.next(); + Option dataFile = nextSlice.getBaseFile(); + if (dataFile.isPresent() && savepointedFiles.contains(dataFile.get().getFileName())) { + // do not clean up a savepoint data file + continue; + } + deletePaths.addAll(getCleanFileInfoForSlice(nextSlice)); } - map.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths)); } - return map; + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; + } + return Pair.of(toDeletePartition, deletePaths); } - private Map>> getFilesToCleanKeepingLatestCommits(List partitionPath) { + private Pair> getFilesToCleanKeepingLatestCommits(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, config.getCleanerCommitsRetained(), HoodieCleaningPolicy.KEEP_LATEST_COMMITS); } @@ -299,9 +293,9 @@ private Map>> getFilesToCleanKeepingLa * @return A {@link Pair} whose left is boolean indicating whether partition itself needs to be deleted, * and right is a list of {@link CleanFileInfo} about the files in the partition that needs to be deleted. */ - private Map>> getFilesToCleanKeepingLatestCommits(List partitionPaths, int commitsRetained, HoodieCleaningPolicy policy) { - LOG.info("Cleaning " + partitionPaths + ", retaining latest " + commitsRetained + " commits. "); - Map>> cleanFileInfoPerPartitionMap = new HashMap<>(); + private Pair> getFilesToCleanKeepingLatestCommits(String partitionPath, int commitsRetained, HoodieCleaningPolicy policy) { + LOG.info("Cleaning " + partitionPath + ", retaining latest " + commitsRetained + " commits. "); + List deletePaths = new ArrayList<>(); // Collect all the datafiles savepointed by all the savepoints List savepointedFiles = hoodieTable.getSavepointTimestamps().stream() @@ -313,51 +307,49 @@ private Map>> getFilesToCleanKeepingLa if (commitTimeline.countInstants() > commitsRetained) { Option earliestCommitToRetainOption = getEarliestCommitToRetain(); HoodieInstant earliestCommitToRetain = earliestCommitToRetainOption.get(); + // all replaced file groups before earliestCommitToRetain are eligible to clean + deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionPath, earliestCommitToRetainOption)); // add active files - List>> fileGroupsPerPartition = fileSystemView.getAllFileGroups(partitionPaths).collect(Collectors.toList()); - for (Pair> partitionFileGroupList : fileGroupsPerPartition) { - List deletePaths = new ArrayList<>(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption)); - // all replaced file groups before earliestCommitToRetain are eligible to clean - deletePaths.addAll(getReplacedFilesEligibleToClean(savepointedFiles, partitionFileGroupList.getLeft(), earliestCommitToRetainOption)); - for (HoodieFileGroup fileGroup : partitionFileGroupList.getRight()) { - List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); - - if (fileSliceList.isEmpty()) { + List fileGroups = fileSystemView.getAllFileGroups(partitionPath).collect(Collectors.toList()); + for (HoodieFileGroup fileGroup : fileGroups) { + List fileSliceList = fileGroup.getAllFileSlices().collect(Collectors.toList()); + + if (fileSliceList.isEmpty()) { + continue; + } + + String lastVersion = fileSliceList.get(0).getBaseInstantTime(); + String lastVersionBeforeEarliestCommitToRetain = + getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); + + // Ensure there are more than 1 version of the file (we only clean old files from updates) + // i.e always spare the last commit. + for (FileSlice aSlice : fileSliceList) { + Option aFile = aSlice.getBaseFile(); + String fileCommitTime = aSlice.getBaseInstantTime(); + if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { + // do not clean up a savepoint data file continue; } - String lastVersion = fileSliceList.get(0).getBaseInstantTime(); - String lastVersionBeforeEarliestCommitToRetain = - getLatestVersionBeforeCommit(fileSliceList, earliestCommitToRetain); - - // Ensure there are more than 1 version of the file (we only clean old files from updates) - // i.e always spare the last commit. - for (FileSlice aSlice : fileSliceList) { - Option aFile = aSlice.getBaseFile(); - String fileCommitTime = aSlice.getBaseInstantTime(); - if (aFile.isPresent() && savepointedFiles.contains(aFile.get().getFileName())) { - // do not clean up a savepoint data file + if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { + // Dont delete the latest commit and also the last commit before the earliest commit we + // are retaining + // The window of commit retain == max query run time. So a query could be running which + // still + // uses this file. + if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { + // move on to the next file continue; } - - if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - // Dont delete the latest commit and also the last commit before the earliest commit we - // are retaining - // The window of commit retain == max query run time. So a query could be running which - // still - // uses this file. - if (fileCommitTime.equals(lastVersion) || (fileCommitTime.equals(lastVersionBeforeEarliestCommitToRetain))) { - // move on to the next file - continue; - } - } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - // This block corresponds to KEEP_LATEST_BY_HOURS policy - // Do not delete the latest commit. - if (fileCommitTime.equals(lastVersion)) { - // move on to the next file - continue; - } + } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { + // This block corresponds to KEEP_LATEST_BY_HOURS policy + // Do not delete the latest commit. + if (fileCommitTime.equals(lastVersion)) { + // move on to the next file + continue; } + } // Always keep the last commit if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline @@ -376,17 +368,32 @@ private Map>> getFilesToCleanKeepingLa deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) .collect(Collectors.toList())); } + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { + // If merge on read, then clean the log files for the commits as well + Predicate notCDCLogFile = + hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + deletePaths.addAll( + aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); + } + if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { + // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow. + // Here we need to clean uo these cdc log files. + Predicate isCDCLogFile = + hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); + deletePaths.addAll( + aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); } } } - // if there are no valid file groups for the partition, mark it to be deleted - if (partitionFileGroupList.getValue().isEmpty()) { - toDeletePartition = true; - } - cleanFileInfoPerPartitionMap.put(partitionFileGroupList.getLeft(), Pair.of(toDeletePartition, deletePaths)); + } + // if there are no valid file groups for the partition, mark it to be deleted + if (fileGroups.isEmpty()) { + toDeletePartition = true; } } - return cleanFileInfoPerPartitionMap; + return Pair.of(toDeletePartition, deletePaths); } /** @@ -394,11 +401,10 @@ private Map>> getFilesToCleanKeepingLa * all the files with commit time earlier than 5 hours will be removed. Also the latest file for any file group is retained. * This policy gives much more flexibility to users for retaining data for running incremental queries as compared to * KEEP_LATEST_COMMITS cleaning policy. The default number of hours is 5. - * * @param partitionPath partition path to check * @return list of files to clean */ - private Map>> getFilesToCleanKeepingLatestHours(List partitionPath) { + private Pair> getFilesToCleanKeepingLatestHours(String partitionPath) { return getFilesToCleanKeepingLatestCommits(partitionPath, 0, HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS); } @@ -464,23 +470,21 @@ private List getCleanFileInfoForSlice(FileSlice nextSlice) { /** * Returns files to be cleaned for the given partitionPath based on cleaning policy. */ - public Map>> getDeletePaths(List partitionPaths) { + public Pair> getDeletePaths(String partitionPath) { HoodieCleaningPolicy policy = config.getCleanerPolicy(); - Map>> deletePaths; + Pair> deletePaths; if (policy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) { - deletePaths = getFilesToCleanKeepingLatestCommits(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestCommits(partitionPath); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS) { - deletePaths = getFilesToCleanKeepingLatestVersions(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestVersions(partitionPath); } else if (policy == HoodieCleaningPolicy.KEEP_LATEST_BY_HOURS) { - deletePaths = getFilesToCleanKeepingLatestHours(partitionPaths); + deletePaths = getFilesToCleanKeepingLatestHours(partitionPath); } else { throw new IllegalArgumentException("Unknown cleaning policy : " + policy.name()); } - for (String partitionPath : deletePaths.keySet()) { - LOG.info(deletePaths.get(partitionPath).getRight().size() + " patterns used to delete in partition path:" + partitionPath); - if (deletePaths.get(partitionPath).getLeft()) { - LOG.info("Partition " + partitionPath + " to be deleted"); - } + LOG.info(deletePaths.getValue().size() + " patterns used to delete in partition path:" + partitionPath); + if (deletePaths.getKey()) { + LOG.info("Partition " + partitionPath + " to be deleted"); } return deletePaths; } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java index 967e313f4ee9..baff4ebac875 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/functional/TestHoodieSparkCopyOnWriteTableArchiveWithReplace.java @@ -57,7 +57,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce HoodieWriteConfig writeConfig = getConfigBuilder(true) .withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(1).build()) .withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(2, 3).build()) - .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).withMaxNumDeltaCommitsBeforeCompaction(2).build()) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(metadataEnabled).build()) .build(); try (SparkRDDWriteClient client = getHoodieWriteClient(writeConfig); HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator(DEFAULT_PARTITION_PATHS)) { @@ -81,7 +81,7 @@ public void testDeletePartitionAndArchive(boolean metadataEnabled) throws IOExce client.startCommitWithTime(instantTime4, HoodieActiveTimeline.REPLACE_COMMIT_ACTION); client.deletePartitions(Arrays.asList(DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH), instantTime4); - // 2nd write batch; 4 commits for the 3rd partition; the 4th commit to trigger archiving the replace commit + // 2nd write batch; 4 commits for the 4th partition; the 4th commit to trigger archiving the replace commit for (int i = 5; i < 9; i++) { String instantTime = HoodieActiveTimeline.createNewInstantTime(i * 1000); client.startCommitWithTime(instantTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java index d70f74a0c454..ed4bfd760161 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java @@ -116,7 +116,7 @@ protected void init(HoodieTableMetaClient metaClient, HoodieTimeline visibleActi /** * Refresh commits timeline. - * + * * @param visibleActiveTimeline Visible Active Timeline */ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) { @@ -750,20 +750,6 @@ public final Stream getAllFileGroups(String partitionStr) { return getAllFileGroupsIncludingReplaced(partitionStr).filter(fg -> !isFileGroupReplaced(fg)); } - @Override - public final Stream>> getAllFileGroups(List partitionPaths) { - return getAllFileGroupsIncludingReplaced(partitionPaths) - .map(pair -> Pair.of(pair.getLeft(), pair.getRight().stream().filter(fg -> !isFileGroupReplaced(fg)).collect(Collectors.toList()))); - } - - private Stream>> getAllFileGroupsIncludingReplaced(final List partitionStrList) { - List>> fileGroupPerPartitionList = new ArrayList<>(); - for (String partitionStr : partitionStrList) { - fileGroupPerPartitionList.add(Pair.of(partitionStr, getAllFileGroupsIncludingReplaced(partitionStr).collect(Collectors.toList()))); - } - return fileGroupPerPartitionList.stream(); - } - private Stream getAllFileGroupsIncludingReplaced(final String partitionStr) { try { readLock.lock(); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java index 9006bd45cba9..ff44c7cef017 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/PriorityBasedFileSystemView.java @@ -204,11 +204,6 @@ public Stream getAllFileGroups(String partitionPath) { return execute(partitionPath, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); } - @Override - public Stream>> getAllFileGroups(List partitionPaths) { - return execute(partitionPaths, preferredView::getAllFileGroups, secondaryView::getAllFileGroups); - } - @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { return execute(maxCommitTime, partitionPath, preferredView::getReplacedFileGroupsBeforeOrOn, secondaryView::getReplacedFileGroupsBeforeOrOn); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java index 5e52767fe2ce..bd18ba22a25d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RemoteHoodieTableFileSystemView.java @@ -51,11 +51,9 @@ import java.io.IOException; import java.io.Serializable; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import java.util.stream.Stream; /** @@ -379,16 +377,6 @@ public Stream getAllFileGroups(String partitionPath) { } } - @Override - public Stream>> getAllFileGroups(List partitionPaths) { - ArrayList>> fileGroupPerPartitionList = new ArrayList<>(); - for (String partitionPath : partitionPaths) { - Stream fileGroup = getAllFileGroups(partitionPath); - fileGroupPerPartitionList.add(Pair.of(partitionPath, fileGroup.collect(Collectors.toList()))); - } - return fileGroupPerPartitionList.stream(); - } - @Override public Stream getReplacedFileGroupsBeforeOrOn(String maxCommitTime, String partitionPath) { Map paramsMap = getParamsWithAdditionalParam(partitionPath, MAX_INSTANT_PARAM, maxCommitTime); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java index 9c83c8f19cd9..c32e2cabb101 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/TableFileSystemView.java @@ -109,18 +109,18 @@ interface SliceViewWithLatestSlice { /** * Stream all latest file slices in given partition with precondition that commitTime(file) before maxCommitTime. * - * @param partitionPath Partition path - * @param maxCommitTime Max Instant Time + * @param partitionPath Partition path + * @param maxCommitTime Max Instant Time * @param includeFileSlicesInPendingCompaction include file-slices that are in pending compaction */ Stream getLatestFileSlicesBeforeOrOn(String partitionPath, String maxCommitTime, - boolean includeFileSlicesInPendingCompaction); + boolean includeFileSlicesInPendingCompaction); /** * Stream all "merged" file-slices before on an instant time If a file-group has a pending compaction request, the * file-slice before and after compaction request instant is merged and returned. - * - * @param partitionPath Partition Path + * + * @param partitionPath Partition Path * @param maxInstantTime Max Instant Time * @return */ @@ -149,12 +149,10 @@ interface SliceView extends SliceViewWithLatestSlice { */ Stream getAllFileGroups(String partitionPath); - Stream>> getAllFileGroups(List partitionPaths); - /** * Return Pending Compaction Operations. * - * @return Pair> + * @return Pair> */ Stream> getPendingCompactionOperations(); From 2c5f43beba87a049ffc077b0394e76fa1e9d9ad6 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Tue, 27 Sep 2022 15:48:16 -0700 Subject: [PATCH 2/3] fixing code style --- .../hudi/table/action/clean/CleanPlanner.java | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 476940ab8e10..850d8f6101fc 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -351,23 +351,23 @@ private Pair> getFilesToCleanKeepingLatestCommits(S } } - // Always keep the last commit - if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline - .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { - // this is a commit, that should be cleaned. - aFile.ifPresent(hoodieDataFile -> { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); - if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { - deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); - } - }); - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ - || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { - // 1. If merge on read, then clean the log files for the commits as well; - // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow. - deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); + // Always keep the last commit + if (!isFileSliceNeededForPendingCompaction(aSlice) && HoodieTimeline + .compareTimestamps(earliestCommitToRetain.getTimestamp(), HoodieTimeline.GREATER_THAN, fileCommitTime)) { + // this is a commit, that should be cleaned. + aFile.ifPresent(hoodieDataFile -> { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getPath(), false)); + if (hoodieDataFile.getBootstrapBaseFile().isPresent() && config.shouldCleanBootstrapBaseFile()) { + deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true)); } + }); + if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ + || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { + // 1. If merge on read, then clean the log files for the commits as well; + // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow. + deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) + .collect(Collectors.toList())); + } if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { // If merge on read, then clean the log files for the commits as well Predicate notCDCLogFile = From 891ae8a8962c8e7f398e97fa621d54034b2cb9a9 Mon Sep 17 00:00:00 2001 From: sivabalan Date: Wed, 28 Sep 2022 11:07:48 -0700 Subject: [PATCH 3/3] fixing rebase issue --- .../hudi/table/action/clean/CleanPlanner.java | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java index 850d8f6101fc..9027ab045a2f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java @@ -368,23 +368,6 @@ private Pair> getFilesToCleanKeepingLatestCommits(S deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) .collect(Collectors.toList())); } - if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) { - // If merge on read, then clean the log files for the commits as well - Predicate notCDCLogFile = - hoodieLogFile -> !hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); - deletePaths.addAll( - aSlice.getLogFiles().filter(notCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); - } - if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) { - // The cdc log files will be written out in cdc scenario, no matter the table type is mor or cow. - // Here we need to clean uo these cdc log files. - Predicate isCDCLogFile = - hoodieLogFile -> hoodieLogFile.getFileName().endsWith(HoodieCDCUtils.CDC_LOGFILE_SUFFIX); - deletePaths.addAll( - aSlice.getLogFiles().filter(isCDCLogFile).map(lf -> new CleanFileInfo(lf.getPath().toString(), false)) - .collect(Collectors.toList())); - } } } }