Skip to content

Commit

Permalink
YARN-3122. Metrics for container's actual CPU usage. (Anubhav Dhoot v…
Browse files Browse the repository at this point in the history
…ia kasha)
  • Loading branch information
kambatla committed Mar 5, 2015
1 parent 722b479 commit 53947f3
Show file tree
Hide file tree
Showing 12 changed files with 311 additions and 53 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -351,6 +351,9 @@ Release 2.7.0 - UNRELEASED
YARN-3272. Surface container locality info in RM web UI. YARN-3272. Surface container locality info in RM web UI.
(Jian He via wangda) (Jian He via wangda)


YARN-3122. Metrics for container's actual CPU usage.
(Anubhav Dhoot via kasha)

OPTIMIZATIONS OPTIMIZATIONS


YARN-2990. FairScheduler's delay-scheduling always waits for node-local and YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
Expand Down
@@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.util;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.math.BigInteger;

@InterfaceAudience.Private
@InterfaceStability.Unstable
public class CpuTimeTracker {
public static final int UNAVAILABLE = -1;
final long MINIMUM_UPDATE_INTERVAL;

// CPU used time since system is on (ms)
BigInteger cumulativeCpuTime = BigInteger.ZERO;

// CPU used time read last time (ms)
BigInteger lastCumulativeCpuTime = BigInteger.ZERO;

// Unix timestamp while reading the CPU time (ms)
long sampleTime;
long lastSampleTime;
float cpuUsage;
BigInteger jiffyLengthInMillis;

public CpuTimeTracker(long jiffyLengthInMillis) {
this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis);
this.cpuUsage = UNAVAILABLE;
this.sampleTime = UNAVAILABLE;
this.lastSampleTime = UNAVAILABLE;
MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
}

/**
* Return percentage of cpu time spent over the time since last update.
* CPU time spent is based on elapsed jiffies multiplied by amount of
* time for 1 core. Thus, if you use 2 cores completely you would have spent
* twice the actual time between updates and this will return 200%.
*
* @return Return percentage of cpu usage since last update, {@link
* CpuTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than
* {@link CpuTimeTracker#MINIMUM_UPDATE_INTERVAL} apart
*/
public float getCpuTrackerUsagePercent() {
if (lastSampleTime == UNAVAILABLE ||
lastSampleTime > sampleTime) {
// lastSampleTime > sampleTime may happen when the system time is changed
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
return cpuUsage;
}
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
cpuUsage =
((cumulativeCpuTime.subtract(lastCumulativeCpuTime)).floatValue())
* 100F / ((float) (sampleTime - lastSampleTime));
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
}
return cpuUsage;
}

public void updateElapsedJiffies(BigInteger elapedJiffies, long sampleTime) {
this.cumulativeCpuTime = elapedJiffies.multiply(jiffyLengthInMillis);
this.sampleTime = sampleTime;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("SampleTime " + this.sampleTime);
sb.append(" CummulativeCpuTime " + this.cumulativeCpuTime);
sb.append(" LastSampleTime " + this.lastSampleTime);
sb.append(" LastCummulativeCpuTime " + this.lastCumulativeCpuTime);
sb.append(" CpuUsage " + this.cpuUsage);
sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis);
return sb.toString();
}
}
Expand Up @@ -23,6 +23,7 @@
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.IOException; import java.io.IOException;
import java.math.BigInteger;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
Expand All @@ -41,8 +42,6 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(LinuxResourceCalculatorPlugin.class); LogFactory.getLog(LinuxResourceCalculatorPlugin.class);


public static final int UNAVAILABLE = -1;

