Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[HUDI-4526] Improve spillableMapBasePath when disk directory is full #6284

Merged
merged 2 commits into from
Nov 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.hudi.common.config.ConfigGroups;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;

import javax.annotation.concurrent.Immutable;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
Expand Down Expand Up @@ -80,7 +81,11 @@ public class HoodieMemoryConfig extends HoodieConfig {
public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
.key("hoodie.memory.spillable.map.path")
.defaultValue("/tmp/")
.withDocumentation("Default file path prefix for spillable map");
.withInferFunction(cfg -> {
String[] localDirs = FileIOUtils.getConfiguredLocalDirs();
return (localDirs != null && localDirs.length > 0) ? Option.of(localDirs[0]) : Option.empty();
})
.withDocumentation("Default file path for spillable map");

public static final ConfigProperty<Double> WRITESTATUS_FAILURE_FRACTION = ConfigProperty
.key("hoodie.memory.writestatus.failure.fraction")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.hudi.common.table.log;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.config.HoodieCommonConfig;
import org.apache.hudi.common.model.DeleteRecord;
import org.apache.hudi.common.model.HoodieAvroRecord;
Expand All @@ -35,12 +38,7 @@
import org.apache.hudi.common.util.SpillableMapUtils;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.exception.HoodieIOException;

import org.apache.avro.Schema;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.internal.schema.InternalSchema;

import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

Expand Down Expand Up @@ -97,6 +95,7 @@ protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath, List<Stri
// Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
new HoodieRecordSizeEstimator(readerSchema), diskMapType, isBitCaskDiskMapCompressionEnabled);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when we clean these files then ?

Copy link
Contributor Author

@XuQianJin-Stars XuQianJin-Stars Aug 3, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So when we clean these files then ?

  1. BitCaskDiskMap set writeOnlyFile.deleteOnExit()
  2. RocksDbDiskMap::close

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why the dir is full if it is cleaned in time ?

Copy link
Contributor Author

@XuQianJin-Stars XuQianJin-Stars Aug 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So why the dir is full if it is cleaned in time ?

When spark writes multiple jobs concurrently, there are not only hudi jobs, but the tmp directory shared by many jobs will cause it to explode.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we can do this. spillableMapbase path is configurable. If one does not want "/tmp/" which is the default, they can always override using the configs.

Copy link
Contributor Author

@XuQianJin-Stars XuQianJin-Stars Aug 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if we can do this. spillableMapbase path is configurable. If one does not want "/tmp/" which is the default, they can always override using the configs.

It is more troublesome for users to use, and this option needs to be specified additionally, and considering that the basepath will definitely be in a large data disk, the temporary directory can be put into the same disk as the basepath.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basepath is hdfs path, spillableMapbase_path is local path,
It is wrong to use hdfs path directly as spillableMapbase_path

this.maxMemorySizeInBytes = maxMemorySizeInBytes;
} catch (IOException e) {
throw new HoodieIOException("IOException when creating ExternalSpillableMap at " + spillableMapBasePath, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,40 @@ public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.
public static Option<byte[]> readDataFromPath(FileSystem fileSystem, org.apache.hadoop.fs.Path detailPath) {
return readDataFromPath(fileSystem, detailPath, false);
}

/**
* Return the configured local directories where hudi can write files. This
* method does not create any directories on its own, it only encapsulates the
* logic of locating the local directories according to deployment mode.
*/
public static String[] getConfiguredLocalDirs() {
if (isRunningInYarnContainer()) {
// If we are in yarn mode, systems can have different disk layouts so we must set it
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We totally ignore the option through hoodie write config then ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We totally ignore the option through hoodie write config then ?

First, get the local dir from the yarn container. If you can't get it, you can get it from the hoodie write config. In this way, users on yarn basically do not need to actively modify the hoodie write config.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can't get it, you can get it from the hoodie write config

I didn't see the logic.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you can't get it, you can get it from the hoodie write config

I didn't see the logic.

spillableMapBasePath = localDirs.length > 0 ? localDirs[0] : spillableMapBasePath;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, should we take the write config as higher priority then ? If user configured it explicitly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, should we take the write config as higher priority then ? If user configured it explicitly.

It may be necessary to remove the default value

  public static final ConfigProperty<String> SPILLABLE_MAP_BASE_PATH = ConfigProperty
      .key("hoodie.memory.spillable.map.path")
      .defaultValue("/tmp/")
      .withDocumentation("Default file path prefix for spillable map");

// to what Yarn on this system said was available. Note this assumes that Yarn has
// created the directories already, and that they are secured so that only the
// user has access to them.
return getYarnLocalDirs().split(",");
} else if (System.getProperty("java.io.tmpdir") != null) {
return System.getProperty("java.io.tmpdir").split(",");
} else {
return null;
}
}

private static boolean isRunningInYarnContainer() {
// These environment variables are set by YARN.
return System.getenv("CONTAINER_ID") != null;
}

/**
* Get the Yarn approved local directories.
*/
private static String getYarnLocalDirs() {
String localDirs = Option.of(System.getenv("LOCAL_DIRS")).orElse("");

if (localDirs.isEmpty()) {
throw new HoodieIOException("Yarn Local dirs can't be empty");
}
return localDirs;
}
}