Skip to content

Commit

Permalink
[HUDI-2938] Metadata table util to get latest file slices for reader/…
Browse files Browse the repository at this point in the history
…writers (#4218)
  • Loading branch information
manojpec committed Dec 12, 2021
1 parent 15444c9 commit b22c2c6
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,6 @@ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String
return;
}


// Trigger compaction with suffixes based on the same instant time. This ensures that any future
// delta commits synced over will not have an instant time lesser than the last completed instant on the
// metadata table.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
* The record is tagged with respective file slice's location based on its record key.
*/
private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

return records.stream().map(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ protected void commit(HoodieData<HoodieRecord> hoodieDataRecords, String partiti
* The record is tagged with respective file slice's location based on its record key.
*/
private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) {
List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false);
List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, partitionName);
ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups));

return recordsRDD.map(r -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,8 @@ public void testVirtualKeysInBaseFiles(boolean populateMetaFields) throws Except
});
}


/**
* Tests that virtual key configs are honored in base files after compaction in metadata table.
*
*/
@ParameterizedTest
@ValueSource(booleans = {true, false})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> openReadersI

// Metadata is in sync till the latest completed instant on the dataset
HoodieTimer timer = new HoodieTimer().startTimer();
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true);
List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, partitionName);
if (latestFileSlices.size() == 0) {
// empty partition
return Pair.of(null, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static List<HoodieRecord> convertMetadataToRecords(HoodieCleanMetadata cl
* @return a list of metadata table records
*/
public static List<HoodieRecord> convertMetadataToRecords(HoodieActiveTimeline metadataTableTimeline,
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
HoodieRestoreMetadata restoreMetadata, String instantTime, Option<String> lastSyncTs) {
Map<String, Map<String, Long>> partitionToAppendedFiles = new HashMap<>();
Map<String, List<String>> partitionToDeletedFiles = new HashMap<>();
restoreMetadata.getHoodieRestoreMetadata().values().forEach(rms -> {
Expand Down Expand Up @@ -338,29 +338,65 @@ public static int mapRecordKeyToFileGroupIndex(String recordKey, int numFileGrou
}

/**
* Loads the list of file groups for a partition of the Metadata Table with latest file slices.
* Get the latest file slices for a Metadata Table partition. If the file slice is
* because of pending compaction instant, then merge the file slice with the one
* just before the compaction instant time. The list of file slices returned is
* sorted in the correct order of file group name.
*
* The list of file slices returned is sorted in the correct order of file group name.
* @param metaClient instance of {@link HoodieTableMetaClient}.
* @param partition The name of the partition whose file groups are to be loaded.
* @param isReader true if reader code path, false otherwise.
* @param metaClient - Instance of {@link HoodieTableMetaClient}.
* @param partition - The name of the partition whose file groups are to be loaded.
* @return List of latest file slices for all file groups in a given partition.
*/
public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) {
LOG.info("Loading file groups for metadata table partition " + partition);
public static List<FileSlice> getPartitionLatestMergedFileSlices(HoodieTableMetaClient metaClient, String partition) {
LOG.info("Loading latest merged file slices for metadata table partition " + partition);
return getPartitionFileSlices(metaClient, partition, true);
}

/**
 * Get the latest file slices for a Metadata Table partition without merging
 * across any pending compaction. The list of file slices returned is sorted
 * in the correct order of file group name.
 *
 * @param metaClient - Instance of {@link HoodieTableMetaClient}.
 * @param partition  - The name of the partition whose file groups are to be loaded.
 * @return List of latest file slices for all file groups in a given partition.
 */
public static List<FileSlice> getPartitionLatestFileSlices(HoodieTableMetaClient metaClient, String partition) {
  LOG.info("Loading latest file slices for metadata table partition " + partition);
  // Writer path: no merging of pending-compaction file slices.
  final boolean mergeFileSlices = false;
  return getPartitionFileSlices(metaClient, partition, mergeFileSlices);
}

/**
 * Get the latest file slices for a given partition.
 *
 * @param metaClient      - Instance of {@link HoodieTableMetaClient}.
 * @param partition       - The name of the partition whose file groups are to be loaded.
 * @param mergeFileSlices - When enabled, will merge the latest file slices with the last known
 *                          completed instant. This is useful for readers when there are pending
 *                          compactions. When disabled, will return the latest file slices
 *                          without any merging, and this is needed for the writers.
 * @return List of latest file slices for all file groups in a given partition.
 */
private static List<FileSlice> getPartitionFileSlices(HoodieTableMetaClient metaClient, String partition,
                                                      boolean mergeFileSlices) {
  // If there are no commits on the metadata table then the table's
  // default FileSystemView will not return any file slices even
  // though we may have initialized them.
  HoodieTimeline timeline = metaClient.getActiveTimeline();
  if (timeline.empty()) {
    // Synthesize a single delta-commit instant so the FileSystemView can surface
    // the initialized-but-uncommitted file groups.
    final HoodieInstant instant = new HoodieInstant(false, HoodieTimeline.DELTA_COMMIT_ACTION,
        HoodieActiveTimeline.createNewInstantTime());
    timeline = new HoodieDefaultTimeline(Arrays.asList(instant).stream(), metaClient.getActiveTimeline()::getInstantDetails);
  }

  HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline);
  Stream<FileSlice> fileSliceStream;
  if (mergeFileSlices) {
    // Reader path: merge file slices of pending compactions up to the last
    // completed instant on the metadata timeline.
    fileSliceStream = fsView.getLatestMergedFileSlicesBeforeOrOn(
        partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp());
  } else {
    fileSliceStream = fsView.getLatestFileSlices(partition);
  }
  // Sort by file id so the returned list is in a stable file group order.
  return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())).collect(Collectors.toList());
}

}

0 comments on commit b22c2c6

Please sign in to comment.