Skip to content

Commit

Permalink
YARN-160. Enhanced NodeManager to automatically obtain cpu/memory val…
Browse files Browse the repository at this point in the history
…ues from underlying OS when configured to do so. Contributed by Varun Vasudev.
  • Loading branch information
vinoduec committed May 26, 2015
1 parent 022f49d commit 500a1d9
Show file tree
Hide file tree
Showing 16 changed files with 696 additions and 107 deletions.
Expand Up @@ -88,6 +88,12 @@ public int getNumProcessors() {
return getConf().getInt(NUM_PROCESSORS, -1);
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does not track physical cores separately; it
 * simply reports the same value as {@link #getNumProcessors()}.
 */
@Override
public int getNumCores() {
return getNumProcessors();
}

/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
Expand Down
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -120,6 +120,9 @@ Release 2.8.0 - UNRELEASED
YARN-3541. Add version info on timeline service / generic history web UI
and REST API. (Zhijie Shen via xgong)

YARN-160. Enhanced NodeManager to automatically obtain cpu/memory values from
underlying OS when configured to do so. (Varun Vasudev via vinodkv)

IMPROVEMENTS

YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
Expand Up @@ -804,10 +804,14 @@ private static void addDeprecatedKeys() {
public static final String YARN_TRACKING_URL_GENERATOR =
YARN_PREFIX + "tracking.url.generator";

/** Amount of memory in GB that can be allocated for containers.*/
/** Amount of memory in MB that can be allocated for containers.*/
public static final String NM_PMEM_MB = NM_PREFIX + "resource.memory-mb";
public static final int DEFAULT_NM_PMEM_MB = 8 * 1024;

/** Amount of memory in MB that has been reserved for non-yarn use. */
public static final String NM_SYSTEM_RESERVED_PMEM_MB = NM_PREFIX
+ "resource.system-reserved-memory-mb";

/** Specifies whether physical memory check is enabled. */
public static final String NM_PMEM_CHECK_ENABLED = NM_PREFIX
+ "pmem-check-enabled";
Expand All @@ -827,12 +831,29 @@ private static void addDeprecatedKeys() {
public static final String NM_VCORES = NM_PREFIX + "resource.cpu-vcores";
public static final int DEFAULT_NM_VCORES = 8;

/** Whether to count logical processors (such as hyperthreads) as cores. */
public static final String NM_COUNT_LOGICAL_PROCESSORS_AS_CORES = NM_PREFIX
+ "resource.count-logical-processors-as-cores";
/** Default for {@link #NM_COUNT_LOGICAL_PROCESSORS_AS_CORES}: disabled. */
public static final boolean DEFAULT_NM_COUNT_LOGICAL_PROCESSORS_AS_CORES =
false;

/** Multiplier used to convert physical cores to vcores. */
public static final String NM_PCORES_VCORES_MULTIPLIER = NM_PREFIX
+ "resource.pcores-vcores-multiplier";
/** Default for {@link #NM_PCORES_VCORES_MULTIPLIER}: one vcore per core. */
public static final float DEFAULT_NM_PCORES_VCORES_MULTIPLIER = 1.0f;

/** Percentage of overall CPU which can be allocated for containers. */
public static final String NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
NM_PREFIX + "resource.percentage-physical-cpu-limit";
/**
 * Default for {@link #NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT}: 100,
 * i.e. no restriction below the full CPU.
 */
public static final int DEFAULT_NM_RESOURCE_PERCENTAGE_PHYSICAL_CPU_LIMIT =
100;

/** Enable or disable node hardware capability detection. */
public static final String NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
NM_PREFIX + "resource.detect-hardware-capabilities";
/**
 * Default for {@link #NM_ENABLE_HARDWARE_CAPABILITY_DETECTION}: detection
 * is off unless explicitly enabled.
 */
public static final boolean DEFAULT_NM_ENABLE_HARDWARE_CAPABILITY_DETECTION =
false;

/**
* Prefix for disk configurations. Work in progress: This configuration
* parameter may be changed/removed in the future.
Expand Down
Expand Up @@ -25,9 +25,11 @@
import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset;
import java.util.HashSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -58,41 +60,48 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final String INACTIVE_STRING = "Inactive";

/**
* Patterns for parsing /proc/cpuinfo
* Patterns for parsing /proc/cpuinfo.
*/
// Path of the procfs file that describes the CPUs.
private static final String PROCFS_CPUINFO = "/proc/cpuinfo";
// Matches a line starting with "processor", followed by exactly one space
// or tab and a colon; group 1 captures the digits after the colon.
private static final Pattern PROCESSOR_FORMAT =
Pattern.compile("^processor[ \t]:[ \t]*([0-9]*)");
// Matches a "cpu MHz : <value>" line; group 1 captures the numeric value
// (digits and dots).
private static final Pattern FREQUENCY_FORMAT =
Pattern.compile("^cpu MHz[ \t]*:[ \t]*([0-9.]*)");
// Matches a "physical id : <n>" line; group 1 captures the digits.
private static final Pattern PHYSICAL_ID_FORMAT =
Pattern.compile("^physical id[ \t]*:[ \t]*([0-9]*)");
// Matches a "core id : <n>" line; group 1 captures the digits.
private static final Pattern CORE_ID_FORMAT =
Pattern.compile("^core id[ \t]*:[ \t]*([0-9]*)");

/**
* Pattern for parsing /proc/stat
* Pattern for parsing /proc/stat.
*/
private static final String PROCFS_STAT = "/proc/stat";
private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" +
Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;

private String procfsMemFile;
private String procfsCpuFile;
private String procfsStatFile;
long jiffyLengthInMillis;
private long jiffyLengthInMillis;

private long ramSize = 0;
private long swapSize = 0;
private long ramSizeFree = 0; // free ram space on the machine (kB)
private long swapSizeFree = 0; // free swap space on the machine (kB)
private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system
/* number of logical processors on the system. */
private int numProcessors = 0;
/* number of physical cores on the system. */
private int numCores = 0;
private long cpuFrequency = 0L; // CPU frequency on the system (kHz)

boolean readMemInfoFile = false;
boolean readCpuInfoFile = false;
private boolean readMemInfoFile = false;
private boolean readCpuInfoFile = false;

/**
* Get current time
* Get current time.
* @return Unix time stamp in milliseconds
*/
long getCurrentTime() {
Expand All @@ -106,7 +115,7 @@ public LinuxResourceCalculatorPlugin() {

/**
* Constructor which allows assigning the /proc/ directories. This will be
* used only in unit tests
* used only in unit tests.
* @param procfsMemFile fake file for /proc/meminfo
* @param procfsCpuFile fake file for /proc/cpuinfo
* @param procfsStatFile fake file for /proc/stat
Expand All @@ -124,14 +133,14 @@ public LinuxResourceCalculatorPlugin(String procfsMemFile,
}

/**
* Read /proc/meminfo, parse and compute memory information only once
* Read /proc/meminfo, parse and compute memory information only once.
*/
private void readProcMemInfoFile() {
// Delegate with readAgain = false so the file is only parsed the first
// time this is called.
readProcMemInfoFile(false);
}

/**
* Read /proc/meminfo, parse and compute memory information
* Read /proc/meminfo, parse and compute memory information.
* @param readAgain if false, read only on the first time
*/
private void readProcMemInfoFile(boolean readAgain) {
Expand All @@ -141,18 +150,20 @@ private void readProcMemInfoFile(boolean readAgain) {
}

// Read "/proc/meminfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsMemFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsMemFile
+ "; can't determine memory settings");
return;
}

Matcher mat = null;
Matcher mat;

try {
String str = in.readLine();
Expand Down Expand Up @@ -193,27 +204,31 @@ private void readProcMemInfoFile(boolean readAgain) {
}

/**
* Read /proc/cpuinfo, parse and calculate CPU information
* Read /proc/cpuinfo, parse and calculate CPU information.
*/
private void readProcCpuInfoFile() {
// This file needs to be read only once
if (readCpuInfoFile) {
return;
}
HashSet<String> coreIdSet = new HashSet<>();
// Read "/proc/cpuinfo" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsCpuFile), Charset.forName("UTF-8"));
in = new BufferedReader(fReader);
} catch (FileNotFoundException f) {
// shouldn't happen....
LOG.warn("Couldn't read " + procfsCpuFile + "; can't determine cpu info");
return;
}
Matcher mat = null;
Matcher mat;
try {
numProcessors = 0;
numCores = 1;
String currentPhysicalId = "";
String str = in.readLine();
while (str != null) {
mat = PROCESSOR_FORMAT.matcher(str);
Expand All @@ -224,6 +239,15 @@ private void readProcCpuInfoFile() {
if (mat.find()) {
cpuFrequency = (long)(Double.parseDouble(mat.group(1)) * 1000); // kHz
}
mat = PHYSICAL_ID_FORMAT.matcher(str);
if (mat.find()) {
currentPhysicalId = str;
}
mat = CORE_ID_FORMAT.matcher(str);
if (mat.find()) {
coreIdSet.add(currentPhysicalId + " " + str);
numCores = coreIdSet.size();
}
str = in.readLine();
}
} catch (IOException io) {
Expand All @@ -245,12 +269,12 @@ private void readProcCpuInfoFile() {
}

/**
* Read /proc/stat file, parse and calculate cumulative CPU
* Read /proc/stat file, parse and calculate cumulative CPU.
*/
private void readProcStatFile() {
// Read "/proc/stat" file
BufferedReader in = null;
InputStreamReader fReader = null;
BufferedReader in;
InputStreamReader fReader;
try {
fReader = new InputStreamReader(
new FileInputStream(procfsStatFile), Charset.forName("UTF-8"));
Expand All @@ -260,7 +284,7 @@ private void readProcStatFile() {
return;
}

Matcher mat = null;
Matcher mat;
try {
String str = in.readLine();
while (str != null) {
Expand Down Expand Up @@ -328,6 +352,13 @@ public int getNumProcessors() {
return numProcessors;
}

/**
 * {@inheritDoc}
 *
 * <p>Calls {@code readProcCpuInfoFile()} first so that the cpuinfo file has
 * been parsed, then returns the core count recorded in {@code numCores}.
 */
@Override
public int getNumCores() {
readProcCpuInfoFile();
return numCores;
}

/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
Expand All @@ -354,9 +385,9 @@ public float getCpuUsage() {
}

/**
* Test the {@link LinuxResourceCalculatorPlugin}
* Test the {@link LinuxResourceCalculatorPlugin}.
*
* @param args
* @param args - arguments to this calculator test
*/
public static void main(String[] args) {
LinuxResourceCalculatorPlugin plugin = new LinuxResourceCalculatorPlugin();
Expand All @@ -380,4 +411,13 @@ public static void main(String[] args) {
}
System.out.println("CPU usage % : " + plugin.getCpuUsage());
}

/**
 * Overrides the flag that records whether the cpuinfo file has already
 * been read. Intended for tests only.
 *
 * @param readCpuInfoFileValue new value of the {@code readCpuInfoFile} flag
 */
@VisibleForTesting
void setReadCpuInfoFile(boolean readCpuInfoFileValue) {
this.readCpuInfoFile = readCpuInfoFileValue;
}

/**
 * Returns the current value of the {@code jiffyLengthInMillis} field.
 *
 * <p>NOTE(review): where the field is initialised is not visible here;
 * presumably it is the length of one jiffy in milliseconds - confirm.
 *
 * @return the stored jiffy length, in milliseconds going by the field name
 */
public long getJiffyLengthInMillis() {
return this.jiffyLengthInMillis;
}
}
Expand Up @@ -64,12 +64,19 @@ public abstract class ResourceCalculatorPlugin extends Configured {
public abstract long getAvailablePhysicalMemorySize();

/**
* Obtain the total number of processors present on the system.
* Obtain the total number of logical processors present on the system.
*
* @return number of processors
* @return number of logical processors
*/
public abstract int getNumProcessors();

/**
 * Obtain the total number of physical cores present on the system.
 *
 * <p>This is distinct from {@link #getNumProcessors()}, which counts logical
 * processors. Implementations that cannot tell the two apart may return the
 * logical processor count here.
 *
 * @return number of physical cores
 */
public abstract int getNumCores();

/**
* Obtain the CPU frequency of the system.
*
Expand Down
Expand Up @@ -147,6 +147,12 @@ public int getNumProcessors() {
return numProcessors;
}

/**
 * {@inheritDoc}
 *
 * <p>This implementation does not distinguish physical cores from logical
 * processors; it returns the value of {@link #getNumProcessors()}.
 */
@Override
public int getNumCores() {
return getNumProcessors();
}

/** {@inheritDoc} */
@Override
public long getCpuFrequency() {
Expand Down

0 comments on commit 500a1d9

Please sign in to comment.