Skip to content
Permalink
Browse files
[HUDI-3421]Pending clustering may break AbstractTableFileSystemView#g…
…etxxBaseFile() (#4810)
  • Loading branch information
zhangyue19921010 committed Feb 25, 2022
1 parent a4ee746 commit 742810070bf43bbc13f22ee5514dd997c27b1db2
Showing 4 changed files with 258 additions and 12 deletions.
@@ -373,23 +373,23 @@ public void testUpsertPartitionerWithSmallFileHandlingAndClusteringPlan() throws
.setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
FileCreateUtils.createRequestedReplaceCommit(basePath,"002", Option.of(requestedReplaceMetadata));

// create file slice 002
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "002", "2", 1);
FileCreateUtils.createCommit(basePath, "002");
// create file slice 003
FileCreateUtils.createBaseFile(basePath, testPartitionPath, "003", "3", 1);
FileCreateUtils.createCommit(basePath, "003");

metaClient = HoodieTableMetaClient.reload(metaClient);

// generate new data to be ingested
HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(new String[] {testPartitionPath});
List<HoodieRecord> insertRecords = dataGenerator.generateInserts("003", 100);
List<HoodieRecord> insertRecords = dataGenerator.generateInserts("004", 100);
WorkloadProfile profile = new WorkloadProfile(buildProfile(jsc.parallelize(insertRecords)));

HoodieSparkTable table = HoodieSparkTable.create(config, context, metaClient);
// create UpsertPartitioner
UpsertPartitioner partitioner = new UpsertPartitioner(profile, context, table, config);

// for now we have file slice1 and file slice2 and file slice1 is contained in pending clustering plan
// So that only file slice2 can be used for ingestion.
// for now we have file slice1 and file slice3 and file slice1 is contained in pending clustering plan
// So that only file slice3 can be used for ingestion.
assertEquals(1, partitioner.smallFiles.size(), "Should have 1 small file to be ingested.");
}

@@ -380,6 +380,19 @@ protected boolean isBaseFileDueToPendingCompaction(HoodieBaseFile baseFile) {
&& baseFile.getCommitTime().equals(compactionWithInstantTime.get().getKey());
}

/**
* With async clustering, it is possible to see partial/complete base-files due to inflight-clustering, Ignore those
* base-files.
*
* @param baseFile base File
*/
protected boolean isBaseFileDueToPendingClustering(HoodieBaseFile baseFile) {
List<String> pendingReplaceInstants =
metaClient.getActiveTimeline().filterPendingReplaceTimeline().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toList());

return !pendingReplaceInstants.isEmpty() && pendingReplaceInstants.contains(baseFile.getCommitTime());
}

