
Commit 72fa19b

[HUDI-4316] Support for spillable diskmap configuration when constructing HoodieMergedLogRecordScanner (#5959)
cxzl25 committed Jun 27, 2022
1 parent 7a6eb0f commit 72fa19b
Showing 5 changed files with 14 additions and 2 deletions.
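Each of the five call sites below wires the same two options into the scanner builder (one Flink site already set the disk map type and gains only the compression flag), so the scanner's spillable map now honors the disk map type and BitCask compression settings from HoodieCommonConfig instead of always using the builder defaults. A minimal sketch of the resulting usage — the surrounding reader state (fs, basePath, logFilePaths, readerSchema, latestInstant, writeConfig) is assumed to be in scope, and only the two calls marked new are what this commit adds at each site:

import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;

HoodieMergedLogRecordScanner scanner = HoodieMergedLogRecordScanner.newBuilder()
    .withFileSystem(fs)                    // assumed in scope
    .withBasePath(basePath)                // assumed in scope
    .withLogFilePaths(logFilePaths)        // assumed in scope
    .withReaderSchema(readerSchema)        // assumed in scope
    .withLatestInstantTime(latestInstant)  // assumed in scope
    .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
    .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
    // New in this commit: forward the user-configured spillable disk map settings.
    .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
    .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
    .build();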
@@ -191,6 +191,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperati
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
         .withPartition(clusteringOp.getPartitionPath())
+        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();

     Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
@@ -221,6 +221,8 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext
         .withBufferSize(config.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(config.getSpillableMapBasePath())
         .withPartition(clusteringOp.getPartitionPath())
+        .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();

     Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
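Both Spark clustering paths above pull the options from the write config's common config. A hedged sketch of surfacing them from job properties — the key constants are the ones this diff references, while the base path and chosen values are illustrative:

import java.util.Properties;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.config.HoodieWriteConfig;

// Illustrative wiring: set the two options once on the write config;
// the clustering code above forwards them to every scanner it builds.
Properties props = new Properties();
props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(), "ROCKS_DB");
props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");

HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
    .withPath("/tmp/hudi_table")  // placeholder base path
    .withProperties(props)
    .build();

// These are the accessors the patched call sites invoke:
ExternalSpillableMap.DiskMapType diskMapType =
    writeConfig.getCommonConfig().getSpillableDiskMapType();
boolean bitcaskCompression =
    writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled();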
@@ -269,6 +269,8 @@ private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation>
         .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();

     HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
@@ -136,6 +136,7 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
         .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
@@ -361,7 +361,12 @@ private object HoodieMergeOnReadRDD {
       .withSpillableMapBasePath(
         hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
           HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
+      .withDiskMapType(
+        hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+          HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+      .withBitCaskDiskMapCompressionEnabled(
+        hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+          HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
     if (logFiles.nonEmpty) {
       logRecordScannerBuilder.withPartition(
         getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
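On the Spark merge-on-read path just above, the scanner reads the same two settings from the Hadoop configuration (via getEnum/getBoolean, falling back to the HoodieCommonConfig defaults when they are unset). A sketch of setting them there, with BITCASK plus compression as the illustrative choice:

import org.apache.hadoop.conf.Configuration;

import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;

// Illustrative only: these are the keys the reader above resolves.
Configuration hadoopConf = new Configuration();
hadoopConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
    ExternalSpillableMap.DiskMapType.BITCASK);
hadoopConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), true);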