/** /**
* proc's meminfo virtual file has keys-values in the format * proc's meminfo virtual file has keys-values in the format
* "key:[ \t]*value[ \t]kB". * "key:[ \t]*value[ \t]kB".
Expand Down Expand Up @@ -74,6 +73,7 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private static final Pattern CPU_TIME_FORMAT = private static final Pattern CPU_TIME_FORMAT =
Pattern.compile("^cpu[ \t]*([0-9]*)" + Pattern.compile("^cpu[ \t]*([0-9]*)" +
"[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*"); "[ \t]*([0-9]*)[ \t]*([0-9]*)[ \t].*");
private CpuTimeTracker cpuTimeTracker;


private String procfsMemFile; private String procfsMemFile;
private String procfsCpuFile; private String procfsCpuFile;
Expand All @@ -87,12 +87,6 @@ public class LinuxResourceCalculatorPlugin extends ResourceCalculatorPlugin {
private long inactiveSize = 0; // inactive cache memory (kB) private long inactiveSize = 0; // inactive cache memory (kB)
private int numProcessors = 0; // number of processors on the system private int numProcessors = 0; // number of processors on the system
private long cpuFrequency = 0L; // CPU frequency on the system (kHz) private long cpuFrequency = 0L; // CPU frequency on the system (kHz)
private long cumulativeCpuTime = 0L; // CPU used time since system is on (ms)
private long lastCumulativeCpuTime = 0L; // CPU used time read last time (ms)
// Unix timestamp while reading the CPU time (ms)
private float cpuUsage = UNAVAILABLE;
private long sampleTime = UNAVAILABLE;
private long lastSampleTime = UNAVAILABLE;


boolean readMemInfoFile = false; boolean readMemInfoFile = false;
boolean readCpuInfoFile = false; boolean readCpuInfoFile = false;
Expand All @@ -106,10 +100,8 @@ long getCurrentTime() {
} }


public LinuxResourceCalculatorPlugin() { public LinuxResourceCalculatorPlugin() {
procfsMemFile = PROCFS_MEMFILE; this(PROCFS_MEMFILE, PROCFS_CPUINFO, PROCFS_STAT,
procfsCpuFile = PROCFS_CPUINFO; ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS);
procfsStatFile = PROCFS_STAT;
jiffyLengthInMillis = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS;
} }


/** /**
Expand All @@ -128,6 +120,7 @@ public LinuxResourceCalculatorPlugin(String procfsMemFile,
this.procfsCpuFile = procfsCpuFile; this.procfsCpuFile = procfsCpuFile;
this.procfsStatFile = procfsStatFile; this.procfsStatFile = procfsStatFile;
this.jiffyLengthInMillis = jiffyLengthInMillis; this.jiffyLengthInMillis = jiffyLengthInMillis;
this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis);
} }


/** /**
Expand Down Expand Up @@ -276,12 +269,13 @@ private void readProcStatFile() {
long uTime = Long.parseLong(mat.group(1)); long uTime = Long.parseLong(mat.group(1));
long nTime = Long.parseLong(mat.group(2)); long nTime = Long.parseLong(mat.group(2));
long sTime = Long.parseLong(mat.group(3)); long sTime = Long.parseLong(mat.group(3));
cumulativeCpuTime = uTime + nTime + sTime; // milliseconds cpuTimeTracker.updateElapsedJiffies(
BigInteger.valueOf(uTime + nTime + sTime),
getCurrentTime());
break; break;
} }
str = in.readLine(); str = in.readLine();
} }
cumulativeCpuTime *= jiffyLengthInMillis;
} catch (IOException io) { } catch (IOException io) {
LOG.warn("Error reading the stream " + io); LOG.warn("Error reading the stream " + io);
} finally { } finally {
Expand Down Expand Up @@ -345,32 +339,18 @@ public long getCpuFrequency() {
@Override @Override
public long getCumulativeCpuTime() { public long getCumulativeCpuTime() {
readProcStatFile(); readProcStatFile();
return cumulativeCpuTime; return cpuTimeTracker.cumulativeCpuTime.longValue();
} }


/** {@inheritDoc} */ /** {@inheritDoc} */
@Override @Override
public float getCpuUsage() { public float getCpuUsage() {
readProcStatFile(); readProcStatFile();
sampleTime = getCurrentTime(); float overallCpuUsage = cpuTimeTracker.getCpuTrackerUsagePercent();
if (lastSampleTime == UNAVAILABLE || if (overallCpuUsage != CpuTimeTracker.UNAVAILABLE) {
lastSampleTime > sampleTime) { overallCpuUsage = overallCpuUsage / getNumProcessors();
// lastSampleTime > sampleTime may happen when the system time is changed
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
return cpuUsage;
}
// When lastSampleTime is sufficiently old, update cpuUsage.
// Also take a sample of the current time and cumulative CPU time for the
// use of the next calculation.
final long MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis;
if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) {
cpuUsage = (float)(cumulativeCpuTime - lastCumulativeCpuTime) * 100F /
((float)(sampleTime - lastSampleTime) * getNumProcessors());
lastSampleTime = sampleTime;
lastCumulativeCpuTime = cumulativeCpuTime;
} }
return cpuUsage; return overallCpuUsage;
} }