/**
* Returns true if the file-group is under pending-compaction and the file-slice' baseInstant matches compaction
* Instant.
@@ -492,7 +505,7 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesBeforeOrOn(String partitio
.map(fileGroup -> Option.fromJavaOptional(fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.LESSER_THAN_OR_EQUALS, maxCommitTime
))
.filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst()))
.filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst()))
.filter(Option::isPresent).map(Option::get)
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
@@ -511,7 +524,7 @@ public final Option<HoodieBaseFile> getBaseFileOn(String partitionStr, String in
} else {
return fetchHoodieFileGroup(partitionPath, fileId).map(fileGroup -> fileGroup.getAllBaseFiles()
.filter(baseFile -> HoodieTimeline.compareTimestamps(baseFile.getCommitTime(), HoodieTimeline.EQUALS,
instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst().orElse(null))
instantTime)).filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst().orElse(null))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, fileId), df));
}
} finally {
@@ -547,7 +560,7 @@ public final Stream<HoodieBaseFile> getLatestBaseFilesInRange(List<String> commi
.filter(fileGroup -> !isFileGroupReplacedBeforeAny(fileGroup.getFileGroupId(), commitsToReturn))
.map(fileGroup -> Pair.of(fileGroup.getFileGroupId(), Option.fromJavaOptional(
fileGroup.getAllBaseFiles().filter(baseFile -> commitsToReturn.contains(baseFile.getCommitTime())
&& !isBaseFileDueToPendingCompaction(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
&& !isBaseFileDueToPendingCompaction(baseFile) && !isBaseFileDueToPendingClustering(baseFile)).findFirst()))).filter(p -> p.getValue().isPresent())
.map(p -> addBootstrapBaseFileIfPresent(p.getKey(), p.getValue().get()));
} finally {
readLock.unlock();
@@ -563,7 +576,7 @@ public final Stream<HoodieBaseFile> getAllBaseFiles(String partitionStr) {
return fetchAllBaseFiles(partitionPath)
.filter(df -> !isFileGroupReplaced(partitionPath, df.getFileId()))
.filter(df -> visibleCommitsAndCompactionTimeline.containsOrBeforeTimelineStarts(df.getCommitTime()))
.filter(df -> !isBaseFileDueToPendingCompaction(df))
.filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df))
.map(df -> addBootstrapBaseFileIfPresent(new HoodieFileGroupId(partitionPath, df.getFileId()), df));
} finally {
readLock.unlock();
@@ -953,7 +966,7 @@ public Stream<HoodieBaseFile> fetchLatestBaseFiles(final String partitionPath) {

protected Option<HoodieBaseFile> getLatestBaseFile(HoodieFileGroup fileGroup) {
return Option
.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df)).findFirst());
.fromJavaOptional(fileGroup.getAllBaseFiles().filter(df -> !isBaseFileDueToPendingCompaction(df) && !isBaseFileDueToPendingClustering(df)).findFirst());
}

/**
@@ -31,6 +31,7 @@
import org.apache.hudi.common.bootstrap.index.BootstrapIndex.IndexWriter;
import org.apache.hudi.common.bootstrap.index.HFileBootstrapIndex;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.BaseFile;
import org.apache.hudi.common.model.BootstrapFileMapping;
import org.apache.hudi.common.model.CompactionOperation;
@@ -41,6 +42,7 @@
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -50,12 +52,15 @@
import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView;
import org.apache.hudi.common.table.view.TableFileSystemView.SliceView;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.testutils.HoodieTestTable;
import org.apache.hudi.common.testutils.HoodieTestUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.CompactionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ImmutablePair;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.junit.jupiter.api.BeforeEach;
@@ -1537,6 +1542,234 @@ public void testPendingClusteringOperations() throws IOException {
assertFalse(fileIds.contains(fileId3));
}

/**
*
* create hoodie table like
* .
* ├── .hoodie
* │   ├── .aux
* │   │   └── .bootstrap
* │   │   ├── .fileids
* │   │   └── .partitions
* │   ├── .temp
* │   ├── 1.commit
* │   ├── 1.commit.requested
* │   ├── 1.inflight
* │   ├── 2.replacecommit
* │   ├── 2.replacecommit.inflight
* │   ├── 2.replacecommit.requested
* │   ├── 3.commit
* │   ├── 3.commit.requested
* │   ├── 3.inflight
* │   ├── archived
* │   └── hoodie.properties
* └── 2020
* └── 06
* └── 27
* ├── 5fe477d2-0150-46d4-833c-1e9cc8da9948_1-0-1_3.parquet
* ├── 7e3208c8-fdec-4254-9682-8fff1e51ee8d_1-0-1_2.parquet
* ├── e04b0e2d-1467-46b2-8ea6-f4fe950965a5_1-0-1_1.parquet
* └── f3936b66-b3db-4fc8-a6d0-b1a7559016e6_1-0-1_1.parquet
*
* First test fsView API with finished clustering:
* 1. getLatestBaseFilesBeforeOrOn
* 2. getBaseFileOn
* 3. getLatestBaseFilesInRange
* 4. getAllBaseFiles
* 5. getLatestBaseFiles
*
* Then remove 2.replacecommit, 1.commit, 1.commit.requested, 1.inflight to simulate
* pending clustering at the earliest position in the active timeline and test these APIs again.
*
* @throws IOException
*/
@Test
public void testHoodieTableFileSystemViewWithPendingClustering() throws IOException {
List<String> latestBaseFilesBeforeOrOn;
Option<HoodieBaseFile> baseFileOn;
List<String> latestBaseFilesInRange;
List<String> allBaseFiles;
List<String> latestBaseFiles;
List<String> latestBaseFilesPerPartition;
String partitionPath = "2020/06/27";
new File(basePath + "/" + partitionPath).mkdirs();
HoodieActiveTimeline commitTimeline = metaClient.getActiveTimeline();

// will create 5 fileId in partition.
// fileId1 and fileId2 will be replaced by fileID3
// fileId4 and fileId5 will be committed after clustering finished.
String fileId1 = UUID.randomUUID().toString();
String fileId2 = UUID.randomUUID().toString();
String fileId3 = UUID.randomUUID().toString();
String fileId4 = UUID.randomUUID().toString();
String fileId5 = UUID.randomUUID().toString();

assertFalse(roView.getLatestBaseFiles(partitionPath)
.anyMatch(dfile -> dfile.getFileId().equals(fileId1)
|| dfile.getFileId().equals(fileId2)
|| dfile.getFileId().equals(fileId3)
|| dfile.getFileId().equals(fileId4)
|| dfile.getFileId().equals(fileId5)),
"No commit, should not find any data file");

// first insert commit
String commitTime1 = "1";
String fileName1 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId1);
String fileName2 = FSUtils.makeDataFileName(commitTime1, TEST_WRITE_TOKEN, fileId2);
new File(basePath + "/" + partitionPath + "/" + fileName1).createNewFile();
new File(basePath + "/" + partitionPath + "/" + fileName2).createNewFile();

