diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
index 233c70ecf9eb..adcbb874e84e 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/JavaExecutionStrategy.java
@@ -191,6 +191,8 @@ private List<HoodieRecord<T>> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(config.getSpillableMapBasePath())
             .withPartition(clusteringOp.getPartitionPath())
+            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+            .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
         Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index e09457f0e513..df0ad6e2b82d 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -221,6 +221,8 @@ private HoodieData<HoodieRecord<T>> readRecordsForGroupWithLogs(JavaSparkContext jsc,
             .withBufferSize(config.getMaxDFSStreamBufferSize())
             .withSpillableMapBasePath(config.getSpillableMapBasePath())
             .withPartition(clusteringOp.getPartitionPath())
+            .withDiskMapType(config.getCommonConfig().getSpillableDiskMapType())
+            .withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
             .build();
 
           Option<HoodieFileReader> baseFileReader = StringUtils.isNullOrEmpty(clusteringOp.getDataFilePath())
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
index 676532402c7e..ca48dcf6cbad 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/clustering/ClusteringOperator.java
@@ -269,6 +269,8 @@ private Iterator<RowData> readRecordsForGroupWithLogs(List<ClusteringOperation> clusteringOps,
         .withReverseReader(writeConfig.getCompactionReverseLogReadEnabled())
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
+        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .build();
 
     HoodieTableConfig tableConfig = table.getMetaClient().getTableConfig();
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index eb058597f805..8adbde355cf7 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -136,6 +136,7 @@ public static HoodieMergedLogRecordScanner logScanner(
         .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
         .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
         .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
+        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
         .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
         .withInstantRange(split.getInstantRange())
         .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
index 2fdb9b882e67..cd38e233e1fb 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieConversionUtils.{toJavaOption, toScalaOption}
 import org.apache.hudi.HoodieMergeOnReadRDD.{AvroDeserializerSupport, collectFieldOrdinals, getPartitionPath, projectAvro, projectAvroUnsafe, projectRowUnsafe, resolveAvroSchemaNullability}
-import org.apache.hudi.common.config.HoodieMetadataConfig
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMetadataConfig}
 import org.apache.hudi.common.engine.HoodieLocalEngineContext
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.fs.FSUtils.getRelativePartitionPath
@@ -361,7 +361,12 @@ private object HoodieMergeOnReadRDD {
         .withSpillableMapBasePath(
           hadoopConf.get(HoodieRealtimeConfig.SPILLABLE_MAP_BASE_PATH_PROP,
             HoodieRealtimeConfig.DEFAULT_SPILLABLE_MAP_BASE_PATH))
-
+        .withDiskMapType(
+          hadoopConf.getEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key,
+            HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.defaultValue))
+        .withBitCaskDiskMapCompressionEnabled(
+          hadoopConf.getBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(),
+            HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
       if (logFiles.nonEmpty) {
         logRecordScannerBuilder.withPartition(
           getRelativePartitionPath(new Path(tableState.tablePath), logFiles.head.getPath.getParent))
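Note (illustration, not part of the patch): every hunk above threads the same two settings
through to the HoodieMergedLogRecordScanner builder. Below is a minimal, hypothetical Java
sketch of how a job could opt into a RocksDB-backed spillable map or disable BitCask
compression via the write config; the base path, table name, and class name are placeholders,
while the config keys and getters are the ones exercised by the hunks above.

    import java.util.Properties;

    import org.apache.hudi.common.config.HoodieCommonConfig;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;
    import org.apache.hudi.config.HoodieWriteConfig;

    public class SpillableDiskMapConfigSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // Spill merged log records to a RocksDB map instead of the default BitCask map.
        props.setProperty(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
            ExternalSpillableMap.DiskMapType.ROCKS_DB.name());
        // Only consulted when the BitCask map is in use; shown for completeness.
        props.setProperty(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), "false");

        HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
            .withPath("/tmp/hudi_table")   // placeholder base path
            .forTable("example_table")     // placeholder table name
            .withProps(props)
            .build();

        // These are the getters the patched clustering and compaction paths call.
        System.out.println(writeConfig.getCommonConfig().getSpillableDiskMapType());
        System.out.println(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled());
      }
    }

The HoodieMergeOnReadRDD hunk is the one exception: it has no write config in scope, so it
reads the same keys from the Hadoop configuration. A sketch of setting them there (again with
a hypothetical class name), mirroring the getEnum/getBoolean reads in the last hunk:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.config.HoodieCommonConfig;
    import org.apache.hudi.common.util.collection.ExternalSpillableMap;

    public class SpillableDiskMapHadoopConfSketch {
      public static Configuration withSpillableMapSettings() {
        Configuration hadoopConf = new Configuration();
        // Read back via hadoopConf.getEnum(...) in HoodieMergeOnReadRDD.
        hadoopConf.setEnum(HoodieCommonConfig.SPILLABLE_DISK_MAP_TYPE.key(),
            ExternalSpillableMap.DiskMapType.ROCKS_DB);
        // Read back via hadoopConf.getBoolean(...).
        hadoopConf.setBoolean(HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), false);
        return hadoopConf;
      }
    }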