Skip to content

Commit

Permalink
improve spillableMapBasePath disk directory is full
Browse files Browse the repository at this point in the history
  • Loading branch information
XuQianJin-Stars committed Oct 14, 2022
1 parent eed98de commit d4bb16c
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

import javax.annotation.concurrent.Immutable;

Expand Down Expand Up @@ -77,10 +80,19 @@ public class HoodieMemoryConfig extends HoodieConfig {
.defaultValue(16 * 1024 * 1024)
.withDocumentation("Property to control the max memory for dfs input stream buffer size");

public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
public static ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.defaultValue("/tmp/")
.withDocumentation("Default file path prefix for spillable map");
.withInferFunction(cfg -> {
// if user doesn't configure it, infer the path from env var
// if nothing to infer from, return Option.empty() then it will fallback to the default /tmp/
if (cfg.contains(HoodieTableConfig.SPILLABLE_MAP_BASE_PATH)) {
return Option.of(cfg.getString(HoodieTableConfig.SPILLABLE_MAP_BASE_PATH));
}
String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.of("/tmp/");
})
.withDocumentation("Default file path for spillable map");

public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
.key("hoodie.memory.writestatus.failure.fraction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ public class HoodieTableConfig extends HoodieConfig {
.defaultValue(false)
.withDocumentation("When set to true, will not write the partition columns into hudi. By default, false.");


public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.noDefaultValue()
.withDocumentation("Default file path prefix for spillable map");

public static final ConfigProperty<String> URL_ENCODE_PARTITIONING = KeyGeneratorOptions.URL_ENCODE_PARTITIONING;
public static final ConfigProperty<String> HIVE_STYLE_PARTITIONING_ENABLE = KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hudi.common.table.log;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
Expand All @@ -27,20 +30,14 @@
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.internal.schema.InternalSchema;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -92,8 +89,6 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
instantRange, withOperationField,
forceFullScan, partitionName, internalSchema);
try {
String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
spillableMapBasePath = localDirs.length > 0 ? localDirs[0] : spillableMapBasePath;
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);
Expand Down

0 comments on commit d4bb16c

Please sign in to comment.