HoodieInstant instant1 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime1);

// build writeStats
HashMap<String, List<String>> partitionToFile1 = new HashMap<>();
ArrayList<String> files1 = new ArrayList<>();
files1.add(fileId1);
files1.add(fileId2);
partitionToFile1.put(partitionPath, files1);
List<HoodieWriteStat> writeStats1 = buildWriteStats(partitionToFile1, commitTime1);

HoodieCommitMetadata commitMetadata1 =
CommitUtils.buildMetadata(writeStats1, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
saveAsComplete(commitTimeline, instant1, Option.of(commitMetadata1.toJsonString().getBytes(StandardCharsets.UTF_8)));
commitTimeline.reload();

// replace commit
String commitTime2 = "2";
String fileName3 = FSUtils.makeDataFileName(commitTime2, TEST_WRITE_TOKEN, fileId3);
new File(basePath + "/" + partitionPath + "/" + fileName3).createNewFile();

HoodieInstant instant2 = new HoodieInstant(true, HoodieTimeline.REPLACE_COMMIT_ACTION, commitTime2);
Map<String, List<String>> partitionToReplaceFileIds = new HashMap<>();
List<String> replacedFileIds = new ArrayList<>();
replacedFileIds.add(fileId1);
replacedFileIds.add(fileId2);
partitionToReplaceFileIds.put(partitionPath, replacedFileIds);

HashMap<String, List<String>> partitionToFile2 = new HashMap<>();
ArrayList<String> files2 = new ArrayList<>();
files2.add(fileId3);
partitionToFile2.put(partitionPath, files2);
List<HoodieWriteStat> writeStats2 = buildWriteStats(partitionToFile2, commitTime2);

HoodieCommitMetadata commitMetadata2 =
CommitUtils.buildMetadata(writeStats2, partitionToReplaceFileIds, Option.empty(), WriteOperationType.INSERT_OVERWRITE, "", HoodieTimeline.REPLACE_COMMIT_ACTION);
saveAsComplete(commitTimeline, instant2, Option.of(commitMetadata2.toJsonString().getBytes(StandardCharsets.UTF_8)));

// another insert commit
String commitTime3 = "3";
String fileName4 = FSUtils.makeDataFileName(commitTime3, TEST_WRITE_TOKEN, fileId4);
new File(basePath + "/" + partitionPath + "/" + fileName4).createNewFile();
HoodieInstant instant3 = new HoodieInstant(true, HoodieTimeline.COMMIT_ACTION, commitTime3);

// build writeStats
HashMap<String, List<String>> partitionToFile3 = new HashMap<>();
ArrayList<String> files3 = new ArrayList<>();
files3.add(fileId4);
partitionToFile3.put(partitionPath, files3);
List<HoodieWriteStat> writeStats3 = buildWriteStats(partitionToFile3, commitTime3);
HoodieCommitMetadata commitMetadata3 =
CommitUtils.buildMetadata(writeStats3, new HashMap<>(), Option.empty(), WriteOperationType.INSERT, "", HoodieTimeline.COMMIT_ACTION);
saveAsComplete(commitTimeline, instant3, Option.of(commitMetadata3.toJsonString().getBytes(StandardCharsets.UTF_8)));

metaClient.reloadActiveTimeline();
refreshFsView();

ArrayList<String> commits = new ArrayList<>();
commits.add(commitTime1);
commits.add(commitTime2);
commits.add(commitTime3);

// do check
latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFilesBeforeOrOn.size());
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId3));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));