/** /**
Expand Down
Expand Up @@ -66,6 +66,8 @@ public class ProcfsBasedProcessTree extends ResourceCalculatorProcessTree {
public static final String PROCFS_CMDLINE_FILE = "cmdline"; public static final String PROCFS_CMDLINE_FILE = "cmdline";
public static final long PAGE_SIZE; public static final long PAGE_SIZE;
public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
private final CpuTimeTracker cpuTimeTracker;
private Clock clock;


enum MemInfo { enum MemInfo {
SIZE("Size"), RSS("Rss"), PSS("Pss"), SHARED_CLEAN("Shared_Clean"), SIZE("Size"), RSS("Rss"), PSS("Pss"), SHARED_CLEAN("Shared_Clean"),
Expand Down Expand Up @@ -144,7 +146,7 @@ public static MemInfo getMemInfoByName(String name) {
new HashMap<String, ProcessInfo>(); new HashMap<String, ProcessInfo>();


public ProcfsBasedProcessTree(String pid) { public ProcfsBasedProcessTree(String pid) {
this(pid, PROCFS); this(pid, PROCFS, new SystemClock());
} }


@Override @Override
Expand All @@ -157,6 +159,10 @@ public void setConf(Configuration conf) {
} }
} }


public ProcfsBasedProcessTree(String pid, String procfsDir) {
this(pid, procfsDir, new SystemClock());
}

/** /**
* Build a new process tree rooted at the pid. * Build a new process tree rooted at the pid.
* *
Expand All @@ -165,11 +171,14 @@ public void setConf(Configuration conf) {
* *
* @param pid root of the process tree * @param pid root of the process tree
* @param procfsDir the root of a proc file system - only used for testing. * @param procfsDir the root of a proc file system - only used for testing.
* @param clock clock for controlling time for testing
*/ */
public ProcfsBasedProcessTree(String pid, String procfsDir) { public ProcfsBasedProcessTree(String pid, String procfsDir, Clock clock) {
super(pid); super(pid);
this.clock = clock;
this.pid = getValidPID(pid); this.pid = getValidPID(pid);
this.procfsDir = procfsDir; this.procfsDir = procfsDir;
this.cpuTimeTracker = new CpuTimeTracker(JIFFY_LENGTH_IN_MILLIS);
} }


/** /**
Expand Down Expand Up @@ -447,6 +456,26 @@ public long getCumulativeCpuTime() {
return cpuTime; return cpuTime;
} }


private BigInteger getTotalProcessJiffies() {
BigInteger totalStime = BigInteger.ZERO;
long totalUtime = 0;
for (ProcessInfo p : processTree.values()) {
if (p != null) {
totalUtime += p.getUtime();
totalStime = totalStime.add(p.getStime());
}
}
return totalStime.add(BigInteger.valueOf(totalUtime));
}

@Override
public float getCpuUsagePercent() {
BigInteger processTotalJiffies = getTotalProcessJiffies();
cpuTimeTracker.updateElapsedJiffies(processTotalJiffies,
clock.getTime());
return cpuTimeTracker.getCpuTrackerUsagePercent();
}

private static String getValidPID(String pid) { private static String getValidPID(String pid) {
if (pid == null) return deadPid; if (pid == null) return deadPid;
Matcher m = numberPattern.matcher(pid); Matcher m = numberPattern.matcher(pid);
Expand Down Expand Up @@ -962,4 +991,48 @@ public String toString() {
return sb.toString(); return sb.toString();
} }
} }

/**
* Test the {@link ProcfsBasedProcessTree}
*
* @param args
*/
public static void main(String[] args) {
if (args.length != 1) {
System.out.println("Provide <pid of process to monitor>");
return;
}

int numprocessors =
ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, null)
.getNumProcessors();
System.out.println("Number of processors " + numprocessors);

