Skip to content

Commit

Permalink
YARN-7688. Miscellaneous Improvements To ProcfsBasedProcessTree. Cont…
Browse files Browse the repository at this point in the history
…ributed by BELUGA BEHR.
  • Loading branch information
szegedim committed Jan 3, 2018
1 parent 4ad39ec commit 626b510
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 63 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -20,21 +20,30 @@


import java.io.BufferedReader; import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.InputStreamReader;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.math.BigInteger; import java.math.BigInteger;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.ArrayDeque;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Queue;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;


import org.apache.commons.io.IOUtils; import org.apache.commons.io.IOUtils;
import org.apache.commons.io.filefilter.AndFileFilter;
import org.apache.commons.io.filefilter.DirectoryFileFilter;
import org.apache.commons.io.filefilter.RegexFileFilter;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -85,8 +94,9 @@ private MemInfo(String name) {
} }


public static MemInfo getMemInfoByName(String name) { public static MemInfo getMemInfoByName(String name) {
String searchName = StringUtils.trimToNull(name);
for (MemInfo info : MemInfo.values()) { for (MemInfo info : MemInfo.values()) {
if (info.name.trim().equalsIgnoreCase(name.trim())) { if (info.name.trim().equalsIgnoreCase(searchName)) {
return info; return info;
} }
} }
Expand Down Expand Up @@ -170,7 +180,7 @@ public static boolean isAvailable() {
return false; return false;
} }
} catch (SecurityException se) { } catch (SecurityException se) {
LOG.warn("Failed to get Operating System name. " + se); LOG.warn("Failed to get Operating System name.", se);
return false; return false;
} }
return true; return true;
Expand Down Expand Up @@ -214,12 +224,12 @@ public void updateProcessTree() {
// Add each process to its parent. // Add each process to its parent.
for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) { for (Map.Entry<String, ProcessInfo> entry : allProcessInfo.entrySet()) {
String pID = entry.getKey(); String pID = entry.getKey();
if (!pID.equals("1")) { if (!"1".equals(pID)) {
ProcessInfo pInfo = entry.getValue(); ProcessInfo pInfo = entry.getValue();
String ppid = pInfo.getPpid(); String ppid = pInfo.getPpid();
// If parent is init and process is not session leader, // If parent is init and process is not session leader,
// attach to sessionID // attach to sessionID
if (ppid.equals("1")) { if ("1".equals(ppid)) {
String sid = pInfo.getSessionId().toString(); String sid = pInfo.getSessionId().toString();
if (!pID.equals(sid)) { if (!pID.equals(sid)) {
ppid = sid; ppid = sid;
Expand All @@ -233,8 +243,8 @@ public void updateProcessTree() {
} }


// now start constructing the process-tree // now start constructing the process-tree
LinkedList<ProcessInfo> pInfoQueue = new LinkedList<ProcessInfo>(); List<ProcessInfo> children = me.getChildren();
pInfoQueue.addAll(me.getChildren()); Queue<ProcessInfo> pInfoQueue = new ArrayDeque<ProcessInfo>(children);
while (!pInfoQueue.isEmpty()) { while (!pInfoQueue.isEmpty()) {
ProcessInfo pInfo = pInfoQueue.remove(); ProcessInfo pInfo = pInfoQueue.remove();
if (!processTree.containsKey(pInfo.getPid())) { if (!processTree.containsKey(pInfo.getPid())) {
Expand All @@ -254,12 +264,10 @@ public void updateProcessTree() {
} }
} }


if (LOG.isDebugEnabled()) { LOG.debug(this);
// Log.debug the ProcfsBasedProcessTree
LOG.debug(this.toString());
}
if (smapsEnabled) { if (smapsEnabled) {
//Update smaps info // Update smaps info
processSMAPTree.clear(); processSMAPTree.clear();
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (p != null) { if (p != null) {
Expand Down Expand Up @@ -296,9 +304,7 @@ public static boolean checkPidPgrpidForMatch(String _pid, String procfs) {
"\t|- %s %s %d %d %s %d %d %d %d %s%n"; "\t|- %s %s %d %d %s %d %d %d %d %s%n";


public List<String> getCurrentProcessIDs() { public List<String> getCurrentProcessIDs() {
List<String> currentPIDs = new ArrayList<String>(); return Collections.unmodifiableList(new ArrayList<>(processTree.keySet()));
currentPIDs.addAll(processTree.keySet());
return currentPIDs;
} }


/** /**
Expand Down Expand Up @@ -327,18 +333,17 @@ public String getProcessTreeDump() {


@Override @Override
public long getVirtualMemorySize(int olderThanAge) { public long getVirtualMemorySize(int olderThanAge) {
long total = UNAVAILABLE; long total = 0L;
boolean isAvailable = false;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (p != null) { if (p != null) {
if (total == UNAVAILABLE ) { isAvailable = true;
total = 0;
}
if (p.getAge() > olderThanAge) { if (p.getAge() > olderThanAge) {
total += p.getVmem(); total += p.getVmem();
} }
} }
} }
return total; return isAvailable ? total : UNAVAILABLE;
} }


@Override @Override
Expand All @@ -352,11 +357,11 @@ public long getRssMemorySize(int olderThanAge) {
boolean isAvailable = false; boolean isAvailable = false;
long totalPages = 0; long totalPages = 0;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if ((p != null) ) { if (p != null) {
isAvailable = true;
if (p.getAge() > olderThanAge) { if (p.getAge() > olderThanAge) {
totalPages += p.getRssmemPage(); totalPages += p.getRssmemPage();
} }
isAvailable = true;
} }
} }
return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte return isAvailable ? totalPages * PAGE_SIZE : UNAVAILABLE; // convert # pages to byte
Expand Down Expand Up @@ -405,9 +410,7 @@ private long getSmapBasedRssMemorySize(int olderThanAge) {
} }
} }
} }
if (LOG.isDebugEnabled()) { LOG.debug(procMemInfo);
LOG.debug(procMemInfo.toString());
}
} }
} }
} }
Expand All @@ -427,9 +430,9 @@ public long getCumulativeCpuTime() {
boolean isAvailable = false; boolean isAvailable = false;
for (ProcessInfo p : processTree.values()) { for (ProcessInfo p : processTree.values()) {
if (p != null) { if (p != null) {
incJiffies += p.getDtime();
// data is available // data is available
isAvailable = true; isAvailable = true;
incJiffies += p.getDtime();
} }
} }
if (isAvailable) { if (isAvailable) {
Expand Down Expand Up @@ -481,21 +484,17 @@ private static String getValidPID(String pid) {
* Get the list of all processes in the system. * Get the list of all processes in the system.
*/ */
private List<String> getProcessList() { private List<String> getProcessList() {
List<String> processList = new ArrayList<String>(); List<String> processList = Collections.emptyList();
String[] processDirs = (new File(procfsDir)).list(); FileFilter procListFileFilter = new AndFileFilter(
if (processDirs != null) { DirectoryFileFilter.INSTANCE, new RegexFileFilter(numberPattern));
for (String dir : processDirs) {
Matcher m = numberPattern.matcher(dir); File dir = new File(procfsDir);
if (!m.matches()) { File[] processDirs = dir.listFiles(procListFileFilter);
continue;
} if (ArrayUtils.isNotEmpty(processDirs)) {
try { processList = new ArrayList<String>(processDirs.length);
if ((new File(procfsDir, dir)).isDirectory()) { for (File processDir : processDirs) {
processList.add(dir); processList.add(processDir.getName());
}
} catch (SecurityException s) {
// skip this process
}
} }
} }
return processList; return processList;
Expand Down Expand Up @@ -547,7 +546,7 @@ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo,
ret = null; ret = null;
} }
} catch (IOException io) { } catch (IOException io) {
LOG.warn("Error reading the stream " + io); LOG.warn("Error reading the stream", io);
ret = null; ret = null;
} finally { } finally {
// Close the streams // Close the streams
Expand All @@ -556,10 +555,10 @@ private static ProcessInfo constructProcessInfo(ProcessInfo pinfo,
try { try {
in.close(); in.close();
} catch (IOException i) { } catch (IOException i) {
LOG.warn("Error closing the stream " + in); LOG.warn("Error closing the stream", i);
} }
} catch (IOException i) { } catch (IOException i) {
LOG.warn("Error closing the stream " + fReader); LOG.warn("Error closing the stream", i);
} }
} }


Expand Down Expand Up @@ -729,14 +728,14 @@ public String getCmdLine(String procfsDir) {
ret = "N/A"; ret = "N/A";
} else { } else {
ret = ret.replace('\0', ' '); // Replace each null char with a space ret = ret.replace('\0', ' '); // Replace each null char with a space
if (ret.equals("")) { if (ret.isEmpty()) {
// The cmdline might be empty because the process is swapped out or // The cmdline might be empty because the process is swapped out or
// is a zombie. // is a zombie.
ret = "N/A"; ret = "N/A";
} }
} }
} catch (IOException io) { } catch (IOException io) {
LOG.warn("Error reading the stream " + io); LOG.warn("Error reading the stream", io);
ret = "N/A"; ret = "N/A";
} finally { } finally {
// Close the streams // Close the streams
Expand All @@ -745,10 +744,10 @@ public String getCmdLine(String procfsDir) {
try { try {
in.close(); in.close();
} catch (IOException i) { } catch (IOException i) {
LOG.warn("Error closing the stream " + in); LOG.warn("Error closing the stream", i);
} }
} catch (IOException i) { } catch (IOException i) {
LOG.warn("Error closing the stream " + fReader); LOG.warn("Error closing the stream", i);
} }
} }


Expand Down Expand Up @@ -805,11 +804,11 @@ private static void constructProcessSMAPInfo(ProcessTreeSmapMemInfo pInfo,
} }
} }
} catch (FileNotFoundException f) { } catch (FileNotFoundException f) {
LOG.error(f.getMessage()); LOG.error(f);
} catch (IOException e) { } catch (IOException e) {
LOG.error(e.getMessage()); LOG.error(e);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error(t.getMessage()); LOG.error(t);
} finally { } finally {
IOUtils.closeQuietly(in); IOUtils.closeQuietly(in);
} }
Expand Down Expand Up @@ -839,7 +838,7 @@ public String toString() {
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
for (ProcessSmapMemoryInfo info : memoryInfoList) { for (ProcessSmapMemoryInfo info : memoryInfoList) {
sb.append("\n"); sb.append("\n");
sb.append(info.toString()); sb.append(info);
} }
return sb.toString(); return sb.toString();
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ public void setup() throws IOException {
} }


@Test(timeout = 30000) @Test(timeout = 30000)
@SuppressWarnings("deprecation")
public void testProcessTree() throws Exception { public void testProcessTree() throws Exception {
try { try {
Assert.assertTrue(ProcfsBasedProcessTree.isAvailable()); Assert.assertTrue(ProcfsBasedProcessTree.isAvailable());
Expand Down Expand Up @@ -163,7 +162,7 @@ public void testProcessTree() throws Exception {
LOG.info("Root process pid: " + pid); LOG.info("Root process pid: " + pid);
ProcfsBasedProcessTree p = createProcessTree(pid); ProcfsBasedProcessTree p = createProcessTree(pid);
p.updateProcessTree(); // initialize p.updateProcessTree(); // initialize
LOG.info("ProcessTree: " + p.toString()); LOG.info("ProcessTree: " + p);


File leaf = new File(lowestDescendant); File leaf = new File(lowestDescendant);
// wait till lowest descendant process of Rougue Task starts execution // wait till lowest descendant process of Rougue Task starts execution
Expand All @@ -176,7 +175,7 @@ public void testProcessTree() throws Exception {
} }


p.updateProcessTree(); // reconstruct p.updateProcessTree(); // reconstruct
LOG.info("ProcessTree: " + p.toString()); LOG.info("ProcessTree: " + p);


// Verify the orphaned pid is In process tree // Verify the orphaned pid is In process tree
String lostpid = getPidFromPidFile(lostDescendant); String lostpid = getPidFromPidFile(lostDescendant);
Expand Down Expand Up @@ -395,7 +394,6 @@ public void createMemoryMappingInfo(ProcessTreeSmapMemInfo[] procMemInfo) {
* files. * files.
*/ */
@Test(timeout = 30000) @Test(timeout = 30000)
@SuppressWarnings("deprecation")
public void testCpuAndMemoryForProcessTree() throws IOException { public void testCpuAndMemoryForProcessTree() throws IOException {


// test processes // test processes
Expand Down Expand Up @@ -908,13 +906,8 @@ public static void setupPidDirs(File procfsRootDir, String[] pids)
throws IOException { throws IOException {
for (String pid : pids) { for (String pid : pids) {
File pidDir = new File(procfsRootDir, pid); File pidDir = new File(procfsRootDir, pid);
pidDir.mkdir(); FileUtils.forceMkdir(pidDir);
if (!pidDir.exists()) { LOG.info("created pid dir: " + pidDir);
throw new IOException("couldn't make process directory under "
+ "fake procfs");
} else {
LOG.info("created pid dir");
}
} }
} }


Expand Down

0 comments on commit 626b510

Please sign in to comment.