// could see fileId3 because clustering is committed.
baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
assertTrue(baseFileOn.isPresent());
assertEquals(baseFileOn.get().getFileId(), fileId3);

latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFilesInRange.size());
assertTrue(latestBaseFilesInRange.contains(fileId3));
assertTrue(latestBaseFilesInRange.contains(fileId4));

allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, allBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId3));
assertTrue(allBaseFiles.contains(fileId4));

// could see fileId3 because clustering is committed.
latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId3));
assertTrue(allBaseFiles.contains(fileId4));

// could see fileId3 because clustering is committed.
latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(2, latestBaseFiles.size());
assertTrue(latestBaseFilesPerPartition.contains(fileId3));
assertTrue(latestBaseFilesPerPartition.contains(fileId4));

HoodieWrapperFileSystem fs = metaClient.getFs();
fs.delete(new Path(basePath + "/.hoodie", "1.commit"), false);
fs.delete(new Path(basePath + "/.hoodie", "1.inflight"), false);
fs.delete(new Path(basePath + "/.hoodie", "1.commit.requested"), false);
fs.delete(new Path(basePath + "/.hoodie", "2.replacecommit"), false);

metaClient.reloadActiveTimeline();
refreshFsView();
// do check after delete some commit file
latestBaseFilesBeforeOrOn = fsView.getLatestBaseFilesBeforeOrOn(partitionPath, commitTime3).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFilesBeforeOrOn.size());
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId1));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId2));
assertTrue(latestBaseFilesBeforeOrOn.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
baseFileOn = fsView.getBaseFileOn(partitionPath, commitTime2, fileId3);
assertFalse(baseFileOn.isPresent());

latestBaseFilesInRange = fsView.getLatestBaseFilesInRange(commits).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFilesInRange.size());
assertTrue(latestBaseFilesInRange.contains(fileId1));
assertTrue(latestBaseFilesInRange.contains(fileId2));
assertTrue(latestBaseFilesInRange.contains(fileId4));

allBaseFiles = fsView.getAllBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, allBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId1));
assertTrue(allBaseFiles.contains(fileId2));
assertTrue(allBaseFiles.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
latestBaseFiles = fsView.getLatestBaseFiles().map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFiles.size());
assertTrue(allBaseFiles.contains(fileId1));
assertTrue(allBaseFiles.contains(fileId2));
assertTrue(allBaseFiles.contains(fileId4));

// couldn't see fileId3 because clustering is not committed.
latestBaseFilesPerPartition = fsView.getLatestBaseFiles(partitionPath).map(HoodieBaseFile::getFileId).collect(Collectors.toList());
assertEquals(3, latestBaseFiles.size());
assertTrue(latestBaseFilesPerPartition.contains(fileId1));
assertTrue(latestBaseFilesPerPartition.contains(fileId2));
assertTrue(latestBaseFilesPerPartition.contains(fileId4));
}


// Generate Hoodie WriteStat For Given Partition
private List<HoodieWriteStat> buildWriteStats(HashMap<String, List<String>> partitionToFileIds, String commitTime) {
HashMap<String, List<Pair<String, Integer>>> maps = new HashMap<>();
for (String partition : partitionToFileIds.keySet()) {
List<Pair<String, Integer>> list = partitionToFileIds.get(partition).stream().map(fileId -> new ImmutablePair<String, Integer>(fileId, 0)).collect(Collectors.toList());
maps.put(partition, list);
}
return HoodieTestTable.generateHoodieWriteStatForPartition(maps, commitTime, false);
}

@Override
protected HoodieTableType getTableType() {
return HoodieTableType.MERGE_ON_READ;
@@ -1044,7 +1044,7 @@ private static HoodieTestTableState getTestTableStateWithPartitionFileInfo(Write
return testTableState;
}

private static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
public static List<HoodieWriteStat> generateHoodieWriteStatForPartition(Map<String, List<Pair<String, Integer>>> partitionToFileIdMap,
String commitTime, boolean bootstrap) {
List<HoodieWriteStat> writeStats = new ArrayList<>();
for (Map.Entry<String, List<Pair<String, Integer>>> entry : partitionToFileIdMap.entrySet()) {

0 comments on commit 7428100

Please sign in to comment.