System.out.println("Creating ProcfsBasedProcessTree for process " +
args[0]);
ProcfsBasedProcessTree procfsBasedProcessTree = new
ProcfsBasedProcessTree(args[0]);
procfsBasedProcessTree.updateProcessTree();

System.out.println(procfsBasedProcessTree.getProcessTreeDump());
System.out.println("Get cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent());

try {
// Sleep so we can compute the CPU usage
Thread.sleep(500L);
} catch (InterruptedException e) {
// do nothing
}

procfsBasedProcessTree.updateProcessTree();

System.out.println(procfsBasedProcessTree.getProcessTreeDump());
System.out.println("Cpu usage " + procfsBasedProcessTree
.getCpuUsagePercent());
System.out.println("Vmem usage in bytes " + procfsBasedProcessTree
.getCumulativeVmem());
System.out.println("Rss mem usage in bytes " + procfsBasedProcessTree
.getCumulativeRssmem());
}
} }
Expand Up @@ -108,13 +108,23 @@ public long getCumulativeRssmem() {


/** /**
* Get the CPU time in millisecond used by all the processes in the * Get the CPU time in millisecond used by all the processes in the
* process-tree since the process-tree created * process-tree since the process-tree was created
* *
* @return cumulative CPU time in millisecond since the process-tree created * @return cumulative CPU time in millisecond since the process-tree created
* return 0 if it cannot be calculated * return 0 if it cannot be calculated
*/ */
public abstract long getCumulativeCpuTime(); public abstract long getCumulativeCpuTime();


/**
* Get the CPU usage by all the processes in the process-tree based on
* average between samples as a ratio of overall CPU cycles similar to top.
* Thus, if 2 out of 4 cores are used this should return 200.0.
*
* @return percentage CPU usage since the process-tree was created
* return {@link CpuTimeTracker#UNAVAILABLE} if it cannot be calculated
*/
public abstract float getCpuUsagePercent();

/** Verify that the tree process id is same as its process group id. /** Verify that the tree process id is same as its process group id.
* @return true if the process id matches else return false. * @return true if the process id matches else return false.
*/ */
Expand Down
Expand Up @@ -34,7 +34,7 @@ public class WindowsBasedProcessTree extends ResourceCalculatorProcessTree {


static final Log LOG = LogFactory static final Log LOG = LogFactory
.getLog(WindowsBasedProcessTree.class); .getLog(WindowsBasedProcessTree.class);

static class ProcessInfo { static class ProcessInfo {
String pid; // process pid String pid; // process pid
long vmem; // virtual memory long vmem; // virtual memory
Expand Down Expand Up @@ -202,4 +202,9 @@ public long getCumulativeCpuTime() {
return cpuTimeMs; return cpuTimeMs;
} }


@Override
public float getCpuUsagePercent() {
return CpuTimeTracker.UNAVAILABLE;
}

} }
Expand Up @@ -171,8 +171,8 @@ public void parsingProcStatAndCpuFile() throws IOException {
updateStatFile(uTime, nTime, sTime); updateStatFile(uTime, nTime, sTime);
assertEquals(plugin.getCumulativeCpuTime(), assertEquals(plugin.getCumulativeCpuTime(),
FAKE_JIFFY_LENGTH * (uTime + nTime + sTime)); FAKE_JIFFY_LENGTH * (uTime + nTime + sTime));
assertEquals(plugin.getCpuUsage(), (float)(LinuxResourceCalculatorPlugin.UNAVAILABLE),0.0); assertEquals(plugin.getCpuUsage(), (float)(CpuTimeTracker.UNAVAILABLE),0.0);

// Advance the time and sample again to test the CPU usage calculation // Advance the time and sample again to test the CPU usage calculation
uTime += 100L; uTime += 100L;
plugin.advanceTime(200L); plugin.advanceTime(200L);
Expand Down

0 comments on commit 53947f3

Please sign in to comment.