From df46bc4da9c5a53752c105cfcd6906abc874cef7 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 13:14:55 -0700 Subject: [PATCH 01/14] change to method usage --- .../org/apache/hudi/io/HoodieMergeHandle.java | 4 ++-- .../HoodieBackedTableMetadataWriter.java | 24 ++++++++++--------- .../action/commit/JavaUpsertPartitioner.java | 3 +-- .../bloom/HoodieFileProbingFunction.java | 10 ++++---- ...dieMetadataBloomFilterProbingFunction.java | 13 +++++----- .../action/commit/UpsertPartitioner.java | 3 +-- .../SparkUpsertDeltaCommitPartitioner.java | 12 +++++----- .../profile/DeltaWriteProfile.java | 10 ++++---- .../partitioner/profile/WriteProfile.java | 4 +--- 9 files changed, 38 insertions(+), 45 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 7aa357b75048e..21c0059474e86 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -173,7 +173,7 @@ private void init(String fileId, String partitionPath, HoodieBaseFile baseFileTo writeStatus.setStat(new HoodieWriteStat()); try { String latestValidFilePath = baseFileToMerge.getFileName(); - writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath)); + writeStatus.getStat().setPrevCommit(baseFileToMerge.getCommitTime()); HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime, new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), @@ -471,7 +471,7 @@ public void performMergeDataValidationCheck(WriteStatus writeStatus) { String.format("Record write count decreased for file: %s, Partition Path: %s (%s:%d + %d < %s:%d)", writeStatus.getFileId(), writeStatus.getPartitionPath(), instantTime, writeStatus.getStat().getNumWrites(), writeStatus.getStat().getNumDeletes(), - FSUtils.getCommitTime(oldFilePath.toString()), oldNumWrites)); + baseFileToMerge.getCommitTime(), oldNumWrites)); } } diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 83e6d8de6ead3..55bc466156092 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -35,6 +35,7 @@ import org.apache.hudi.common.function.SerializableFunction; import org.apache.hudi.common.model.DeleteRecord; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy; import org.apache.hudi.common.model.HoodieFileFormat; @@ -164,7 +165,7 @@ protected HoodieBackedTableMetadataWriter(Configuration hadoopConf, this.engineContext = engineContext; this.hadoopConf = new SerializableConfiguration(hadoopConf); this.metrics = Option.empty(); - this.enabledPartitionTypes = new ArrayList<>(); + this.enabledPartitionTypes = new ArrayList<>(4); this.dataMetaClient = HoodieTableMetaClient.builder().setConf(hadoopConf).setBasePath(dataWriteConfig.getBasePath()).build(); @@ -481,10 +482,10 @@ private Pair> initializeRecordIndexPartition() // Collect the list of latest base files present in each partition List partitions = metadata.getAllPartitionPaths(); fsView.loadAllPartitions(); - final List> partitionBaseFilePairs = new ArrayList<>(); + final List> partitionBaseFilePairs = new ArrayList<>(); for (String partition : partitions) { partitionBaseFilePairs.addAll(fsView.getLatestBaseFiles(partition) - .map(basefile -> Pair.of(partition, basefile.getFileName())).collect(Collectors.toList())); + .map(basefile -> Pair.of(partition, basefile)).collect(Collectors.toList())); } LOG.info("Initializing record index from " + partitionBaseFilePairs.size() + " base files in " @@ -509,7 +510,7 @@ private Pair> initializeRecordIndexPartition() * Read the record keys from base files in partitions and return records. */ private HoodieData readRecordKeysFromBaseFiles(HoodieEngineContext engineContext, - List> partitionBaseFilePairs, + List> partitionBaseFilePairs, boolean forDelete) { if (partitionBaseFilePairs.isEmpty()) { return engineContext.emptyHoodieData(); @@ -517,13 +518,14 @@ private HoodieData readRecordKeysFromBaseFiles(HoodieEngineContext engineContext.setJobStatus(this.getClass().getSimpleName(), "Record Index: reading record keys from " + partitionBaseFilePairs.size() + " base files"); final int parallelism = Math.min(partitionBaseFilePairs.size(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); - return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(p -> { - final String partition = p.getKey(); - final String filename = p.getValue(); + return engineContext.parallelize(partitionBaseFilePairs, parallelism).flatMap(partitionAndBaseFile -> { + final String partition = partitionAndBaseFile.getKey(); + final HoodieBaseFile baseFile = partitionAndBaseFile.getValue(); + final String filename = baseFile.getFileName(); Path dataFilePath = new Path(dataWriteConfig.getBasePath(), partition + Path.SEPARATOR + filename); - final String fileId = FSUtils.getFileId(filename); - final String instantTime = FSUtils.getCommitTime(filename); + final String fileId = baseFile.getFileId(); + final String instantTime = baseFile.getCommitTime(); HoodieFileReader reader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(hadoopConf.get(), dataFilePath); ClosableIterator recordKeyIterator = reader.getRecordKeyIterator(); @@ -1370,10 +1372,10 @@ private HoodieData getRecordIndexUpdates(HoodieData w private HoodieData getRecordIndexReplacedRecords(HoodieReplaceCommitMetadata replaceCommitMetadata) { final HoodieMetadataFileSystemView fsView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata); - List> partitionBaseFilePairs = replaceCommitMetadata + List> partitionBaseFilePairs = replaceCommitMetadata .getPartitionToReplaceFileIds() .keySet().stream().flatMap(partition - -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f.getFileName()))) + -> fsView.getLatestBaseFiles(partition).map(f -> Pair.of(partition, f))) .collect(Collectors.toList()); return readRecordKeysFromBaseFiles(engineContext, partitionBaseFilePairs, true); diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java index 1c8ba1b175d5d..ad7111d70a2a9 100644 --- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java +++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/JavaUpsertPartitioner.java @@ -253,9 +253,8 @@ protected List getSmallFiles(String partitionPath) { for (HoodieBaseFile file : allFiles) { if (file.getFileSize() < config.getParquetSmallFileLimit()) { - String filename = file.getFileName(); SmallFile sf = new SmallFile(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId()); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java index 43d3a8a8e2e2b..2b6a96b3d05a7 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieFileProbingFunction.java @@ -20,7 +20,6 @@ import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.config.SerializableConfiguration; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; @@ -84,7 +83,7 @@ public BloomIndexLazyKeyCheckIterator(Iterator computeNext() { // Partition path and file name pair to list of keys - final Map, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>(); + final Map, HoodieBloomFilterProbingResult> fileToLookupResults = new HashMap<>(); final Map fileIDBaseFileMap = new HashMap<>(); while (inputItr.hasNext()) { @@ -103,7 +102,7 @@ protected List computeNext() { fileIDBaseFileMap.put(fileId, baseFile.get()); } - fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), entry._2); + fileToLookupResults.putIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)), entry._2); if (fileToLookupResults.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { break; @@ -116,12 +115,11 @@ protected List computeNext() { return fileToLookupResults.entrySet().stream() .map(entry -> { - Pair partitionPathFileNamePair = entry.getKey(); + Pair partitionPathFileNamePair = entry.getKey(); HoodieBloomFilterProbingResult bloomFilterKeyLookupResult = entry.getValue(); final String partitionPath = partitionPathFileNamePair.getLeft(); - final String fileName = partitionPathFileNamePair.getRight(); - final String fileId = FSUtils.getFileId(fileName); + final String fileId = partitionPathFileNamePair.getRight().getFileId(); ValidationUtils.checkState(!fileId.isEmpty()); List candidateRecordKeys = bloomFilterKeyLookupResult.getCandidateKeys(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java index 5dacad7320eeb..840932b2346d0 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java @@ -20,7 +20,6 @@ import org.apache.hudi.client.utils.LazyIterableIterator; import org.apache.hudi.common.bloom.BloomFilter; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; @@ -44,6 +43,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import scala.Tuple2; @@ -93,7 +93,7 @@ public BloomIndexLazyKeyCheckIterator(Iterator @Override protected Iterator> computeNext() { // Partition path and file name pair to list of keys - final Map, List> fileToKeysMap = new HashMap<>(); + final Map, List> fileToKeysMap = new HashMap<>(); final Map fileIDBaseFileMap = new HashMap<>(); while (inputItr.hasNext()) { @@ -110,7 +110,7 @@ protected Iterator> co fileIDBaseFileMap.put(fileId, baseFile.get()); } - fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId).getFileName()), + fileToKeysMap.computeIfAbsent(Pair.of(partitionPath, fileIDBaseFileMap.get(fileId)), k -> new ArrayList<>()).add(new HoodieKey(entry._2, partitionPath)); if (fileToKeysMap.size() > BLOOM_FILTER_CHECK_MAX_FILE_COUNT_PER_BATCH) { @@ -122,18 +122,17 @@ protected Iterator> co return Collections.emptyIterator(); } - List> partitionNameFileNameList = new ArrayList<>(fileToKeysMap.keySet()); + List> partitionNameFileNameList = fileToKeysMap.keySet().stream().map(pair -> Pair.of(pair.getLeft(), pair.getRight().getFileName())).collect(Collectors.toList()); Map, BloomFilter> fileToBloomFilterMap = hoodieTable.getMetadataTable().getBloomFilters(partitionNameFileNameList); return fileToKeysMap.entrySet().stream() .map(entry -> { - Pair partitionPathFileNamePair = entry.getKey(); + Pair partitionPathFileNamePair = entry.getKey(); List hoodieKeyList = entry.getValue(); final String partitionPath = partitionPathFileNamePair.getLeft(); - final String fileName = partitionPathFileNamePair.getRight(); - final String fileId = FSUtils.getFileId(fileName); + final String fileId = partitionPathFileNamePair.getRight().getFileId(); ValidationUtils.checkState(!fileId.isEmpty()); if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index d98524fc38f42..edd6d981d1850 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -302,9 +302,8 @@ protected List getSmallFiles(String partitionPath) { for (HoodieBaseFile file : allFiles) { if (file.getFileSize() < config.getParquetSmallFileLimit()) { - String filename = file.getFileName(); SmallFile sf = new SmallFile(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId()); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java index a5dd9978939f7..dc8f267718f24 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/deltacommit/SparkUpsertDeltaCommitPartitioner.java @@ -19,8 +19,8 @@ package org.apache.hudi.table.action.deltacommit; import org.apache.hudi.client.common.HoodieSparkEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -32,6 +32,7 @@ import org.apache.hudi.table.action.commit.UpsertPartitioner; import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; @@ -69,15 +70,14 @@ protected List getSmallFiles(String partitionPath) { for (FileSlice smallFileSlice : smallFileSlicesCandidates) { SmallFile sf = new SmallFile(); if (smallFileSlice.getBaseFile().isPresent()) { - // TODO : Move logic of file name, file id, base commit time handling inside file slice - String filename = smallFileSlice.getBaseFile().get().getFileName(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get(); + sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); } else { HoodieLogFile logFile = smallFileSlice.getLogFiles().findFirst().get(); - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(), + logFile.getFileId()); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java index d63696effba4a..894a6463a4447 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java @@ -19,8 +19,8 @@ package org.apache.hudi.sink.partitioner.profile; import org.apache.hudi.client.common.HoodieFlinkEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.table.timeline.HoodieInstant; @@ -70,16 +70,14 @@ protected List smallFilesProfile(String partitionPath) { for (FileSlice smallFileSlice : allSmallFileSlices) { SmallFile sf = new SmallFile(); if (smallFileSlice.getBaseFile().isPresent()) { - // TODO : Move logic of file name, file id, base commit time handling inside file slice - String filename = smallFileSlice.getBaseFile().get().getFileName(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + HoodieBaseFile baseFile = smallFileSlice.getBaseFile().get(); + sf.location = new HoodieRecordLocation(baseFile.getCommitTime(), baseFile.getFileId()); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); } else { smallFileSlice.getLogFiles().findFirst().ifPresent(logFile -> { // in case there is something error, and the file slice has no log file - sf.location = new HoodieRecordLocation(FSUtils.getBaseCommitTimeFromLogPath(logFile.getPath()), - FSUtils.getFileIdFromLogPath(logFile.getPath())); + sf.location = new HoodieRecordLocation(logFile.getBaseCommitTime(), logFile.getFileId()); sf.sizeBytes = getTotalFileSize(smallFileSlice); smallFileLocations.add(sf); }); diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java index 9dd604a717f9e..1f41888ff45c0 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java @@ -20,7 +20,6 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext; import org.apache.hudi.common.config.HoodieStorageConfig; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecordLocation; @@ -210,9 +209,8 @@ protected List smallFilesProfile(String partitionPath) { for (HoodieBaseFile file : allFiles) { // filter out the corrupted files. if (file.getFileSize() < config.getParquetSmallFileLimit() && file.getFileSize() > 0) { - String filename = file.getFileName(); SmallFile sf = new SmallFile(); - sf.location = new HoodieRecordLocation(FSUtils.getCommitTime(filename), FSUtils.getFileId(filename)); + sf.location = new HoodieRecordLocation(file.getCommitTime(), file.getFileId()); sf.sizeBytes = file.getFileSize(); smallFileLocations.add(sf); } From 6e6b63c03d92a4482a5856d5e353350dbad0a7d3 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 13:54:52 -0700 Subject: [PATCH 02/14] log file testing and refactor --- .../hudi/client/CompactionAdminClient.java | 2 +- .../org/apache/hudi/io/HoodieWriteHandle.java | 2 +- .../org/apache/hudi/common/fs/FSUtils.java | 9 --- .../hudi/common/model/HoodieLogFile.java | 73 +++++++++++------ .../hudi/common/model/TestHoodieBaseFile.java | 22 ++++++ .../hudi/common/model/TestHoodieLogFile.java | 79 +++++++++++++++++++ 6 files changed, 153 insertions(+), 34 deletions(-) create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java create mode 100644 hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index 8ff562c2070ec..6445ca5b0719b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -245,7 +245,7 @@ protected static List> getRenamingActionsToAl return logFilesToBeMoved.stream().map(lf -> { ValidationUtils.checkArgument(lf.getLogVersion() - maxVersion > 0, "Expect new log version to be sane"); HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(), - FSUtils.makeLogFileName(lf.getFileId(), "." + FSUtils.getFileExtensionFromLog(lf.getPath()), + FSUtils.makeLogFileName(lf.getFileId(), "." + lf.getFileExtension(), compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN))); return Pair.of(lf, newLogFile); }).collect(Collectors.toList()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java index 9e716a280e812..8148076759928 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java @@ -253,7 +253,7 @@ protected HoodieLogFormat.Writer createLogWriter( .withSizeThreshold(config.getLogFileMaxSize()) .withFs(fs) .withRolloverLogWriteToken(writeToken) - .withLogWriteToken(latestLogFile.map(x -> FSUtils.getWriteTokenFromLogPath(x.getPath())).orElse(writeToken)) + .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken)) .withSuffix(suffix) .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 234af8a7be3b7..601a600bb7588 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -454,15 +454,6 @@ public static int getFileVersionFromLog(String logFileName) { return Integer.parseInt(matcher.group(4)); } - public static String getSuffixFromLogPath(Path path) { - Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); - if (!matcher.find()) { - throw new InvalidHoodiePathException(path, "LogFile"); - } - String val = matcher.group(10); - return val == null ? "" : val; - } - public static String makeLogFileName(String fileId, String logFileExtension, String baseCommitTime, int version, String writeToken) { String suffix = (writeToken == null) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 988194964f71d..c2edce8967e62 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -19,6 +19,8 @@ package org.apache.hudi.common.model; import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.exception.InvalidHoodiePathException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -28,6 +30,9 @@ import java.io.Serializable; import java.util.Comparator; import java.util.Objects; +import java.util.regex.Matcher; + +import static org.apache.hudi.common.fs.FSUtils.LOG_FILE_PATTERN; /** * Abstracts a single log file. Contains methods to extract metadata like the fileId, version and extension from the log @@ -47,60 +52,82 @@ public class HoodieLogFile implements Serializable { private transient FileStatus fileStatus; private final String pathStr; + private final String fileId; + private final String baseCommitTime; + private final int logVersion; + private final String logWriteToken; + private final String fileExtension; + private final String suffix; + private final Path path; private long fileLen; public HoodieLogFile(HoodieLogFile logFile) { - this.fileStatus = logFile.fileStatus; - this.pathStr = logFile.pathStr; - this.fileLen = logFile.fileLen; + this(logFile.getFileStatus(), logFile.getPath(), logFile.pathStr, logFile.getFileSize()); } public HoodieLogFile(FileStatus fileStatus) { - this.fileStatus = fileStatus; - this.pathStr = fileStatus.getPath().toString(); - this.fileLen = fileStatus.getLen(); + this(fileStatus, fileStatus.getPath(), fileStatus.getPath().toString(), fileStatus.getLen()); } public HoodieLogFile(Path logPath) { - this.fileStatus = null; - this.pathStr = logPath.toString(); - this.fileLen = -1; + this(null, logPath, logPath.toString(), -1); } - public HoodieLogFile(Path logPath, Long fileLen) { - this.fileStatus = null; - this.pathStr = logPath.toString(); - this.fileLen = fileLen; + public HoodieLogFile(Path logPath, long fileLen) { + this(null, logPath, logPath.toString(), fileLen); } public HoodieLogFile(String logPathStr) { - this.fileStatus = null; + this(null, null, logPathStr, -1); + } + + private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, long fileLen) { + this.fileStatus = fileStatus; this.pathStr = logPathStr; - this.fileLen = -1; + this.fileLen = fileLen; + if (logPath != null) { + if (logPath instanceof CachingPath) { + this.path = logPath; + } else { + this.path = new CachingPath(logPath.getParent(), logPath.getName()); + } + } else { + this.path = new CachingPath(pathStr); + } + Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); + if (!matcher.find()) { + throw new InvalidHoodiePathException(path, "LogFile"); + } + this.fileId = matcher.group(1); + this.baseCommitTime = matcher.group(2); + this.fileExtension = matcher.group(3); + this.logVersion = Integer.parseInt(matcher.group(4)); + this.logWriteToken = matcher.group(6); + this.suffix = matcher.group(10); } public String getFileId() { - return FSUtils.getFileIdFromLogPath(getPath()); + return fileId; } public String getBaseCommitTime() { - return FSUtils.getBaseCommitTimeFromLogPath(getPath()); + return baseCommitTime; } public int getLogVersion() { - return FSUtils.getFileVersionFromLog(getPath()); + return logVersion; } public String getLogWriteToken() { - return FSUtils.getWriteTokenFromLogPath(getPath()); + return logWriteToken; } public String getFileExtension() { - return FSUtils.getFileExtensionFromLog(getPath()); + return fileExtension; } public String getSuffix() { - return FSUtils.getSuffixFromLogPath(getPath()); + return suffix; } public Path getPath() { @@ -131,9 +158,9 @@ public HoodieLogFile rollOver(FileSystem fs, String logWriteToken) throws IOExce String fileId = getFileId(); String baseCommitTime = getBaseCommitTime(); Path path = getPath(); - String extension = "." + FSUtils.getFileExtensionFromLog(path); + String extension = "." + fileExtension; int newVersion = FSUtils.computeNextLogVersion(fs, path.getParent(), fileId, extension, baseCommitTime); - return new HoodieLogFile(new Path(path.getParent(), + return new HoodieLogFile(new CachingPath(path.getParent(), FSUtils.makeLogFileName(fileId, extension, baseCommitTime, newVersion, logWriteToken))); } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java new file mode 100644 index 0000000000000..6dba0fefbcd2e --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +public class TestHoodieBaseFile { +} diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java new file mode 100644 index 0000000000000..be6088ea087a9 --- /dev/null +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.common.model; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestHoodieLogFile { + private final String pathStr = "file:///tmp/hoodie/2021/01/01/.136281f3-c24e-423b-a65a-95dbfbddce1d_100.log.2_1-0-1"; + private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d"; + private final String baseCommitTime = "100"; + private final int logVersion = 2; + private final String writeToken = "1-0-1"; + private final String fileExtension = "log"; + + private final int length = 10; + + @Test + void createFromLogFile() { + FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr)); + HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus); + assertFileGetters(fileStatus, new HoodieLogFile(hoodieLogFile), length); + } + + @Test + void createFromFileStatus() { + FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr)); + HoodieLogFile hoodieLogFile = new HoodieLogFile(fileStatus); + assertFileGetters(fileStatus, hoodieLogFile, length); + } + + @Test + void createFromPath() { + HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr)); + assertFileGetters(null, hoodieLogFile, -1); + } + + @Test + void createFromPathAndLength() { + HoodieLogFile hoodieLogFile = new HoodieLogFile(new Path(pathStr), length); + assertFileGetters(null, hoodieLogFile, length); + } + + @Test + void createFromString() { + HoodieLogFile hoodieLogFile = new HoodieLogFile(pathStr); + assertFileGetters(null, hoodieLogFile, -1); + } + + private void assertFileGetters(FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) { + assertEquals(fileId, hoodieLogFile.getFileId()); + assertEquals(baseCommitTime, hoodieLogFile.getBaseCommitTime()); + assertEquals(logVersion, hoodieLogFile.getLogVersion()); + assertEquals(writeToken, hoodieLogFile.getLogWriteToken()); + assertEquals(fileExtension, hoodieLogFile.getFileExtension()); + assertEquals(new Path(pathStr), hoodieLogFile.getPath()); + assertEquals(fileLength, hoodieLogFile.getFileSize()); + assertEquals(fileStatus, hoodieLogFile.getFileStatus()); + } +} From 01f6db34c50593c52c26f83a26e54c819c044c04 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 14:24:09 -0700 Subject: [PATCH 03/14] update basefile code, update another usage of file id, optimize FSUtils.getFileId --- .../hudi/index/bloom/HoodieBloomIndex.java | 18 +++--- .../org/apache/hudi/common/fs/FSUtils.java | 6 +- .../apache/hudi/common/model/BaseFile.java | 2 +- .../hudi/common/model/HoodieBaseFile.java | 38 +++++++++++- .../hudi/common/model/TestHoodieBaseFile.java | 59 +++++++++++++++++++ 5 files changed, 109 insertions(+), 14 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index eca347a75bce2..c6d8b72e3a9f4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -25,7 +25,6 @@ import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodiePairData; import org.apache.hudi.common.engine.HoodieEngineContext; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieFileGroupId; import org.apache.hudi.common.model.HoodieKey; @@ -47,6 +46,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Stream; @@ -214,12 +214,16 @@ protected List> loadColumnRangesFromMetaIndex( String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); + List> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable); + List> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size()); + Map, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size()); + baseFilesForAllPartitions.forEach(pair -> { + Pair parititonAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName()); + partitionFileNameList.add(parititonAndFileName); + partitionAndFileNameToFileId.put(parititonAndFileName, pair.getValue().getFileId()); + }); // Partition and file name pairs - List> partitionFileNameList = - HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable).stream() - .map(partitionBaseFilePair -> Pair.of(partitionBaseFilePair.getLeft(), partitionBaseFilePair.getRight().getFileName())) - .sorted() - .collect(toList()); + Collections.sort(partitionFileNameList); // TODO why does this need to be sorted? if (partitionFileNameList.isEmpty()) { return Collections.emptyList(); @@ -233,7 +237,7 @@ protected List> loadColumnRangesFromMetaIndex( for (Map.Entry, HoodieMetadataColumnStats> entry : fileToColumnStatsMap.entrySet()) { result.add(Pair.of(entry.getKey().getLeft(), new BloomIndexFileInfo( - FSUtils.getFileId(entry.getKey().getRight()), + partitionAndFileNameToFileId.get(entry.getKey()), // NOTE: Here we assume that the type of the primary key field is string (String) unwrapStatisticValueWrapper(entry.getValue().getMinValue()), (String) unwrapStatisticValueWrapper(entry.getValue().getMaxValue()) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 601a600bb7588..10226944ffdaa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -193,9 +193,9 @@ public static String getCommitFromCommitFile(String commitFileName) { public static String getCommitTime(String fullFileName) { try { if (isLogFile(fullFileName)) { - return fullFileName.split("_")[1].split("\\.")[0]; + return fullFileName.split("_")[1].split("\\.", 2)[0]; } - return fullFileName.split("_")[2].split("\\.")[0]; + return fullFileName.split("_")[2].split("\\.", 2)[0]; } catch (ArrayIndexOutOfBoundsException e) { throw new HoodieException("Failed to get commit time from filename: " + fullFileName, e); } @@ -206,7 +206,7 @@ public static long getFileSize(FileSystem fs, Path path) throws IOException { } public static String getFileId(String fullFileName) { - return fullFileName.split("_")[0]; + return fullFileName.split("_", 2)[0]; } /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java index 7c4c89777957c..cfca6c50c75f1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/BaseFile.java @@ -36,7 +36,7 @@ public class BaseFile implements Serializable { private transient FileStatus fileStatus; private final String fullPath; - private final String fileName; + protected final String fileName; private long fileLen; public BaseFile(BaseFile dataFile) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java index 2c640bb143294..ce64abe6194ca 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java @@ -18,7 +18,6 @@ package org.apache.hudi.common.model; -import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; import org.apache.hadoop.fs.FileStatus; @@ -28,12 +27,17 @@ * Supports APIs to get Hudi FileId, Commit Time and bootstrap file (if any). */ public class HoodieBaseFile extends BaseFile { + private static final long serialVersionUID = 1L; + private final String fileId; + private final String commitTime; private Option bootstrapBaseFile; public HoodieBaseFile(HoodieBaseFile dataFile) { super(dataFile); this.bootstrapBaseFile = dataFile.bootstrapBaseFile; + this.fileId = dataFile.getFileId(); + this.commitTime = dataFile.getCommitTime(); } public HoodieBaseFile(FileStatus fileStatus) { @@ -43,6 +47,9 @@ public HoodieBaseFile(FileStatus fileStatus) { public HoodieBaseFile(FileStatus fileStatus, BaseFile bootstrapBaseFile) { super(fileStatus); this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile); + String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName(); + this.fileId = fileIdAndCommitTime[0]; + this.commitTime = fileIdAndCommitTime[1]; } public HoodieBaseFile(String filePath) { @@ -52,14 +59,39 @@ public HoodieBaseFile(String filePath) { public HoodieBaseFile(String filePath, BaseFile bootstrapBaseFile) { super(filePath); this.bootstrapBaseFile = Option.ofNullable(bootstrapBaseFile); + String[] fileIdAndCommitTime = getFileIdAndCommitTimeFromFileName(); + this.fileId = fileIdAndCommitTime[0]; + this.commitTime = fileIdAndCommitTime[1]; + } + + private String[] getFileIdAndCommitTimeFromFileName() { + String[] values = new String[2]; + short underscoreCount = 0; + short lastUnderscoreIndex = 0; + for (int i = 0; i < fileName.length(); i++) { + char c = fileName.charAt(i); + if (c == '_') { + if (underscoreCount == 0) { + values[0] = fileName.substring(0, i); + } + lastUnderscoreIndex = (short) i; + underscoreCount++; + } else if (c == '.') { + if (underscoreCount == 2) { + values[1] = fileName.substring(lastUnderscoreIndex + 1, i); + break; + } + } + } + return values; } public String getFileId() { - return FSUtils.getFileId(getFileName()); + return fileId; } public String getCommitTime() { - return FSUtils.getCommitTime(getFileName()); + return commitTime; } public Option getBootstrapBaseFile() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java index 6dba0fefbcd2e..15a93cd232bb5 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieBaseFile.java @@ -18,5 +18,64 @@ package org.apache.hudi.common.model; +import org.apache.hudi.common.util.Option; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.Path; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + public class TestHoodieBaseFile { + private final String fileName = "136281f3-c24e-423b-a65a-95dbfbddce1d_1-0-1_100.parquet"; + private final String pathStr = "file:/tmp/hoodie/2021/01/01/" + fileName; + private final String fileId = "136281f3-c24e-423b-a65a-95dbfbddce1d"; + private final String baseCommitTime = "100"; + private final int length = 10; + + @Test + void createFromHoodieBaseFile() { + FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr)); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus); + assertFileGetters(fileStatus, new HoodieBaseFile(hoodieBaseFile), length, Option.empty()); + } + + @Test + void createFromFileStatus() { + FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr)); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus); + assertFileGetters(fileStatus, hoodieBaseFile, length, Option.empty()); + } + + @Test + void createFromFileStatusAndBootstrapBaseFile() { + HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr); + FileStatus fileStatus = new FileStatus(length, false, 0, 0, 0, 0, null, null, null, new Path(pathStr)); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(fileStatus, bootstrapBaseFile); + assertFileGetters(fileStatus, hoodieBaseFile, length, Option.of(bootstrapBaseFile)); + } + + @Test + void createFromFilePath() { + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr); + assertFileGetters(null, hoodieBaseFile, -1, Option.empty()); + } + + @Test + void createFromFilePathAndBootstrapBaseFile() { + HoodieBaseFile bootstrapBaseFile = new HoodieBaseFile(pathStr); + HoodieBaseFile hoodieBaseFile = new HoodieBaseFile(pathStr, bootstrapBaseFile); + assertFileGetters(null, hoodieBaseFile, -1, Option.of(bootstrapBaseFile)); + } + + private void assertFileGetters(FileStatus fileStatus, HoodieBaseFile hoodieBaseFile, long fileLength, Option bootstrapBaseFile) { + assertEquals(fileId, hoodieBaseFile.getFileId()); + assertEquals(baseCommitTime, hoodieBaseFile.getCommitTime()); + assertEquals(bootstrapBaseFile, hoodieBaseFile.getBootstrapBaseFile()); + assertEquals(fileName, hoodieBaseFile.getFileName()); + assertEquals(pathStr, hoodieBaseFile.getPath()); + assertEquals(new Path(pathStr), hoodieBaseFile.getHadoopPath()); + assertEquals(fileLength, hoodieBaseFile.getFileSize()); + assertEquals(fileStatus, hoodieBaseFile.getFileStatus()); + } } From d14b5be10efe87824b1cce2d0e1f8710780c8472 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 14:27:05 -0700 Subject: [PATCH 04/14] return instance of path in log file --- .../main/java/org/apache/hudi/common/model/HoodieLogFile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index c2edce8967e62..4ba33bc07108d 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -131,7 +131,7 @@ public String getSuffix() { } public Path getPath() { - return new Path(pathStr); + return path; } public String getFileName() { From 655dfafb77fc0df33d56f7c97314f680fb13b01d Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 14:34:19 -0700 Subject: [PATCH 05/14] add suffix test case --- .../apache/hudi/common/model/TestHoodieLogFile.java | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java index be6088ea087a9..e0e3f9b6da0ca 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java @@ -66,7 +66,20 @@ void createFromString() { assertFileGetters(null, hoodieLogFile, -1); } + @Test + void createFromStringWithSuffix() { + String suffix = ".cdc"; + String pathWithSuffix = pathStr + suffix; + HoodieLogFile hoodieLogFile = new HoodieLogFile(pathWithSuffix); + assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1); + assertEquals(suffix, hoodieLogFile.getSuffix()); + } + private void assertFileGetters(FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) { + assertFileGetters(pathStr, fileStatus, hoodieLogFile, fileLength); + } + + private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) { assertEquals(fileId, hoodieLogFile.getFileId()); assertEquals(baseCommitTime, hoodieLogFile.getBaseCommitTime()); assertEquals(logVersion, hoodieLogFile.getLogVersion()); From 89d3dd3c8ba9bc10d6f6be906337c4d55c741780 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 16:45:56 -0700 Subject: [PATCH 06/14] fix suffix handling --- .../java/org/apache/hudi/common/model/HoodieLogFile.java | 2 +- .../org/apache/hudi/common/model/TestHoodieLogFile.java | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 4ba33bc07108d..59b6b70e5f82a 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -103,7 +103,7 @@ private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, lo this.fileExtension = matcher.group(3); this.logVersion = Integer.parseInt(matcher.group(4)); this.logWriteToken = matcher.group(6); - this.suffix = matcher.group(10); + this.suffix = matcher.group(10) == null ? "" : matcher.group(10); } public String getFileId() { diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java index e0e3f9b6da0ca..1096d222ad904 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodieLogFile.java @@ -71,15 +71,14 @@ void createFromStringWithSuffix() { String suffix = ".cdc"; String pathWithSuffix = pathStr + suffix; HoodieLogFile hoodieLogFile = new HoodieLogFile(pathWithSuffix); - assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1); - assertEquals(suffix, hoodieLogFile.getSuffix()); + assertFileGetters(pathWithSuffix, null, hoodieLogFile, -1, suffix); } private void assertFileGetters(FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) { - assertFileGetters(pathStr, fileStatus, hoodieLogFile, fileLength); + assertFileGetters(pathStr, fileStatus, hoodieLogFile, fileLength, ""); } - private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength) { + private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogFile hoodieLogFile, long fileLength, String suffix) { assertEquals(fileId, hoodieLogFile.getFileId()); assertEquals(baseCommitTime, hoodieLogFile.getBaseCommitTime()); assertEquals(logVersion, hoodieLogFile.getLogVersion()); @@ -88,5 +87,6 @@ private void assertFileGetters(String pathStr, FileStatus fileStatus, HoodieLogF assertEquals(new Path(pathStr), hoodieLogFile.getPath()); assertEquals(fileLength, hoodieLogFile.getFileSize()); assertEquals(fileStatus, hoodieLogFile.getFileStatus()); + assertEquals(suffix, hoodieLogFile.getSuffix()); } } From e8527c9addb6b9135b6a39f8aedc2b6b863f3ca5 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 17:37:23 -0700 Subject: [PATCH 07/14] mark path as transient for serialization --- .../java/org/apache/hudi/common/model/HoodieLogFile.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 59b6b70e5f82a..0794188257860 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -51,6 +51,7 @@ public class HoodieLogFile implements Serializable { private static final Comparator LOG_FILE_COMPARATOR_REVERSED = new LogFileComparator().reversed(); private transient FileStatus fileStatus; + private transient Path path; private final String pathStr; private final String fileId; private final String baseCommitTime; @@ -58,7 +59,6 @@ public class HoodieLogFile implements Serializable { private final String logWriteToken; private final String fileExtension; private final String suffix; - private final Path path; private long fileLen; public HoodieLogFile(HoodieLogFile logFile) { @@ -131,6 +131,9 @@ public String getSuffix() { } public Path getPath() { + if (path == null) { + path = new CachingPath(pathStr); + } return path; } From 48b44eb2e8d531fb7d11ab8be777808e422aa975 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Tue, 1 Aug 2023 18:42:06 -0700 Subject: [PATCH 08/14] handle base file naming edge case --- .../java/org/apache/hudi/common/model/HoodieBaseFile.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java index ce64abe6194ca..7bf156cec8f42 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java @@ -79,10 +79,12 @@ private String[] getFileIdAndCommitTimeFromFileName() { } else if (c == '.') { if (underscoreCount == 2) { values[1] = fileName.substring(lastUnderscoreIndex + 1, i); - break; + return values; } } } + // case where there is no '.' in file name + values[1] = fileName.substring(lastUnderscoreIndex + 1); return values; } From 9cbb48c5cad3d7b467a05eee5a692900539ed863 Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Wed, 2 Aug 2023 09:08:46 -0700 Subject: [PATCH 09/14] more optimizations --- .../src/main/java/org/apache/hudi/common/fs/FSUtils.java | 2 +- .../org/apache/hudi/common/model/CompactionOperation.java | 4 ++-- .../org/apache/hudi/common/table/log/HoodieLogFileReader.java | 4 +++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java index 10226944ffdaa..455f80af497ab 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java @@ -538,7 +538,7 @@ public static Option> getLatestLogVersion(FileSystem fs, P getLatestLogFile(getAllLogFiles(fs, partitionPath, fileId, logFileExtension, baseCommitTime)); if (latestLogFile.isPresent()) { return Option - .of(Pair.of(latestLogFile.get().getLogVersion(), getWriteTokenFromLogPath(latestLogFile.get().getPath()))); + .of(Pair.of(latestLogFile.get().getLogVersion(), latestLogFile.get().getLogWriteToken())); } return Option.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java index 14f8f59b3daea..861271b06309e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/CompactionOperation.java @@ -73,8 +73,8 @@ public CompactionOperation(Option dataFile, String partitionPath } else { assert logFiles.size() > 0; this.dataFileName = Option.empty(); - this.baseInstantTime = FSUtils.getBaseCommitTimeFromLogPath(logFiles.get(0).getPath()); - this.id = new HoodieFileGroupId(partitionPath, FSUtils.getFileIdFromLogPath(logFiles.get(0).getPath())); + this.baseInstantTime = logFiles.get(0).getBaseCommitTime(); + this.id = new HoodieFileGroupId(partitionPath, logFiles.get(0).getFileId()); this.dataFileCommitTime = Option.empty(); this.bootstrapFilePath = Option.empty(); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java index f268accb7060d..6759650af7818 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java @@ -47,6 +47,7 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +115,8 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSc // NOTE: We repackage {@code HoodieLogFile} here to make sure that the provided path // is prefixed with an appropriate scheme given that we're not propagating the FS // further - this.logFile = new HoodieLogFile(FSUtils.makeQualified(fs, logFile.getPath()), logFile.getFileSize()); + Path updatedPath = FSUtils.makeQualified(fs, logFile.getPath()); + this.logFile = updatedPath.equals(logFile.getPath()) ? logFile : new HoodieLogFile(updatedPath, logFile.getFileSize()); this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize); this.readerSchema = readerSchema; this.readBlockLazily = readBlockLazily; From ee75581ba4563bb8ac5b2caaffd13c50eb241e7d Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 3 Aug 2023 07:49:52 -0700 Subject: [PATCH 10/14] provide CachingPath instance to log file constructor when possible --- .../org/apache/hudi/common/model/HoodieBaseFile.java | 12 +++++++++--- .../table/log/AbstractHoodieLogRecordReader.java | 6 +++--- .../view/HoodieTablePreCommitFileSystemView.java | 5 ++--- .../hudi/utilities/HoodieMetadataTableValidator.java | 2 +- 4 files changed, 15 insertions(+), 10 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java index 7bf156cec8f42..ed1c32698eb84 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieBaseFile.java @@ -28,6 +28,8 @@ */ public class HoodieBaseFile extends BaseFile { private static final long serialVersionUID = 1L; + private static final char UNDERSCORE = '_'; + private static final char DOT = '.'; private final String fileId; private final String commitTime; @@ -64,26 +66,30 @@ public HoodieBaseFile(String filePath, BaseFile bootstrapBaseFile) { this.commitTime = fileIdAndCommitTime[1]; } + /** + * Parses the file ID and commit time from the fileName. + * @return String array of size 2 with fileId as the first and commitTime as the second element. + */ private String[] getFileIdAndCommitTimeFromFileName() { String[] values = new String[2]; short underscoreCount = 0; short lastUnderscoreIndex = 0; for (int i = 0; i < fileName.length(); i++) { char c = fileName.charAt(i); - if (c == '_') { + if (c == UNDERSCORE) { if (underscoreCount == 0) { values[0] = fileName.substring(0, i); } lastUnderscoreIndex = (short) i; underscoreCount++; - } else if (c == '.') { + } else if (c == DOT) { if (underscoreCount == 2) { values[1] = fileName.substring(lastUnderscoreIndex + 1, i); return values; } } } - // case where there is no '.' in file name + // case where there is no '.' in file name (no file suffix like .parquet) values[1] = fileName.substring(lastUnderscoreIndex + 1); return values; } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java index 6ef1a6f554270..7b1e737610b65 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java @@ -39,13 +39,13 @@ import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.internal.schema.InternalSchema; import org.apache.hudi.internal.schema.action.InternalSchemaMerger; import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter; import org.apache.avro.Schema; import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -237,7 +237,7 @@ private void scanInternalV1(Option keySpecOpt) { try { // Iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), + logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); Set scannedLogFiles = new HashSet<>(); @@ -396,7 +396,7 @@ private void scanInternalV2(Option keySpecOption, boolean skipProcessin try { // Iterate over the paths logFormatReaderWrapper = new HoodieLogFormatReader(fs, - logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()), + logFilePaths.stream().map(logFile -> new HoodieLogFile(new CachingPath(logFile))).collect(Collectors.toList()), readerSchema, readBlocksLazily, reverseReader, bufferSize, shouldLookupRecords(), recordKeyField, internalSchema); /** diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java index b50b74e65dfd1..1ff93327f7feb 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/HoodieTablePreCommitFileSystemView.java @@ -21,8 +21,7 @@ import org.apache.hudi.common.model.HoodieBaseFile; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableMetaClient; - -import org.apache.hadoop.fs.Path; +import org.apache.hudi.hadoop.CachingPath; import java.util.Collections; import java.util.List; @@ -72,7 +71,7 @@ public final Stream getLatestBaseFiles(String partitionStr) { Map newFilesWrittenForPartition = filesWritten.stream() .filter(file -> partitionStr.equals(file.getPartitionPath())) .collect(Collectors.toMap(HoodieWriteStat::getFileId, writeStat -> - new HoodieBaseFile(new Path(tableMetaClient.getBasePath(), writeStat.getPath()).toString()))); + new HoodieBaseFile(new CachingPath(tableMetaClient.getBasePath(), writeStat.getPath()).toString()))); Stream committedBaseFiles = this.completedCommitsFileSystemView.getLatestBaseFiles(partitionStr); Map allFileIds = committedBaseFiles diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java index 16c0b01147984..d79957c735f4f 100644 --- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java +++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java @@ -875,7 +875,7 @@ private boolean hasCommittedLogFiles( } Schema readerSchema = converter.convert(messageType); reader = - HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFilePathStr)), readerSchema); + HoodieLogFormat.newReader(fs, new HoodieLogFile(logFilePathStr), readerSchema); // read the avro blocks if (reader.hasNext()) { HoodieLogBlock block = reader.next(); From 57f45433badf2316a431153a1728a2a80ef562fd Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 3 Aug 2023 08:45:03 -0700 Subject: [PATCH 11/14] remove sorting --- .../org/apache/hudi/index/bloom/HoodieBloomIndex.java | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index c6d8b72e3a9f4..9d02ad56c2674 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -215,15 +215,14 @@ protected List> loadColumnRangesFromMetaIndex( String keyField = hoodieTable.getMetaClient().getTableConfig().getRecordKeyFieldProp(); List> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable); + // Partition and file name pairs List> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size()); Map, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size()); baseFilesForAllPartitions.forEach(pair -> { - Pair parititonAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName()); - partitionFileNameList.add(parititonAndFileName); - partitionAndFileNameToFileId.put(parititonAndFileName, pair.getValue().getFileId()); + Pair partitionAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName()); + partitionFileNameList.add(partitionAndFileName); + partitionAndFileNameToFileId.put(partitionAndFileName, pair.getValue().getFileId()); }); - // Partition and file name pairs - Collections.sort(partitionFileNameList); // TODO why does this need to be sorted? if (partitionFileNameList.isEmpty()) { return Collections.emptyList(); From 306b6c94e2f4793f91ae9b6ffa3f102c8bc2a18e Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 3 Aug 2023 08:58:39 -0700 Subject: [PATCH 12/14] set load factor on map to avoid resize --- .../main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java index 9d02ad56c2674..dba2417f042f3 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/index/bloom/HoodieBloomIndex.java @@ -217,7 +217,7 @@ protected List> loadColumnRangesFromMetaIndex( List> baseFilesForAllPartitions = HoodieIndexUtils.getLatestBaseFilesForAllPartitions(partitions, context, hoodieTable); // Partition and file name pairs List> partitionFileNameList = new ArrayList<>(baseFilesForAllPartitions.size()); - Map, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size()); + Map, String> partitionAndFileNameToFileId = new HashMap<>(baseFilesForAllPartitions.size(), 1); baseFilesForAllPartitions.forEach(pair -> { Pair partitionAndFileName = Pair.of(pair.getKey(), pair.getValue().getFileName()); partitionFileNameList.add(partitionAndFileName); From 36a64db7653c9a21ee1d3c6c72c88510ed8d90cb Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 3 Aug 2023 19:35:07 -0700 Subject: [PATCH 13/14] fix HoodieMetadataBloomFilterProbingFunction bug --- .../bloom/HoodieMetadataBloomFilterProbingFunction.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java index 840932b2346d0..c96bd8740fe21 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/bloom/HoodieMetadataBloomFilterProbingFunction.java @@ -128,13 +128,13 @@ protected Iterator> co return fileToKeysMap.entrySet().stream() .map(entry -> { - Pair partitionPathFileNamePair = entry.getKey(); List hoodieKeyList = entry.getValue(); - - final String partitionPath = partitionPathFileNamePair.getLeft(); - final String fileId = partitionPathFileNamePair.getRight().getFileId(); + final String partitionPath = entry.getKey().getLeft(); + final HoodieBaseFile baseFile = entry.getKey().getRight(); + final String fileId = baseFile.getFileId(); ValidationUtils.checkState(!fileId.isEmpty()); + Pair partitionPathFileNamePair = Pair.of(partitionPath, baseFile.getFileName()); if (!fileToBloomFilterMap.containsKey(partitionPathFileNamePair)) { throw new HoodieIndexException("Failed to get the bloom filter for " + partitionPathFileNamePair); } From dbd715a0c9d98e2bbb69823025a6e314338bc17a Mon Sep 17 00:00:00 2001 From: Timothy Brown Date: Thu, 3 Aug 2023 19:58:54 -0700 Subject: [PATCH 14/14] lazy parse on hoodie log file, lazy init caching path as well --- .../cli/commands/HoodieLogFileCommand.java | 5 +- .../hudi/client/CompactionAdminClient.java | 5 +- .../hudi/common/model/HoodieLogFile.java | 57 +++++++++++++------ .../hudi/hadoop/realtime/RealtimeSplit.java | 8 ++- .../realtime/TestHoodieRealtimeFileSplit.java | 3 +- 5 files changed, 54 insertions(+), 24 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java index 82f9c1a646821..cf36a704c7d57 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java @@ -42,6 +42,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator; import org.apache.hudi.config.HoodieCompactionConfig; import org.apache.hudi.config.HoodieMemoryConfig; +import org.apache.hudi.hadoop.CachingPath; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.avro.Schema; @@ -232,9 +233,9 @@ public String showLogFileRecords( } else { for (String logFile : logFilePaths) { Schema writerSchema = new AvroSchemaConverter() - .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new Path(logFile)))); + .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(client.getFs(), new CachingPath(logFile)))); HoodieLogFormat.Reader reader = - HoodieLogFormat.newReader(fs, new HoodieLogFile(new Path(logFile)), writerSchema); + HoodieLogFormat.newReader(fs, new HoodieLogFile(new CachingPath(logFile)), writerSchema); // read the avro blocks while (reader.hasNext()) { HoodieLogBlock n = reader.next(); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java index 6445ca5b0719b..257d2cd855cc2 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/CompactionAdminClient.java @@ -41,6 +41,7 @@ import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.table.action.compact.OperationResult; import org.apache.hadoop.fs.FileStatus; @@ -244,7 +245,7 @@ protected static List> getRenamingActionsToAl merged.getLogFiles().filter(lf -> lf.getLogVersion() > maxVersion).collect(Collectors.toList()); return logFilesToBeMoved.stream().map(lf -> { ValidationUtils.checkArgument(lf.getLogVersion() - maxVersion > 0, "Expect new log version to be sane"); - HoodieLogFile newLogFile = new HoodieLogFile(new Path(lf.getPath().getParent(), + HoodieLogFile newLogFile = new HoodieLogFile(new CachingPath(lf.getPath().getParent(), FSUtils.makeLogFileName(lf.getFileId(), "." + lf.getFileExtension(), compactionInstant, lf.getLogVersion() - maxVersion, HoodieLogFormat.UNKNOWN_WRITE_TOKEN))); return Pair.of(lf, newLogFile); @@ -450,7 +451,7 @@ public List> getRenamingActionsForUnschedulin .orElse(fileSliceForCompaction.getLogFiles().findFirst().map(lf -> lf.getPath().getParent().toString()).get()); for (HoodieLogFile toRepair : logFilesToRepair) { int version = maxUsedVersion + 1; - HoodieLogFile newLf = new HoodieLogFile(new Path(parentPath, FSUtils.makeLogFileName(operation.getFileId(), + HoodieLogFile newLf = new HoodieLogFile(new CachingPath(parentPath, FSUtils.makeLogFileName(operation.getFileId(), logExtn, operation.getBaseInstantTime(), version, HoodieLogFormat.UNKNOWN_WRITE_TOKEN))); result.add(Pair.of(toRepair, newLf)); maxUsedVersion = version; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java index 0794188257860..ecfbd925dd144 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieLogFile.java @@ -53,16 +53,25 @@ public class HoodieLogFile implements Serializable { private transient FileStatus fileStatus; private transient Path path; private final String pathStr; - private final String fileId; - private final String baseCommitTime; - private final int logVersion; - private final String logWriteToken; - private final String fileExtension; - private final String suffix; + private String fileId; + private String baseCommitTime; + private int logVersion; + private String logWriteToken; + private String fileExtension; + private String suffix; private long fileLen; public HoodieLogFile(HoodieLogFile logFile) { - this(logFile.getFileStatus(), logFile.getPath(), logFile.pathStr, logFile.getFileSize()); + this.fileStatus = logFile.getFileStatus(); + this.path = logFile.getPath(); + this.pathStr = logFile.pathStr; + this.fileId = logFile.getFileId(); + this.baseCommitTime = logFile.getBaseCommitTime(); + this.logVersion = logFile.getLogVersion(); + this.logWriteToken = logFile.getLogWriteToken(); + this.fileExtension = logFile.getFileExtension(); + this.suffix = logFile.getSuffix(); + this.fileLen = logFile.getFileSize(); } public HoodieLogFile(FileStatus fileStatus) { @@ -85,16 +94,14 @@ private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, lo this.fileStatus = fileStatus; this.pathStr = logPathStr; this.fileLen = fileLen; - if (logPath != null) { - if (logPath instanceof CachingPath) { - this.path = logPath; - } else { - this.path = new CachingPath(logPath.getParent(), logPath.getName()); - } - } else { - this.path = new CachingPath(pathStr); + this.logVersion = -1; // mark version as uninitialized + if (logPath instanceof CachingPath) { + this.path = logPath; } - Matcher matcher = LOG_FILE_PATTERN.matcher(path.getName()); + } + + private void parseFieldsFromPath() { + Matcher matcher = LOG_FILE_PATTERN.matcher(getPath().getName()); if (!matcher.find()) { throw new InvalidHoodiePathException(path, "LogFile"); } @@ -107,26 +114,44 @@ private HoodieLogFile(FileStatus fileStatus, Path logPath, String logPathStr, lo } public String getFileId() { + if (fileId == null) { + parseFieldsFromPath(); + } return fileId; } public String getBaseCommitTime() { + if (baseCommitTime == null) { + parseFieldsFromPath(); + } return baseCommitTime; } public int getLogVersion() { + if (logVersion == -1) { + parseFieldsFromPath(); + } return logVersion; } public String getLogWriteToken() { + if (logWriteToken == null) { + parseFieldsFromPath(); + } return logWriteToken; } public String getFileExtension() { + if (fileExtension == null) { + parseFieldsFromPath(); + } return fileExtension; } public String getSuffix() { + if (suffix == null) { + parseFieldsFromPath(); + } return suffix; } diff --git a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java index 4b0b2d6ea79e2..043122fbdf867 100644 --- a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java +++ b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/RealtimeSplit.java @@ -18,12 +18,14 @@ package org.apache.hudi.hadoop.realtime; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.InputSplitWithLocationInfo; import org.apache.hudi.common.model.HoodieLogFile; import org.apache.hudi.common.util.Option; +import org.apache.hudi.hadoop.CachingPath; import org.apache.hudi.hadoop.InputSplitUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.InputSplitWithLocationInfo; + import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; @@ -126,7 +128,7 @@ default void readFromInput(DataInput in) throws IOException { for (int i = 0; i < totalLogFiles; i++) { String logFilePath = InputSplitUtils.readString(in); long logFileSize = in.readLong(); - deltaLogPaths.add(new HoodieLogFile(new Path(logFilePath), logFileSize)); + deltaLogPaths.add(new HoodieLogFile(new CachingPath(logFilePath), logFileSize)); } setDeltaLogFiles(deltaLogPaths); diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java index d2c4f1be61d9f..4b0f379aedb8d 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeFileSplit.java @@ -70,7 +70,8 @@ public class TestHoodieRealtimeFileSplit { @BeforeEach public void setUp(@TempDir java.nio.file.Path tempDir) throws Exception { basePath = tempDir.toAbsolutePath().toString(); - deltaLogFiles = Collections.singletonList(new HoodieLogFile(new Path(basePath + "/1.log"), 0L)); + Path logPath = new Path(basePath + "/1.log"); + deltaLogFiles = Collections.singletonList(new HoodieLogFile(logPath, 0L)); deltaLogPaths = Collections.singletonList(basePath + "/1.log"); fileSplitName = basePath + "/test.file"; baseFileSplit = new FileSplit(new Path(fileSplitName), 0, 100, new String[] {});