@@ -42,6 +42,7 @@
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieMemoryConfig;
+ import org.apache.hudi.hadoop.CachingPath;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.avro.Schema;
@@ -232,9 +233,9 @@ public String showLogFileRecords(
} else {
for (String logFile : logFilePaths) {
Schema writerSchema = new AvroSchemaConverter()
- .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile))));
+ .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile))));
HoodieLogFormat.Reader reader =
- HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema);
+ HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema);
// read the avro blocks
while (reader.hasNext()) {
HoodieLogBlock n = reader.next();
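
Note on the two hunks above: both swap org.apache.hadoop.fs.Path for Hudi's CachingPath when wrapping a log-file path. As a rough illustration of the memoization idea behind such a wrapper (a minimal sketch with an assumed class name, not Hudi's actual CachingPath), a Path subclass can remember derived components so repeated lookups skip re-deriving them:

// Illustrative only; Hudi's real CachingPath differs in scope and detail.
import org.apache.hadoop.fs.Path;

public class MemoizingPath extends Path {
  private String cachedName;   // file-name component, derived at most once
  private Path cachedParent;   // parent path, derived at most once

  public MemoizingPath(String pathString) {
    super(pathString);
  }

  @Override
  public String getName() {
    if (cachedName == null) {
      cachedName = super.getName();
    }
    return cachedName;
  }

  @Override
  public Path getParent() {
    if (cachedParent == null) {
      cachedParent = super.getParent();
    }
    return cachedParent;
  }
}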
(next file)
@@ -41,6 +41,7 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
+ import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.table.action.compact.OperationResult;

import org.apache.hadoop.fs.FileStatus;
@@ -244,8 +245,8 @@ protected static List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsToAl
merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList());
return logFilesToBeMoved.stream().map(lf -> {
ValidationUtils.checkArgument(lf.getLogVersion() - maxVersion > 0, "Expect new log version to be sane");
- HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(),
- FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()),
+ HoodieLogFile newLogFile = new HoodieLogFile(new CachingPath(lf.getPath().getParent(),
+ FSUtils.makeLogFileName(lf.getFileId(), "." + lf.getFileExtension(),
compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
return Pair.of(lf, newLogFile);
}).collect(Collectors.toList());
@@ -450,7 +451,7 @@ public List<Pair<HoodieLogFile, HoodieLogFile>> getRenamingActionsForUnschedulin
.orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get());
for (HoodieLogFile toRepair : logFilesToRepair) {
int version = maxUsedVersion + 1;
- HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
+ HoodieLogFile newLf = new HoodieLogFile(new CachingPath(parentPath, FSUtils.makeLogFileName(operation.getFileId(),
logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN)));
result.add(Pair.of(toRepair, newLf));
maxUsedVersion = version;
(next file)
@@ -25,7 +25,6 @@
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.data.HoodiePairData;
import org.apache.hudi.common.engine.HoodieEngineContext;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -47,6 +46,7 @@

import java.util.ArrayList;
import java.util.Collections;
+ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
@@ -214,12 +214,15 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(

String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp();

+ List<Pair<String, HoodieBaseFile>> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable);
// Partition and file name pairs
- List<Pair<String, String>> partitionFileNameList =
- HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream()
- .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName()))
- .sorted()
- .collect(toList());
+ List<Pair<String, String>> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size());
+ Map<Pair<String, String>, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size(), 1);
+ baseFilesForAllPartitions.forEach(pair -> {
+ Pair<String, String> partitionAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName());
+ partitionFileNameList.add(partitionAndFileName);
+ partitionAndFileNameToFileId.put(partitionAndFileName, pair.getValue().getFileId());
+ });

if (partitionFileNameList.isEmpty()) {
return Collections.emptyList();
@@ -233,7 +236,7 @@ protected List<Pair<String, BloomIndexFileInfo>> loadColumnRangesFromMetaIndex(
for (Map.Entry<Pair<String, String>, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) {
result.add(Pair.of(entry.getKey().getLeft(),
new BloomIndexFileInfo(
- FSUtils.getFileId(entry.getKey().getRight()),
+ partitionAndFileNameToFileId.get(entry.getKey()),
// NOTE: Here we assume that the type of the primary key field is string
(String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()),
(String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue())
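
The hunks in this file replace per-entry FSUtils.getFileId(fileName) parsing with a partitionAndFileNameToFileId map that is filled once while the latest base files are listed, using the file id the HoodieBaseFile already carries. A standalone sketch of that lookup pattern, with made-up names and a hypothetical file-name layout rather than code from this PR:

import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.Map;

public class FileIdLookupSketch {
  public static void main(String[] args) {
    // Filled once, while listing base files, from values already known on each base file.
    Map<Map.Entry<String, String>, String> partitionAndFileNameToFileId = new HashMap<>();
    String partition = "2023/08/15";
    String fileName = "f1-0001_1-0-1_20230815093000123.parquet"; // hypothetical name
    partitionAndFileNameToFileId.put(new SimpleImmutableEntry<>(partition, fileName), "f1-0001");

    // Later, per column-stats entry: one hash lookup instead of re-parsing the file name.
    String fileId = partitionAndFileNameToFileId.get(new SimpleImmutableEntry<>(partition, fileName));
    System.out.println(fileId); // prints f1-0001
  }
}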
(next file)
@@ -173,7 +173,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo
writeStatus.setStat(new HoodieWriteStat());
try {
String latestValidFilePath = baseFileToMerge.getFileName();
- writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
+ writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime());

HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
@@ -471,7 +471,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) {
String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)",
writeStatus.getFileId(), writeStatus.getPartitionPath(),
instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(),
- FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites));
+ baseFileToMerge.getCommitTime(), oldNumWrites));
}
}
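
Both hunks in this file stop deriving the commit time from the base file's name and read HoodieBaseFile#getCommitTime() instead. For context, a rough standalone sketch of what name-based extraction has to do on every call, assuming a "<fileId>_<writeToken>_<instantTime>.<extension>" layout (the real parsing lives in FSUtils and may differ):

public class NameParsingSketch {
  static String fileIdFromName(String fileName) {
    return fileName.split("_")[0];                        // string scanning on every call
  }

  static String commitTimeFromName(String fileName) {
    String noExt = fileName.substring(0, fileName.lastIndexOf('.'));
    return noExt.substring(noExt.lastIndexOf('_') + 1);   // more scanning on every call
  }

  public static void main(String[] args) {
    String name = "f1-0001_1-0-1_20230815093000123.parquet"; // hypothetical name
    System.out.println(fileIdFromName(name));      // f1-0001
    System.out.println(commitTimeFromName(name));  // 20230815093000123
    // A HoodieBaseFile already holds these values, so file.getFileId() / file.getCommitTime()
    // avoid repeating this work for every file (or every record).
  }
}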

(next file)
@@ -253,7 +253,7 @@ protected HoodieLogFormat.Writer createLogWriter(
.withSizeThreshold(config.getLogFileMaxSize())
.withFs(fs)
.withRolloverLogWriteToken(writeToken)
- .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken))
+ .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
.withSuffix(suffix)
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
}
(next file)
@@ -35,6 +35,7 @@
import org.apache.hudi.common.function.SerializableFunction;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.FileSlice;
+ import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieFileFormat;
@@ -164,7 +165,7 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf,
this.engineContext = engineContext;
this.hadoopConf = new SerializableConfiguration(hadoopConf);
this.metrics = Option.empty();
- this.enabledPartitionTypes = new ArrayList<>();
+ this.enabledPartitionTypes = new ArrayList<>(4);

this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build();

@@ -481,10 +482,10 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
// Collect the list of latest base files present in each partition
List<String> partitions = metadata.getAllPartitionPaths();
fsView.loadAllPartitions();
- final List<Pair<String, String>> partitionBaseFilePairs = new ArrayList<>();
+ final List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = new ArrayList<>();
for (String partition : partitions) {
partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition)
- .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList()));
+ .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList()));
}

LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in "
@@ -509,21 +510,22 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition()
* Read the record keys from base files in partitions and return records.
*/
private HoodieData<HoodieRecord> readRecordKeysFromBaseFiles(HoodieEngineContext engineContext,
- List<Pair<String, String>> partitionBaseFilePairs,
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs,
boolean forDelete) {
if (partitionBaseFilePairs.isEmpty()) {
return engineContext.emptyHoodieData();
}

engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files");
final int parallelism = Math.min(partitionBaseFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
- return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(p -> {
- final String partition = p.getKey();
- final String filename = p.getValue();
+ return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> {
+ final String partition = partitionAndBaseFile.getKey();
+ final HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+ final String filename = baseFile.getFileName();
Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename);

- final String fileId = FSUtils.getFileId(filename);
- final String instantTime = FSUtils.getCommitTime(filename);
+ final String fileId = baseFile.getFileId();
+ final String instantTime = baseFile.getCommitTime();
HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath);
ClosableIterator<String> recordKeyIterator = reader.getRecordKeyIterator();

@@ -1370,10 +1372,10 @@ private HoodieData<HoodieRecord> getRecordIndexUpdates(HoodieData<WriteStatus> w
private HoodieData<HoodieRecord> getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) {
final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient,
dataMetaClient.getActiveTimeline(), metadata);
- List<Pair<String, String>> partitionBaseFilePairs = replaceCommitMetadata
+ List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = replaceCommitMetadata
.getPartitionToReplaceFileIds()
.keySet().stream().flatMap(partition
- -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName())))
+ -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f)))
.collect(Collectors.toList());

return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true);
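
The hunks above change the pairs that flow through engineContext.parallelize(...).flatMap(...) from (partition, fileName) to (partition, HoodieBaseFile), so each task reads the file id and instant time from the carried object instead of parsing the file name. A toy stand-in using plain Java streams and a hypothetical slim value class (not Hudi's types); note that whatever is carried into distributed tasks this way needs to be serializable:

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CarryObjectSketch {
  // Hypothetical stand-in for a base file with its pre-parsed fields.
  static class BaseFileInfo implements Serializable {
    final String fileName;
    final String fileId;
    final String commitTime;
    BaseFileInfo(String fileName, String fileId, String commitTime) {
      this.fileName = fileName;
      this.fileId = fileId;
      this.commitTime = commitTime;
    }
  }

  public static void main(String[] args) {
    List<BaseFileInfo> files = Arrays.asList(
        new BaseFileInfo("f1_1-0-1_20230815093000123.parquet", "f1", "20230815093000123"));

    List<String> keys = files.parallelStream()
        .map(f -> f.fileId + "@" + f.commitTime) // no name parsing inside the task
        .collect(Collectors.toList());
    System.out.println(keys); // [f1@20230815093000123]
  }
}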
(next file)
@@ -253,9 +253,8 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {

for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
- String filename = file.getFileName();
SmallFile sf = new SmallFile();
- sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+ sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}
(next file)
@@ -20,7 +20,6 @@

import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.config.SerializableConfiguration;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
@@ -84,7 +83,7 @@ public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, HoodieB
@Override
protected List<HoodieKeyLookupResult> computeNext() {
// Partition path and file name pair to list of keys
- final Map<Pair<String, String>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
+ final Map<Pair<String, HoodieBaseFile>, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>();
final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();

while (inputItr.hasNext()) {
@@ -103,7 +102,7 @@ protected List<HoodieKeyLookupResult> computeNext() {
fileIDBaseFileMap.put(fileId, baseFile.get());
}

- fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2);
+ fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)), entry._2);

if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
break;
@@ -116,12 +115,11 @@ protected List<HoodieKeyLookupResult> computeNext() {

return fileToLookupResults.entrySet().stream()
.map(entry -> {
- Pair<String, String> partitionPathFileNamePair = entry.getKey();
+ Pair<String, HoodieBaseFile> partitionPathFileNamePair = entry.getKey();
HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue();

final String partitionPath = partitionPathFileNamePair.getLeft();
- final String fileName = partitionPathFileNamePair.getRight();
- final String fileId = FSUtils.getFileId(fileName);
+ final String fileId = partitionPathFileNamePair.getRight().getFileId();
ValidationUtils.checkState(!fileId.isEmpty());

List<String> candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys();
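
A detail worth keeping in mind for this file: fileToLookupResults is now keyed by Pair<String, HoodieBaseFile> rather than Pair<String, String>, so de-duplication via putIfAbsent relies on the right-hand type having stable, value-based equals/hashCode (or on each base file being inserted at most once per batch). A generic illustration of that HashMap-key requirement, using a hypothetical key class rather than Hudi's HoodieBaseFile:

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MapKeySketch {
  // Hypothetical value object used as a map key.
  static class FileHandle {
    final String fullPath;
    FileHandle(String fullPath) {
      this.fullPath = fullPath;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof FileHandle && ((FileHandle) o).fullPath.equals(fullPath);
    }

    @Override
    public int hashCode() {
      return Objects.hash(fullPath);
    }
  }

  public static void main(String[] args) {
    Map<FileHandle, String> results = new HashMap<>();
    results.putIfAbsent(new FileHandle("/tbl/p1/f1.parquet"), "first");
    results.putIfAbsent(new FileHandle("/tbl/p1/f1.parquet"), "second"); // ignored: equal key
    System.out.println(results.size()); // prints 1
  }
}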
(next file)
@@ -20,7 +20,6 @@

import org.apache.hudi.client.utils.LazyIterableIterator;
import org.apache.hudi.common.bloom.BloomFilter;
- import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.model.HoodieKey;
@@ -44,6 +43,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+ import java.util.stream.Collectors;

import scala.Tuple2;

@@ -93,7 +93,7 @@ public BloomIndexLazyKeyCheckIterator(Iterator<Tuple2<HoodieFileGroupId, String>
@Override
protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> computeNext() {
// Partition path and file name pair to list of keys
- final Map<Pair<String, String>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
+ final Map<Pair<String, HoodieBaseFile>, List<HoodieKey>> fileToKeysMap = new HashMap<>();
final Map<String, HoodieBaseFile> fileIDBaseFileMap = new HashMap<>();

while (inputItr.hasNext()) {
@@ -110,7 +110,7 @@ protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> co
fileIDBaseFileMap.put(fileId, baseFile.get());
}

- fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()),
+ fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)),
k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath));

if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) {
@@ -122,20 +122,19 @@ protected Iterator<Tuple2<HoodieFileGroupId, HoodieBloomFilterProbingResult>> co
return Collections.emptyIterator();
}

- List<Pair<String, String>> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet());
+ List<Pair<String, String>> partitionNameFileNameList = fileToKeysMap.keySet().stream().map(pair -> Pair.of(pair.getLeft(), pair.getRight().getFileName())).collect(Collectors.toList());
Map<Pair<String, String>, BloomFilter> fileToBloomFilterMap =
hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList);

return fileToKeysMap.entrySet().stream()
.map(entry -> {
- Pair<String, String> partitionPathFileNamePair = entry.getKey();
List<HoodieKey> hoodieKeyList = entry.getValue();

- final String partitionPath = partitionPathFileNamePair.getLeft();
- final String fileName = partitionPathFileNamePair.getRight();
- final String fileId = FSUtils.getFileId(fileName);
+ final String partitionPath = entry.getKey().getLeft();
+ final HoodieBaseFile baseFile = entry.getKey().getRight();
+ final String fileId = baseFile.getFileId();
ValidationUtils.checkState(!fileId.isEmpty());

+ Pair<String, String> partitionPathFileNamePair = Pair.of(partitionPath, baseFile.getFileName());
if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) {
throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair);
}
(next file)
@@ -302,9 +302,8 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {

for (HoodieBaseFile file : allFiles) {
if (file.getFileSize() < config.getParquetSmallFileLimit()) {
- String filename = file.getFileName();
SmallFile sf = new SmallFile();
- sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename));
+ sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId());
sf.sizeBytes = file.getFileSize();
smallFileLocations.add(sf);
}