Skip to content
Permalink
Browse files
[HUDI-3635] Fix HoodieMetadataTableValidator around comparison of par…
…tition path listing (#5100)

Co-authored-by: yuezhang <yuezhang@freewheel.tv>
  • Loading branch information
zhangyue19921010 and yuezhang committed Mar 30, 2022
1 parent eae8488 commit 2b60641d17f60fc0d90ad470d6dead95805495fa
Showing 2 changed files with 34 additions and 0 deletions.
@@ -18,6 +18,7 @@

package org.apache.hudi.common.model;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieException;

import org.apache.hadoop.fs.FSDataInputStream;
@@ -135,6 +136,23 @@ public void readFromFS() throws IOException {
}
}

/**
* Read out the COMMIT_TIME_KEY metadata for this partition.
*/
public Option<String> readPartitionCreatedCommitTime() {
try {
if (props.containsKey(COMMIT_TIME_KEY)) {
return Option.of(props.getProperty(COMMIT_TIME_KEY));
} else {
readFromFS();
return Option.of(props.getProperty(COMMIT_TIME_KEY));
}
} catch (IOException ioe) {
LOG.warn("Error fetch Hoodie partition metadata for " + partitionPath, ioe);
return Option.empty();
}
}

// methods related to partition meta data
public static boolean hasPartitionMetadata(FileSystem fs, Path partitionPath) {
try {
@@ -32,6 +32,7 @@
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
import org.apache.hudi.common.model.HoodieFileFormat;
import org.apache.hudi.common.model.HoodieFileGroup;
import org.apache.hudi.common.model.HoodiePartitionMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -443,6 +444,21 @@ private void checkMetadataTableIsAvailable() {
private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
// compare partitions
List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
HoodieTimeline completedTimeline = metaClient.getActiveTimeline().filterCompletedInstants();

// ignore partitions created by uncommitted ingestion.
allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
HoodiePartitionMetadata hoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getFs(), new Path(basePath, part));

Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
if (instantOption.isPresent()) {
String instantTime = instantOption.get();
return completedTimeline.containsOrBeforeTimelineStarts(instantTime);
} else {
return false;
}
}).collect(Collectors.toList());

List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);

Collections.sort(allPartitionPathsFromFS);

0 comments on commit 2b60641

Please sign in to comment.