Skip to content
Permalink
Browse files
[HUDI-3376] Add an option to skip under-deletion files for HoodieMetadataTableValidator (#4994)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
  • Loading branch information
zhangyue19921010 and yuezhang committed Mar 17, 2022
1 parent 91849c3 commit 8ca9a54db016fc644070085fe2d596b0e5643e00
Showing 1 changed file with 146 additions and 30 deletions.
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities;

import org.apache.hudi.async.HoodieAsyncService;
import org.apache.hudi.avro.model.HoodieCleanerPlan;
import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.common.bloom.BloomFilter;
import org.apache.hudi.common.config.HoodieMetadataConfig;
@@ -29,16 +30,21 @@
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
@@ -63,6 +69,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -173,6 +180,9 @@ public static class Config implements Serializable {
+ "Can use --min-validate-interval-seconds to control validation frequency", required = false)
public boolean continuous = false;

@Parameter(names = {"--skip-data-files-for-cleaning"}, description = "Skip to compare the data files which are under deletion by cleaner", required = false)
public boolean skipDataFilesForCleaning = false;

@Parameter(names = {"--validate-latest-file-slices"}, description = "Validate latest file slices for all partitions.", required = false)
public boolean validateLatestFileSlices = false;

@@ -230,6 +240,7 @@ public String toString() {
+ " --validate-all-column-stats " + validateAllColumnStats + ", \n"
+ " --validate-bloom-filters " + validateBloomFilters + ", \n"
+ " --continuous " + continuous + ", \n"
+ " --skip-data-files-for-cleaning " + skipDataFilesForCleaning + ", \n"
+ " --ignore-failed " + ignoreFailed + ", \n"
+ " --min-validate-interval-seconds " + minValidateIntervalSeconds + ", \n"
+ " --parallelism " + parallelism + ", \n"
@@ -252,6 +263,7 @@ public boolean equals(Object o) {
Config config = (Config) o;
return basePath.equals(config.basePath)
&& Objects.equals(continuous, config.continuous)
&& Objects.equals(skipDataFilesForCleaning, config.skipDataFilesForCleaning)
&& Objects.equals(validateLatestFileSlices, config.validateLatestFileSlices)
&& Objects.equals(validateLatestBaseFiles, config.validateLatestBaseFiles)
&& Objects.equals(validateAllFileGroups, config.validateAllFileGroups)
@@ -269,7 +281,7 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(basePath, continuous, validateLatestFileSlices, validateLatestBaseFiles,
return Objects.hash(basePath, continuous, skipDataFilesForCleaning, validateLatestFileSlices, validateLatestBaseFiles,
validateAllFileGroups, validateAllColumnStats, validateBloomFilters, minValidateIntervalSeconds,
parallelism, ignoreFailed, sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, configs, help);
}
@@ -345,16 +357,44 @@ public void doMetadataTableValidation() {
boolean finalResult = true;
metaClient.reloadActiveTimeline();
String basePath = metaClient.getBasePath();
Set<String> baseFilesForCleaning = Collections.emptySet();

if (cfg.skipDataFilesForCleaning) {
HoodieTimeline inflightCleaningTimeline = metaClient.getActiveTimeline().getCleanerTimeline().filterInflights();

baseFilesForCleaning = inflightCleaningTimeline.getInstants().flatMap(instant -> {
try {
// convert inflight instant to requested and get clean plan
instant = new HoodieInstant(HoodieInstant.State.REQUESTED, instant.getAction(), instant.getTimestamp());
HoodieCleanerPlan cleanerPlan = CleanerUtils.getCleanerPlan(metaClient, instant);

return cleanerPlan.getFilePathsToBeDeletedPerPartition().values().stream().flatMap(cleanerFileInfoList -> {
return cleanerFileInfoList.stream().map(fileInfo -> {
return new Path(fileInfo.getFilePath()).getName();
});
});

} catch (IOException e) {
throw new HoodieIOException("Error reading cleaner metadata for " + instant);
}
// only take care of base files here.
}).filter(path -> {
String fileExtension = FSUtils.getFileExtension(path);
return HoodieFileFormat.BASE_FILE_EXTENSIONS.contains(fileExtension);
}).collect(Collectors.toSet());
}

HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
List<String> allPartitions = validatePartitions(engineContext, basePath);
HoodieMetadataValidationContext metadataTableBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, true);
HoodieMetadataValidationContext fsBasedContext =
new HoodieMetadataValidationContext(engineContext, cfg, metaClient, false);

Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
List<Boolean> result = engineContext.parallelize(allPartitions, allPartitions.size()).map(partitionPath -> {
try {
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath);
validateFilesInPartition(metadataTableBasedContext, fsBasedContext, partitionPath, finalBaseFilesForCleaning);
LOG.info("Metadata table validation succeeded for " + partitionPath);
return true;
} catch (HoodieValidationException e) {
@@ -410,42 +450,64 @@ private List<String> validatePartitions(HoodieSparkEngineContext engineContext,
* @param metadataTableBasedContext Validation context containing information based on metadata table
* @param fsBasedContext Validation context containing information based on the file system
* @param partitionPath Partition path String
* @param baseDataFilesForCleaning Base files pending deletion by an incomplete cleaner action
*/
private void validateFilesInPartition(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
HoodieMetadataValidationContext fsBasedContext, String partitionPath,
Set<String> baseDataFilesForCleaning) {
if (cfg.validateLatestFileSlices) {
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath);
validateLatestFileSlices(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}

if (cfg.validateLatestBaseFiles) {
validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath);
validateLatestBaseFiles(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}

if (cfg.validateAllFileGroups) {
validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath);
validateAllFileGroups(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}

if (cfg.validateAllColumnStats) {
validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath);
validateAllColumnStats(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}

if (cfg.validateBloomFilters) {
validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath);
validateBloomFilters(metadataTableBasedContext, fsBasedContext, partitionPath, baseDataFilesForCleaning);
}
}

private void validateAllFileGroups(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<FileSlice> allFileSlicesFromMeta = metadataTableBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
List<FileSlice> allFileSlicesFromFS = fsBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
HoodieMetadataValidationContext fsBasedContext,
String partitionPath,
Set<String> baseDataFilesForCleaning) {

List<FileSlice> allFileSlicesFromMeta;
List<FileSlice> allFileSlicesFromFS;

if (!baseDataFilesForCleaning.isEmpty()) {
List<FileSlice> fileSlicesFromMeta = metadataTableBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
List<FileSlice> fileSlicesFromFS = fsBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());

allFileSlicesFromMeta = filterFileSliceBasedOnInflightCleaning(fileSlicesFromMeta, baseDataFilesForCleaning);
allFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fileSlicesFromFS, baseDataFilesForCleaning);
} else {
allFileSlicesFromMeta = metadataTableBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
allFileSlicesFromFS = fsBasedContext
.getSortedAllFileGroupList(partitionPath).stream()
.flatMap(HoodieFileGroup::getAllFileSlices).sorted(new FileSliceComparator())
.collect(Collectors.toList());
}

LOG.debug("All file slices from metadata: " + allFileSlicesFromMeta + ". For partitions " + partitionPath);
LOG.debug("All file slices from direct listing: " + allFileSlicesFromFS + ". For partitions " + partitionPath);
@@ -459,10 +521,20 @@ private void validateAllFileGroups(
*/
private void validateLatestBaseFiles(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
HoodieMetadataValidationContext fsBasedContext,
String partitionPath,
Set<String> baseDataFilesForCleaning) {

List<HoodieBaseFile> latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
List<HoodieBaseFile> latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
List<HoodieBaseFile> latestFilesFromMetadata;
List<HoodieBaseFile> latestFilesFromFS;

if (!baseDataFilesForCleaning.isEmpty()) {
latestFilesFromMetadata = filterBaseFileBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
latestFilesFromFS = filterBaseFileBasedOnInflightCleaning(fsBasedContext.getSortedLatestBaseFileList(partitionPath), baseDataFilesForCleaning);
} else {
latestFilesFromMetadata = metadataTableBasedContext.getSortedLatestBaseFileList(partitionPath);
latestFilesFromFS = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
}

LOG.debug("Latest base file from metadata: " + latestFilesFromMetadata + ". For partitions " + partitionPath);
LOG.debug("Latest base file from direct listing: " + latestFilesFromFS + ". For partitions " + partitionPath);
@@ -483,10 +555,19 @@ private void validateLatestBaseFiles(
*/
private void validateLatestFileSlices(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {

List<FileSlice> latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
List<FileSlice> latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
HoodieMetadataValidationContext fsBasedContext,
String partitionPath,
Set<String> baseDataFilesForCleaning) {
List<FileSlice> latestFileSlicesFromMetadataTable;
List<FileSlice> latestFileSlicesFromFS;

if (!baseDataFilesForCleaning.isEmpty()) {
latestFileSlicesFromMetadataTable = filterFileSliceBasedOnInflightCleaning(metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
latestFileSlicesFromFS = filterFileSliceBasedOnInflightCleaning(fsBasedContext.getSortedLatestFileSliceList(partitionPath), baseDataFilesForCleaning);
} else {
latestFileSlicesFromMetadataTable = metadataTableBasedContext.getSortedLatestFileSliceList(partitionPath);
latestFileSlicesFromFS = fsBasedContext.getSortedLatestFileSliceList(partitionPath);
}

LOG.debug("Latest file list from metadata: " + latestFileSlicesFromMetadataTable + ". For partition " + partitionPath);
LOG.debug("Latest file list from direct listing: " + latestFileSlicesFromFS + ". For partition " + partitionPath);
@@ -495,11 +576,31 @@ private void validateLatestFileSlices(
LOG.info("Validation of getLatestFileSlices succeeded for partition " + partitionPath);
}

/**
 * Drops file slices whose base file is scheduled for deletion by an in-flight
 * cleaner action; slices without a base file are always kept.
 *
 * @param sortedLatestFileSliceList file slices to filter
 * @param baseDataFilesForCleaning  base file names pending cleaner deletion
 * @return the surviving file slices, in the original order
 */
private List<FileSlice> filterFileSliceBasedOnInflightCleaning(List<FileSlice> sortedLatestFileSliceList, Set<String> baseDataFilesForCleaning) {
  return sortedLatestFileSliceList.stream()
      .filter(slice -> {
        // Exclude the slice only when it has a base file that is under deletion.
        if (slice.getBaseFile().isPresent()
            && baseDataFilesForCleaning.contains(slice.getBaseFile().get().getFileName())) {
          return false;
        }
        return true;
      })
      .collect(Collectors.toList());
}

/**
 * Drops base files that are scheduled for deletion by an in-flight cleaner action.
 *
 * @param sortedBaseFileList       base files to filter
 * @param baseDataFilesForCleaning base file names pending cleaner deletion
 * @return the surviving base files, in the original order
 */
private List<HoodieBaseFile> filterBaseFileBasedOnInflightCleaning(List<HoodieBaseFile> sortedBaseFileList, Set<String> baseDataFilesForCleaning) {
  return sortedBaseFileList.stream()
      .filter(baseFile -> !baseDataFilesForCleaning.contains(baseFile.getFileName()))
      .collect(Collectors.toList());
}

private void validateAllColumnStats(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
HoodieMetadataValidationContext fsBasedContext,
String partitionPath,
Set<String> baseDataFilesForCleaning) {

List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
List<HoodieColumnRangeMetadata<String>> metadataBasedColStats = metadataTableBasedContext
.getSortedColumnStatsList(partitionPath, latestBaseFilenameList);
List<HoodieColumnRangeMetadata<String>> fsBasedColStats = fsBasedContext
@@ -512,9 +613,11 @@ private void validateAllColumnStats(

private void validateBloomFilters(
HoodieMetadataValidationContext metadataTableBasedContext,
HoodieMetadataValidationContext fsBasedContext, String partitionPath) {
List<String> latestBaseFilenameList = fsBasedContext.getSortedLatestBaseFileList(partitionPath)
.stream().map(BaseFile::getFileName).collect(Collectors.toList());
HoodieMetadataValidationContext fsBasedContext,
String partitionPath,
Set<String> baseDataFilesForCleaning) {

List<String> latestBaseFilenameList = getLatestBaseFileNames(fsBasedContext, partitionPath, baseDataFilesForCleaning);
List<BloomFilterData> metadataBasedBloomFilters = metadataTableBasedContext
.getSortedBloomFilterList(partitionPath, latestBaseFilenameList);
List<BloomFilterData> fsBasedBloomFilters = fsBasedContext
@@ -525,6 +628,19 @@ private void validateBloomFilters(
LOG.info("Validation of bloom filters succeeded for partition " + partitionPath);
}

/**
 * Lists the latest base file names for the partition from the file-system-based
 * context, excluding files scheduled for deletion by an in-flight cleaner action.
 *
 * @param fsBasedContext           file-system-based validation context
 * @param partitionPath            partition path String
 * @param baseDataFilesForCleaning base file names pending cleaner deletion
 * @return latest base file names, in the context's sorted order
 */
private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
  List<HoodieBaseFile> latestBaseFiles = fsBasedContext.getSortedLatestBaseFileList(partitionPath);
  // Filtering is only needed when an in-flight cleaner plan was found.
  if (!baseDataFilesForCleaning.isEmpty()) {
    latestBaseFiles = filterBaseFileBasedOnInflightCleaning(latestBaseFiles, baseDataFilesForCleaning);
  }
  return latestBaseFiles.stream().map(BaseFile::getFileName).collect(Collectors.toList());
}

private <T> void validate(
List<T> infoListFromMetadataTable, List<T> infoListFromFS, String partitionPath, String label) {
if (infoListFromMetadataTable.size() != infoListFromFS.size()

0 comments on commit 8ca9a54

Please sign in to comment.