[MINOR] Inline the partition path logic into the builder (#5310)
danny0405 committed Apr 13, 2022
1 parent 43de2b4 commit 0281725c6bc3d5bc0c2a94a53a819fe6949976ba
Showing 8 changed files with 38 additions and 88 deletions.
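In short: HoodieMergedLogRecordScanner.Builder#build() now infers the partition path from the base path and the first log file path whenever withPartition(...) was not called, so every call site can drop its hand-rolled getRelativePartitionPath plumbing. A condensed before/after sketch of a typical caller, pieced together from the hunks below (other builder options are elided behind the `// ...` comments, so this is illustrative rather than compilable as-is):

```java
// Before this commit: each caller derived the partition itself, guarding
// against an empty log-path list before calling withPartition(...).
HoodieMergedLogRecordScanner.Builder builder = HoodieMergedLogRecordScanner.newBuilder()
    .withFileSystem(fs)
    .withBasePath(basePath)
    .withLogFilePaths(logPaths);
    // ... other builder options ...
if (!isNullOrEmpty(logPaths)) {
  builder.withPartition(
      getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
}
HoodieMergedLogRecordScanner scanner = builder.build();

// After: build() infers the partition internally, so one fluent chain suffices.
HoodieMergedLogRecordScanner scanner2 = HoodieMergedLogRecordScanner.newBuilder()
    .withFileSystem(fs)
    .withBasePath(basePath)
    .withLogFilePaths(logPaths)
    // ... other builder options ...
    .build();
```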
@@ -67,7 +67,6 @@
import scala.Tuple2;
import scala.Tuple3;

-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.ValidationUtils.checkArgument;

/**
@@ -221,7 +220,6 @@ public String showLogFileRecords(
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-.withPartition(getRelativePartitionPath(new Path(client.getBasePath()), new Path(logFilePaths.get(0)).getParent()))
.build();
for (HoodieRecord<? extends HoodieRecordPayload> hoodieRecord : scanner) {
Option<IndexedRecord> record = hoodieRecord.getData().getInsertValue(readerSchema);
@@ -65,7 +65,6 @@
import java.util.Map;
import java.util.stream.Collectors;

-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -204,7 +203,6 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
// get expected result of 10 records.
List<String> logFilePaths = Arrays.stream(fs.globStatus(new Path(partitionPath + "/*")))
.map(status -> status.getPath().toString()).collect(Collectors.toList());
-assertTrue(logFilePaths.size() > 0);
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(tablePath)
@@ -223,7 +221,6 @@ public void testShowLogFileRecordsWithMerge() throws IOException, InterruptedExc
.withSpillableMapBasePath(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.defaultValue())
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
-.withPartition(getRelativePartitionPath(new Path(tablePath), new Path(logFilePaths.get(0)).getParent()))
.build();

Iterator<HoodieRecord<? extends HoodieRecordPayload>> records = scanner.iterator();
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
@@ -37,6 +38,8 @@
import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.internal.schema.InternalSchema;
+
+import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

@@ -45,6 +48,7 @@
import java.util.List;
import java.util.Map;

+import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.util.ValidationUtils.checkState;

/**
@@ -310,6 +314,9 @@ public Builder withPartition(String partitionName) {

@Override
public HoodieMergedLogRecordScanner build() {
+if (this.partitionName == null && CollectionUtils.nonEmpty(this.logFilePaths)) {
+this.partitionName = getRelativePartitionPath(new Path(basePath), new Path(this.logFilePaths.get(0)).getParent());
+}
return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
bufferSize, spillableMapBasePath, instantRange,
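Between the hunks it helps to see what value is actually being inferred: the first log file's parent directory, taken relative to the table base path. Below is a minimal, runnable sketch of that derivation; PartitionPathSketch and relativePartitionPath are illustrative stand-ins rather than Hudi API, and the real FSUtils.getRelativePartitionPath works on full Path objects (scheme, authority) and handles more edge cases:

```java
import java.net.URI;

public class PartitionPathSketch {
  // Illustrative stand-in for FSUtils.getRelativePartitionPath(basePath, partitionPath):
  // strip the table base path prefix from the partition directory.
  static String relativePartitionPath(String basePath, String fullPartitionPath) {
    String base = URI.create(basePath).getPath();
    String full = URI.create(fullPartitionPath).getPath();
    if (full.equals(base)) {
      return ""; // non-partitioned table: log files sit directly under the base path
    }
    return full.substring(base.length() + 1); // drop the base path and its separator
  }

  public static void main(String[] args) {
    String basePath = "/tmp/hoodie_table";
    String logFile = "/tmp/hoodie_table/2022/04/13/.f1_20220413.log.1";
    String parent = logFile.substring(0, logFile.lastIndexOf('/')); // Path#getParent equivalent
    System.out.println(relativePartitionPath(basePath, parent));    // prints: 2022/04/13
  }
}
```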
@@ -89,7 +89,6 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
import static org.apache.hudi.common.testutils.SchemaTestUtil.getSimpleSchema;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -575,13 +574,12 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
writer.close();
FileCreateUtils.createDeltaCommit(basePath, "100", fs);
// scan all log blocks (across multiple log files)
-List<String> logFilePaths = logFiles.stream()
-.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList());
-assertTrue(logFilePaths.size() > 0);
HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
-.withLogFilePaths(logFilePaths)
+.withLogFilePaths(
+logFiles.stream()
+.map(logFile -> logFile.getPath().toString()).collect(Collectors.toList()))
.withReaderSchema(schema)
.withLatestInstantTime("100")
.withMaxMemorySizeInBytes(10240L)
@@ -591,7 +589,6 @@ public void testBasicAppendAndScanMultipleFiles(ExternalSpillableMap.DiskMapType
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logFilePaths.get(0)).getParent()))
.build();

List<IndexedRecord> scannedRecords = new ArrayList<>();
@@ -806,7 +803,6 @@ public void testAvroLogRecordReaderBasic(ExternalSpillableMap.DiskMapType diskMa
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords());
Set<String> readKeys = new HashSet<>(200);
@@ -885,7 +881,6 @@ public void testAvroLogRecordReaderWithRollbackTombstone(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We read 200 records from 2 write batches");
Set<String> readKeys = new HashSet<>(200);
@@ -973,7 +968,6 @@ public void testAvroLogRecordReaderWithFailedPartialBlock(ExternalSpillableMap.D
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(200, scanner.getTotalLogRecords(), "We would read 200 records");
Set<String> readKeys = new HashSet<>(200);
@@ -1052,7 +1046,6 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1099,7 +1092,6 @@ public void testAvroLogRecordReaderWithDeleteAndRollback(ExternalSpillableMap.Di
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
scanner.forEach(s -> readKeys.add(s.getKey().getRecordKey()));
assertEquals(200, readKeys.size(), "Stream collect should return all 200 records after rollback of delete");
@@ -1195,7 +1187,6 @@ public void testAvroLogRecordReaderWithDisorderDelete(ExternalSpillableMap.DiskM
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(200, scanner.getTotalLogRecords(), "We still would read 200 records");
@@ -1299,7 +1290,6 @@ public void testAvroLogRecordReaderWithFailedRollbacks(ExternalSpillableMap.Disk
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would have scanned 0 records because of rollback");

@@ -1368,7 +1358,6 @@ public void testAvroLogRecordReaderWithInsertDeleteAndRollback(ExternalSpillable
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1420,7 +1409,6 @@ public void testAvroLogRecordReaderWithInvalidRollback(ExternalSpillableMap.Disk
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(100, scanner.getTotalLogRecords(), "We still would read 100 records");
final List<String> readKeys = new ArrayList<>(100);
@@ -1491,7 +1479,6 @@ public void testAvroLogRecordReaderWithInsertsDeleteAndRollback(ExternalSpillabl
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
}
@@ -1598,7 +1585,6 @@ public void testAvroLogRecordReaderWithMixedInsertsCorruptsAndRollback(ExternalS
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();
assertEquals(0, scanner.getTotalLogRecords(), "We would read 0 records");
FileCreateUtils.deleteDeltaCommit(basePath, "100", fs);
@@ -1673,7 +1659,6 @@ private void testAvroLogRecordReaderMergingMultipleLogFiles(int numRecordsInLog1
.withSpillableMapBasePath(BASE_OUTPUT_PATH)
.withDiskMapType(diskMapType)
.withBitCaskDiskMapCompressionEnabled(isCompressionEnabled)
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(allLogFiles.get(0)).getParent()))
.build();

assertEquals(Math.max(numRecordsInLog1, numRecordsInLog2), scanner.getNumMergedRecordsInLog(),
@@ -57,8 +57,6 @@
import java.util.stream.IntStream;

import static junit.framework.TestCase.assertEquals;
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -348,7 +346,7 @@ private static HoodieMergedLogRecordScanner getScanner(
List<String> logPaths,
Schema readSchema,
String instant) {
-HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(logPaths)
@@ -360,12 +358,8 @@ private static HoodieMergedLogRecordScanner getScanner(
.withMaxMemorySizeInBytes(1024 * 1024L)
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
-if (!isNullOrEmpty(logPaths)) {
-logRecordScannerBuilder
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
-}
-return logRecordScannerBuilder.build();
+.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+.build();
}

/**
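Note how the isNullOrEmpty guard deleted just above survives inside the builder: inference only fires when no partition was set explicitly and the log-file list is non-empty (the CollectionUtils.nonEmpty check), so an explicit withPartition(...) still takes precedence and empty lists remain safe. A small standalone sketch of that precedence; PartitionInference and resolvePartition are hypothetical names for illustration, not Hudi API:

```java
import java.util.Collections;
import java.util.List;

class PartitionInference {
  // Hypothetical condensation of the build()-time logic added in this commit.
  static String resolvePartition(String explicit, String basePath, List<String> logFilePaths) {
    if (explicit != null) {
      return explicit; // a value set via withPartition(...) takes precedence
    }
    if (logFilePaths == null || logFilePaths.isEmpty()) {
      return null; // mirrors the old isNullOrEmpty(logPaths) call-site guard
    }
    String parent = logFilePaths.get(0).substring(0, logFilePaths.get(0).lastIndexOf('/'));
    return parent.equals(basePath) ? "" : parent.substring(basePath.length() + 1);
  }

  public static void main(String[] args) {
    System.out.println(resolvePartition(null, "/tbl", Collections.emptyList()));            // null
    System.out.println(resolvePartition("2022/04", "/tbl", List.of("/tbl/2022/05/x.log"))); // 2022/04
    System.out.println(resolvePartition(null, "/tbl", List.of("/tbl/2022/05/x.log")));      // 2022/05
  }
}
```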
@@ -18,6 +18,7 @@

package org.apache.hudi.table.format;

+import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
@@ -42,7 +43,6 @@
import org.apache.flink.types.RowKind;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;

import java.util.ArrayList;
import java.util.Arrays;
@@ -52,10 +52,6 @@
import java.util.Map;
import java.util.function.Function;

-import static org.apache.hudi.common.fs.FSUtils.getFs;
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
-
/**
* Utilities for format.
*/
@@ -128,13 +124,11 @@ public static HoodieMergedLogRecordScanner logScanner(
Schema logSchema,
Configuration config,
boolean withOperationField) {
-String basePath = split.getTablePath();
-List<String> logPaths = split.getLogPaths().get();
-FileSystem fs = getFs(basePath, config);
-HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
+return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
-.withBasePath(basePath)
-.withLogFilePaths(logPaths)
+.withBasePath(split.getTablePath())
+.withLogFilePaths(split.getLogPaths().get())
.withReaderSchema(logSchema)
.withLatestInstantTime(split.getLatestCommit())
.withReadBlocksLazily(
@@ -150,20 +144,16 @@ public static HoodieMergedLogRecordScanner logScanner(
config.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
.withInstantRange(split.getInstantRange())
-.withOperationField(withOperationField);
-if (!isNullOrEmpty(logPaths)) {
-logRecordScannerBuilder
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
-}
-return logRecordScannerBuilder.build();
+.withOperationField(withOperationField)
+.build();
}

private static HoodieUnMergedLogRecordScanner unMergedLogScanner(
MergeOnReadInputSplit split,
Schema logSchema,
Configuration config,
HoodieUnMergedLogRecordScanner.LogRecordScannerCallback callback) {
-FileSystem fs = getFs(split.getTablePath(), config);
+FileSystem fs = FSUtils.getFs(split.getTablePath(), config);
return HoodieUnMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(split.getTablePath())
@@ -244,8 +234,8 @@ public static HoodieMergedLogRecordScanner logScanner(
HoodieWriteConfig writeConfig,
Configuration hadoopConf) {
String basePath = writeConfig.getBasePath();
-HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
-.withFileSystem(getFs(basePath, hadoopConf))
+return HoodieMergedLogRecordScanner.newBuilder()
+.withFileSystem(FSUtils.getFs(basePath, hadoopConf))
.withBasePath(basePath)
.withLogFilePaths(logPaths)
.withReaderSchema(logSchema)
@@ -256,12 +246,8 @@ public static HoodieMergedLogRecordScanner logScanner(
.withMaxMemorySizeInBytes(writeConfig.getMaxMemoryPerPartitionMerge())
.withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
.withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
-.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
-if (!isNullOrEmpty(logPaths)) {
-logRecordScannerBuilder
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
-}
-return logRecordScannerBuilder.build();
+.withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
+.build();
}

private static Boolean string2Boolean(String s) {
@@ -67,8 +67,6 @@
import java.util.stream.IntStream;

import static junit.framework.TestCase.assertEquals;
-import static org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath;
-import static org.apache.hudi.common.util.CollectionUtils.isNullOrEmpty;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -663,7 +661,7 @@ private static HoodieMergedLogRecordScanner getScanner(
List<String> logPaths,
Schema readSchema,
String instant) {
-HoodieMergedLogRecordScanner.Builder logRecordScannerBuilder = HoodieMergedLogRecordScanner.newBuilder()
+return HoodieMergedLogRecordScanner.newBuilder()
.withFileSystem(fs)
.withBasePath(basePath)
.withLogFilePaths(logPaths)
@@ -675,12 +673,8 @@ private static HoodieMergedLogRecordScanner getScanner(
.withMaxMemorySizeInBytes(1024 * 1024L)
.withSpillableMapBasePath("/tmp/")
.withDiskMapType(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue())
-.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue());
-if (!isNullOrEmpty(logPaths)) {
-logRecordScannerBuilder
-.withPartition(getRelativePartitionPath(new Path(basePath), new Path(logPaths.get(0)).getParent()));
-}
-return logRecordScannerBuilder.build();
+.withBitCaskDiskMapCompressionEnabled(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue())
+.build();
}

/**
