Skip to content

Commit

Permalink
fix comment
Browse files Browse the repository at this point in the history
  • Loading branch information
zhanshaoxiong committed Sep 21, 2022
1 parent 94dce01 commit 6b13901
Showing 1 changed file with 20 additions and 18 deletions.
Expand Up @@ -412,21 +412,22 @@ protected boolean isFileSliceAfterPendingCompaction(FileSlice fileSlice) {
* With async compaction, it is possible to see partial/complete base-files due to inflight-compactions, Ignore those
* base-files.
*
* @param fileSliceStream Stream of FileSlice
* @param includeEmptyFileSlice include empty file-slices
* @param fileSlice File Slice
* @param includeEmptyFileSlice include empty file-slice
*/
protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(Stream<FileSlice> fileSliceStream, boolean includeEmptyFileSlice) {
return fileSliceStream.map(fileSlice -> {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
// Base file is filtered out of the file-slice as the corresponding compaction
// instant not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
fileSlice.getLogFiles().forEach(transformed::addLogFile);
return transformed;
protected Stream<FileSlice> filterBaseFileAfterPendingCompaction(FileSlice fileSlice, boolean includeEmptyFileSlice) {
if (isFileSliceAfterPendingCompaction(fileSlice)) {
LOG.debug("File Slice (" + fileSlice + ") is in pending compaction");
// Base file is filtered out of the file-slice as the corresponding compaction
// instant not completed yet.
FileSlice transformed = new FileSlice(fileSlice.getPartitionPath(), fileSlice.getBaseInstantTime(), fileSlice.getFileId());
fileSlice.getLogFiles().forEach(transformed::addLogFile);
if (transformed.isEmpty() && !includeEmptyFileSlice) {
return Stream.of();
}
return fileSlice;
}).filter(slice -> includeEmptyFileSlice || !slice.isEmpty());
return Stream.of(transformed);
}
return Stream.of(fileSlice);
}

protected HoodieFileGroup addBootstrapBaseFileIfPresent(HoodieFileGroup fileGroup) {
Expand Down Expand Up @@ -607,9 +608,10 @@ public final Stream<FileSlice> getLatestFileSlices(String partitionStr) {
readLock.lock();
String partitionPath = formatPartitionKey(partitionStr);
ensurePartitionLoadedCorrectly(partitionPath);
return filterBaseFileAfterPendingCompaction(fetchLatestFileSlices(partitionPath)
.filter(slice -> !isFileGroupReplaced(slice.getFileGroupId())), true)
.map(this::addBootstrapBaseFileIfPresent);
return fetchLatestFileSlices(partitionPath)
.filter(slice -> !isFileGroupReplaced(slice.getFileGroupId()))
.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, true))
.map(this::addBootstrapBaseFileIfPresent);
} finally {
readLock.unlock();
}
Expand All @@ -631,7 +633,7 @@ public final Option<FileSlice> getLatestFileSlice(String partitionStr, String fi
if (!fs.isPresent()) {
return Option.empty();
}
return Option.ofNullable(filterBaseFileAfterPendingCompaction(Stream.of(fs.get()), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
return Option.ofNullable(filterBaseFileAfterPendingCompaction(fs.get(), true).map(this::addBootstrapBaseFileIfPresent).findFirst().orElse(null));
}
} finally {
readLock.unlock();
Expand Down Expand Up @@ -673,7 +675,7 @@ public final Stream<FileSlice> getLatestFileSlicesBeforeOrOn(String partitionStr
.filter(slice -> !isFileGroupReplacedBeforeOrOn(slice.getFileGroupId(), maxCommitTime))
.map(fg -> fg.getAllFileSlicesBeforeOn(maxCommitTime));
if (includeFileSlicesInPendingCompaction) {
return allFileSliceStream.map(sliceStream -> this.filterBaseFileAfterPendingCompaction(sliceStream, false))
return allFileSliceStream.map(sliceStream -> sliceStream.flatMap(slice -> this.filterBaseFileAfterPendingCompaction(slice, false)))
.map(sliceStream -> Option.fromJavaOptional(sliceStream.findFirst())).filter(Option::isPresent).map(Option::get)
.map(this::addBootstrapBaseFileIfPresent);
} else {
Expand Down

0 comments on commit 6b13901

Please sign in to comment.