From 6dd3fee876707c2272d02aba0ff4f3f31be98d96 Mon Sep 17 00:00:00 2001 From: zhangyizhong Date: Wed, 23 Mar 2022 13:34:51 +0800 Subject: [PATCH 1/5] YARN_478 --- .../main/java/org/apache/hadoop/fs/DF.java | 3 + .../org/apache/hadoop/util/IoTimeTracker.java | 109 ++++++++++++++ .../org/apache/hadoop/util/NodeResource.java | 38 +++++ .../java/org/apache/hadoop/util/SysInfo.java | 20 ++- .../org/apache/hadoop/util/SysInfoLinux.java | 135 +++++++++++++++--- .../apache/hadoop/util/SysInfoWindows.java | 16 ++- .../DummyResourceCalculatorPlugin.java | 24 +++- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 + .../yarn/sls/scheduler/RMNodeWrapper.java | 6 + .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../yarn/util/ResourceCalculatorPlugin.java | 27 +++- .../yarn/server/api/records/NodeStatus.java | 30 +++- .../api/records/impl/pb/NodeStatusPBImpl.java | 34 +++++ .../proto/yarn_server_common_protos.proto | 3 + .../nodemanager/NodeStatusUpdaterImpl.java | 31 +++- .../monitor/MockResourceCalculatorPlugin.java | 11 ++ .../util/TestNodeManagerHardwareUtils.java | 11 ++ .../server/resourcemanager/rmnode/RMNode.java | 7 + .../resourcemanager/rmnode/RMNodeImpl.java | 25 ++++ .../rmnode/RMNodeStatusEvent.java | 4 + .../server/resourcemanager/MockNodes.java | 6 + 21 files changed, 516 insertions(+), 34 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java index da4636b2c0fbe..45f9311fc5a02 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java @@ -39,6 +39,9 @@ @InterfaceStability.Evolving public class DF extends Shell { + /** Default DF refresh interval. */ + public static final long DF_INTERVAL_DEFAULT = 3 * 1000; + private final String dirPath; private final File dirFile; private String filesystem; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java new file mode 100644 index 0000000000000..a063f795e1d8d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.math.BigInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class IoTimeTracker { + public static final int UNAVAILABLE = -1; + final long MINIMUM_UPDATE_INTERVAL; + + // IO used time since system is on (ms) + BigInteger cumulativeIoTime = BigInteger.ZERO; + + // IO used time read last time (ms) + BigInteger lastCumulativeIoTime = BigInteger.ZERO; + + // Unix timestamp while reading the IO time (ms) + long sampleTime; + long lastSampleTime; + float ioUsage; + BigInteger jiffyLengthInMillis; + + private ReadWriteLock rwLock = new ReentrantReadWriteLock(); + private Lock readLock = rwLock.readLock(); + public Lock writeLock = rwLock.writeLock(); + + public IoTimeTracker(long jiffyLengthInMillis) { + this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); + this.ioUsage = UNAVAILABLE; + this.sampleTime = UNAVAILABLE; + this.lastSampleTime = UNAVAILABLE; + MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis; + } + + /** + * Return percentage of io time spent over the time since last update. + * IO time spent is based on elapsed jiffies multiplied by amount of + * time for 1 disk. Thus, if you use 2 disks completely you would have spent + * twice the actual time between updates and this will return 200%. + * + * @return Return percentage of io usage since last update, {@link + * IoTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than + * {@link IoTimeTracker#MINIMUM_UPDATE_INTERVAL} apart + */ + public float getIoTrackerUsagePercent() { + readLock.lock(); + try { + if (lastSampleTime == UNAVAILABLE || lastSampleTime > sampleTime) { + // lastSampleTime > sampleTime may happen when the system time is changed + lastSampleTime = sampleTime; + lastCumulativeIoTime = cumulativeIoTime; + return ioUsage; + } + // When lastSampleTime is sufficiently old, update ioUsage. + // Also take a sample of the current time and cumulative IO time for the + // use of the next calculation. 
+ if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) { + ioUsage = ((cumulativeIoTime.subtract(lastCumulativeIoTime)).floatValue()) + / ((float) (sampleTime - lastSampleTime)); + lastSampleTime = sampleTime; + lastCumulativeIoTime = cumulativeIoTime; + } + } finally { + readLock.unlock(); + } + return ioUsage; + } + + public void updateElapsedJiffies(BigInteger totalIoPerDisk, long sampleTime) { + this.cumulativeIoTime = this.cumulativeIoTime.add(totalIoPerDisk); + this.sampleTime = sampleTime; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("SampleTime " + this.sampleTime); + sb.append(" CummulativeIoTime " + this.cumulativeIoTime); + sb.append(" LastSampleTime " + this.lastSampleTime); + sb.append(" LastCummulativeIoTime " + this.lastCumulativeIoTime); + sb.append(" IoUsage " + this.ioUsage); + sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); + return sb.toString(); + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java new file mode 100644 index 0000000000000..65655bb266817 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java @@ -0,0 +1,38 @@ +package org.apache.hadoop.util; + +public class NodeResource { + + float cpuUsage; + float ioUsage; + float memoryUsage; + + public NodeResource(float cpuUsage, float ioUsage, float memoryUsage) { + this.cpuUsage = cpuUsage; + this.ioUsage = ioUsage; + this.memoryUsage = memoryUsage; + } + + public float getCpuUsage() { + return this.cpuUsage; + } + + public float getIoUsage() { + return this.ioUsage; + } + + public float getMemoryUsage() { + return this.memoryUsage; + } + + public void setCpuUsage(float cpuUsage) { + this.cpuUsage = cpuUsage; + } + + public void setIoUsage(float ioUsage) { + this.ioUsage = ioUsage; + } + + public void setMemoryUsage(float memoryUsage) { + this.memoryUsage = memoryUsage; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java index e8a571489e9ec..2db0f825d157f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -20,6 +20,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; /** * Plugin to calculate resource information on the system. @@ -108,6 +109,21 @@ public static SysInfo newInstance() { */ public abstract float getCpuUsagePercentage(); + /** + * Obtain the IO usage % of the machine. Return -1 if it is unavailable + * + * @return IO usage in % + */ + public abstract float getIoUsagePercentage(String[] paths); + + /** + * Obtain the node resource of the machine. Return null if it is unavailable + * + * @return cpu & io & memory usage in % + */ + public abstract NodeResource getNodeResourceLastPeriod( + String[] localDirs, long millis); + /** * Obtain the number of VCores used. Return -1 if it is unavailable * @@ -132,13 +148,13 @@ public static SysInfo newInstance() { * * @return total number of bytes read. 
*/ - public abstract long getStorageBytesRead(); + public abstract long getStorageBytesRead(String[] paths); /** * Obtain the aggregated number of bytes written to disks. * * @return total number of bytes written. */ - public abstract long getStorageBytesWritten(); + public abstract long getStorageBytesWritten(String[] paths); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java index 38777d8f66465..a9dea8a6e287c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -19,14 +19,23 @@ package org.apache.hadoop.util; import java.io.BufferedReader; +import java.io.File; +import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.IOException; import java.math.BigInteger; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; import java.util.HashMap; import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -34,10 +43,14 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.DF; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.hadoop.fs.DF.DF_INTERVAL_DEFAULT; + /** * Plugin to calculate resource information on Linux systems. */ @@ -111,6 +124,8 @@ public class SysInfoLinux extends SysInfo { "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)"); + private IoTimeTracker ioTimeTracker; + /** * Pattern for parsing /sys/block/partition_name/queue/hw_sector_size. */ @@ -124,6 +139,10 @@ public class SysInfoLinux extends SysInfo { private String procfsDisksFile; private long jiffyLengthInMillis; + // cache mount path and disk mapping relation + private String[] lastPaths; + private List mountList; + private long ramSize = 0; private long swapSize = 0; private long ramSizeFree = 0; // free ram space on the machine (kB) @@ -139,6 +158,8 @@ public class SysInfoLinux extends SysInfo { private int numProcessors = 0; /* number of physical cores on the system. */ private int numCores = 0; + /* number of disks on the system. 
*/ + private int numDisks = 0; private long cpuFrequency = 0L; // CPU frequency on the system (kHz) private long numNetBytesRead = 0L; // aggregated bytes read from network private long numNetBytesWritten = 0L; // aggregated bytes written to network @@ -155,6 +176,9 @@ public class SysInfoLinux extends SysInfo { public static final long JIFFY_LENGTH_IN_MILLIS = Math.max(Math.round(1000D / getConf("CLK_TCK")), -1); + /* map for node load sampling */ + Map nodeResourceSampleMap = new ConcurrentHashMap<>(); + private static long getConf(String attr) { if(Shell.LINUX) { try { @@ -207,6 +231,7 @@ public SysInfoLinux(String procfsMemFile, this.jiffyLengthInMillis = jiffyLengthInMillis; this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); this.perDiskSectorSize = new HashMap(); + this.ioTimeTracker = new IoTimeTracker(jiffyLengthInMillis); } /** @@ -480,28 +505,39 @@ private void readProcNetInfoFile() { * Read /proc/diskstats file, parse and calculate amount * of bytes read and written from/to disks. */ - private void readProcDisksInfoFile() { + private void readProcDisksInfoFile(String[] paths) { numDisksBytesRead = 0L; numDisksBytesWritten = 0L; // Read "/proc/diskstats" file - BufferedReader in; - try { - in = new BufferedReader(new InputStreamReader( - Files.newInputStream(Paths.get(procfsDisksFile)), - Charset.forName("UTF-8"))); - } catch (IOException f) { - return; - } - Matcher mat; - try { + try (BufferedReader in = new BufferedReader(new InputStreamReader( + new FileInputStream(procfsDisksFile), StandardCharsets.UTF_8))){ + numDisks = 0; + ioTimeTracker.cumulativeIoTime = BigInteger.ZERO; + if (paths != null && !Arrays.equals(lastPaths, paths) || mountList.size() != paths.length) { + mountList = new ArrayList<>(paths.length); + for (String path : paths) { + try { + DF df = new DF(new File(path), DF_INTERVAL_DEFAULT); + mountList.add(df.getFilesystem()); + } catch (Throwable t) { + // path may not exist for unhealthy disks etc. 
+ if (LOG.isDebugEnabled()) { + LOG.debug("Read disk path " + path + " error.", t); + } + } + } + lastPaths = paths; + } + String str = in.readLine(); + long totalIoPerDisk = 0; while (str != null) { mat = PROCFS_DISKSFILE_FORMAT.matcher(str); if (mat.find()) { - String diskName = mat.group(4); + String diskName = mat.group(3); assert diskName != null; // ignore loop or ram partitions if (diskName.contains("loop") || diskName.contains("ram")) { @@ -509,6 +545,9 @@ private void readProcDisksInfoFile() { continue; } + numDisks++; + totalIoPerDisk += Long.parseLong(mat.group(13)); + Integer sectorSize; synchronized (perDiskSectorSize) { sectorSize = perDiskSectorSize.get(diskName); @@ -530,15 +569,12 @@ private void readProcDisksInfoFile() { } str = in.readLine(); } + + ioTimeTracker.updateElapsedJiffies( + BigInteger.valueOf(totalIoPerDisk), + getCurrentTime()); } catch (IOException e) { LOG.warn("Error reading the stream " + procfsDisksFile, e); - } finally { - // Close the streams - try { - in.close(); - } catch (IOException e) { - LOG.warn("Error closing the stream " + procfsDisksFile, e); - } } } @@ -660,6 +696,59 @@ public float getCpuUsagePercentage() { return overallCpuUsage; } + + /** {@inheritDoc} */ + @Override + public float getIoUsagePercentage(String[] paths) { + readProcDisksInfoFile(paths); + float overallIoUsage = ioTimeTracker.getIoTrackerUsagePercent(); + if (overallIoUsage != IoTimeTracker.UNAVAILABLE && numDisks != 0) { + overallIoUsage = overallIoUsage / numDisks; + } + return overallIoUsage; + } + + /** {@inheritDoc} */ + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + + // trigger collect and obtain the latest NodeResource + NodeResource nowNodeResource = new NodeResource( + getCpuUsagePercentage(), + getIoUsagePercentage(localDirs), + 1 - (float) getAvailablePhysicalMemorySize() / getPhysicalMemorySize()); + + int nums = 0; + NodeResource totalResource = new NodeResource(0F, 0F, 0F); + Iterator> it = nodeResourceSampleMap.entrySet().iterator(); + while (it.hasNext()) { + Map.Entry entry = it.next(); + long delta = System.currentTimeMillis() - millis; + NodeResource oldNodeResource = entry.getValue(); + if (delta <= entry.getKey()) { + nums++; + totalResource.setCpuUsage(totalResource.cpuUsage + oldNodeResource.cpuUsage); + totalResource.setIoUsage(totalResource.ioUsage + oldNodeResource.ioUsage); + totalResource.setMemoryUsage(totalResource.memoryUsage + oldNodeResource.memoryUsage); + } else { + // remove outdated records + it.remove(); + } + } + + if (nums > 1) { + totalResource.setCpuUsage(totalResource.cpuUsage / nums); + totalResource.setIoUsage(totalResource.ioUsage / nums); + totalResource.setMemoryUsage(totalResource.memoryUsage / nums); + } + + if (nowNodeResource.cpuUsage >= 0 && nowNodeResource.ioUsage >= 0 + && nowNodeResource.memoryUsage >= 0) { + nodeResourceSampleMap.put(ioTimeTracker.sampleTime, nowNodeResource); + } + return totalResource; + } + /** {@inheritDoc} */ @Override public float getNumVCoresUsed() { @@ -686,14 +775,14 @@ public long getNetworkBytesWritten() { } @Override - public long getStorageBytesRead() { - readProcDisksInfoFile(); + public long getStorageBytesRead(String[] paths) { + readProcDisksInfoFile(paths); return numDisksBytesRead; } @Override - public long getStorageBytesWritten() { - readProcDisksInfoFile(); + public long getStorageBytesWritten(String[] paths) { + readProcDisksInfoFile(paths); return numDisksBytesWritten; } diff --git 
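
As context for the `SysInfoLinux` changes above: the counter summed into `totalIoPerDisk` is cumulative since boot, so `IoTimeTracker` can only report a usage figure once it holds two samples taken sufficiently far apart; until then it returns `UNAVAILABLE`. Below is a minimal, self-contained sketch of that delta arithmetic under the same convention, where N fully busy disks yield a value of N (the javadoc's "200%" for two disks). The class and variable names here are illustrative, not part of the patch:

```java
import java.math.BigInteger;

/** Sketch of IoTimeTracker's math: usage = delta(busy ms) / delta(wall-clock ms). */
class IoUsageSketch {
  private BigInteger lastBusy;       // cumulative busy ms at the previous sample
  private long lastSampleMs = -1;    // wall-clock time of the previous sample

  /** Feed one sample of the busy-time counter summed over all disks. */
  float sample(BigInteger busyMsAllDisks, long nowMs) {
    float usage = -1f;               // UNAVAILABLE until two samples exist
    if (lastBusy != null && nowMs > lastSampleMs) {
      usage = busyMsAllDisks.subtract(lastBusy).floatValue()
          / (nowMs - lastSampleMs);  // fraction of elapsed time spent doing IO
    }
    lastBusy = busyMsAllDisks;
    lastSampleMs = nowMs;
    return usage;
  }

  public static void main(String[] args) {
    IoUsageSketch t = new IoUsageSketch();
    t.sample(BigInteger.valueOf(1000), 10_000);  // first sample: returns -1
    // 500 busy ms over a 1000 ms window => 0.5, i.e. one disk 50% busy
    System.out.println(t.sample(BigInteger.valueOf(1500), 11_000));
  }
}
```

Dividing by `numDisks`, as `getIoUsagePercentage` does above, renormalizes this multi-disk figure back to a single 0-1 range for the node.
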
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index 91ebc4c50bb19..9ef7cdc8152fb 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -206,6 +206,20 @@ public synchronized float getCpuUsagePercentage() { return ret; } + /** {@inheritDoc} */ + @Override + public float getIoUsagePercentage(String[] paths) { + return 0f; + } + + /** {@inheritDoc} */ + @Override + public NodeResource getNodeResourceLastPeriod( + String[] localDirs, long millis) { + // TODO support for windows + return null; + } + /** {@inheritDoc} */ @Override public synchronized float getNumVCoresUsed() { @@ -232,7 +246,7 @@ public long getNetworkBytesWritten() { } @Override - public long getStorageBytesRead() { + public long getStorageBytesRead(String[] paths) { refreshIfNeeded(); return storageBytesRead; } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java index 528202fd7f402..797a183814fd7 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java +++ b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred.gridmix; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; /** @@ -52,6 +53,10 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin { "mapred.tasktracker.cumulativecputime.testing"; /** CPU usage percentage for testing */ public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing"; + /** Memory usage percentage for testing */ + public static final String MEMORY_USAGE = "mapred.tasktracker.memoryusage.testing"; + /** Disk IO usage percentage for testing */ + public static final String IO_USAGE = "mapred.tasktracker.iousage.testing"; /** cumulative number of bytes read over the network */ public static final String NETWORK_BYTES_READ = "mapred.tasktracker.networkread.testing"; @@ -128,6 +133,21 @@ public float getCpuUsagePercentage() { return getConf().getFloat(CPU_USAGE, -1); } + /** {@inheritDoc} */ + @Override + public float getIoUsagePercentage(String[] paths) { + return getConf().getFloat(IO_USAGE, -1); + } + + /** {@inheritDoc} */ + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return new NodeResource(getConf().getFloat(CPU_USAGE, -1), + getConf().getFloat(MEMORY_USAGE, -1), + getConf().getFloat(IO_USAGE, -1)); + } + + /** {@inheritDoc} */ @Override public long getNetworkBytesRead() { @@ -142,13 +162,13 @@ public long getNetworkBytesWritten() { /** {@inheritDoc} */ @Override - public long getStorageBytesRead() { + public long getStorageBytesRead(String[] paths) { return getConf().getLong(STORAGE_BYTES_READ, -1); } /** {@inheritDoc} */ @Override - public long getStorageBytesWritten() { + public long getStorageBytesWritten(String[] paths) { return getConf().getLong(STORAGE_BYTES_WRITTEN, -1); } } diff --git 
a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 32567db666ef3..9fb98083ada09 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -116,6 +117,10 @@ public String getHealthReport() { return healthReport; } + public NodeStatus getNodeStatus() { + return null; + } + public long getLastHealthReportTime() { return 0; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 26d35ac897235..4a6047e0f6715 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -89,6 +90,11 @@ public String getHealthReport() { return node.getHealthReport(); } + @Override + public NodeStatus getNodeStatus() { + return node.getNodeStatus(); + } + @Override public long getLastHealthReportTime() { return node.getLastHealthReportTime(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 28c96de8291a3..750b52269dcc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1974,6 +1974,11 @@ public static boolean isAclEnabled(Configuration conf) { @Deprecated public final static int DEFAULT_NM_CONTAINER_MON_INTERVAL_MS = 3000; + /** How long is the NM resource sample period.*/ + public final static String NM_RESOURCE_SAMPLE_PERIOD_MS = + NM_PREFIX + "resource.sample.period-ms"; + public final static long DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS = 10000; + /** Class that calculates current resource utilization.*/ public static final String NM_MON_RESOURCE_CALCULATOR = NM_PREFIX + "resource-calculator.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 5fcc474652fb8..72392a8583492 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.util; +import org.apache.hadoop.util.NodeResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -129,6 +130,24 @@ public float getCpuUsagePercentage() { return sys.getCpuUsagePercentage(); } + /** + * Obtain the IO usage % of the machine. Return -1 if it is unavailable + * + * @return IO usage in % + */ + public float getIoUsagePercentage(String[] paths) { + return sys.getIoUsagePercentage(paths); + } + + /** + * Obtain the node resource of the machine. Return null if it is unavailable + * + * @return cpu & io & memory usage in % + */ + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return sys.getNodeResourceLastPeriod(localDirs, millis); + } + /** * Obtain the number of VCores used. Return -1 if it is unavailable. * @@ -159,8 +178,8 @@ public long getNetworkBytesWritten() { * * @return total number of bytes read. */ - public long getStorageBytesRead() { - return sys.getStorageBytesRead(); + public long getStorageBytesRead(String[] paths) { + return sys.getStorageBytesRead(paths); } /** @@ -168,8 +187,8 @@ public long getStorageBytesRead() { * * @return total number of bytes written. */ - public long getStorageBytesWritten() { - return sys.getStorageBytesWritten(); + public long getStorageBytesWritten(String[] paths) { + return sys.getStorageBytesWritten(paths); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a290294..99dfdd8bedb10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -59,7 +59,8 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, NodeHealthStatus nodeHealthStatus, ResourceUtilization containersUtilization, ResourceUtilization nodeUtilization, - List increasedContainers) { + List increasedContainers, + float cpuUsage, float ioUsage, float memUsage) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); @@ -69,6 +70,9 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, nodeStatus.setContainersUtilization(containersUtilization); nodeStatus.setNodeUtilization(nodeUtilization); nodeStatus.setIncreasedContainers(increasedContainers); + nodeStatus.setCpuUsage(cpuUsage); + nodeStatus.setIoUsage(ioUsage); + nodeStatus.setMemUsage(memUsage); return nodeStatus; } @@ -132,4 +136,28 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + 
@Private + @Unstable + public abstract void setCpuUsage(float cpuUsage); + + @Private + @Unstable + public abstract void setIoUsage(float ioUsage); + + @Private + @Unstable + public abstract void setMemUsage(float memUsage); + + @Private + @Unstable + public abstract float getCpuUsage(); + + @Private + @Unstable + public abstract float getIoUsage(); + + @Private + @Unstable + public abstract float getMemUsage(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6fa913ce..9f3d6ef21ea5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -487,4 +487,38 @@ private ContainerProto convertToProtoFormat( Container c) { return ((ContainerPBImpl)c).getProto(); } + + @Override + public synchronized float getCpuUsage() { + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpuUsage(); + } + @Override + public synchronized void setCpuUsage(float cpuUsage) { + maybeInitBuilder(); + builder.setCpuUsage(cpuUsage); + } + + @Override + public synchronized float getIoUsage() { + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getIoUsage(); + } + @Override + public synchronized void setIoUsage(float ioUsage) { + maybeInitBuilder(); + builder.setIoUsage(ioUsage); + } + + @Override + public synchronized float getMemUsage() { + NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getMemUsage(); + } + + @Override + public synchronized void setMemUsage(float memUsage) { + maybeInitBuilder(); + builder.setMemUsage(memUsage); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index ea8df4fb800dc..d745dc2061cb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -41,6 +41,9 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + optional float cpu_usage = 10; + optional float io_usage = 11; + optional float mem_usage = 12; } message OpportunisticContainersStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4dadf9c62e4c5..88a1b4a84589a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.Resources; @@ -149,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private Resource lastCapability; + private Runnable statusUpdaterRunnable; private Thread statusUpdater; private boolean failedToConnect = false; @@ -156,6 +160,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private boolean registeredWithRM = false; Set pendingContainersToRemove = new HashSet(); + private final ResourceCalculatorPlugin resourceCalculatorPlugin = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, null); + private NMNodeLabelsHandler nodeLabelsHandler; private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; @@ -512,6 +519,27 @@ private List 
createKeepAliveApplicationList() { @VisibleForTesting protected NodeStatus getNodeStatus(int responseId) throws IOException { + float cpuUsage = 0, ioUsage = 0, memUsage = 0; + Configuration conf = getConfig(); + long millis = conf.getLong( + YarnConfiguration.NM_RESOURCE_SAMPLE_PERIOD_MS, + YarnConfiguration.DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS); + + if (resourceCalculatorPlugin != null) { + NodeResource nodeResource = resourceCalculatorPlugin.getNodeResourceLastPeriod( + conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS), millis); + if (nodeResource != null) { + cpuUsage = nodeResource.getCpuUsage() / 100F; + ioUsage = nodeResource.getIoUsage(); + memUsage = nodeResource.getMemoryUsage(); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Node " + nodeId + ", cpu usage is " + cpuUsage + + ", disk io usage is " + ioUsage + ", memory usage is " + memUsage); + } + NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); @@ -528,7 +556,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + containersUtilization, nodeUtilization, increasedContainers, + cpuUsage, ioUsage, memUsage); nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java index 64d117a5a5f80..08fae3ef208ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { @@ -71,6 +72,16 @@ public float getCpuUsagePercentage() { return 0; } + @Override + public float getIoUsagePercentage(String[] paths) { + return 0; + } + + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return new NodeResource(0, 0, 0); + } + @Override public float getNumVCoresUsed() { return 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java index 767c308aeb656..4bfe300a86814 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.util; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.junit.Assert; @@ -77,6 +78,16 @@ public float getCpuUsagePercentage() { return 0; } + @Override + public float getIoUsagePercentage(String[] paths) { + return 0; + } + + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return new NodeResource(0, 0, 0); + } + @Override public int getNumCores() { return 4; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 5d60b4fbe0678..b7e2aa8e1fc8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.util.resource.Resources; @@ -87,6 +88,12 @@ public interface RMNode { */ public String getHealthReport(); + /** + * the latest node status report received from this node. + * @return the latest node status report received from this node. + */ + public NodeStatus getNodeStatus(); + /** * the time of the latest health report received from this node. * @return the time of the latest health report received from this node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b8aaea5de330c..3ec44b74ce032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -150,6 +150,8 @@ public class RMNodeImpl implements RMNode, EventHandler { /** Physical resources in the node. */ private volatile Resource physicalResource; + private NodeStatus nodeStatus; + /* Container Queue Information for the node.. 
Used by Distributed Scheduler */ private OpportunisticContainersStatus opportunisticContainersStatus; @@ -511,6 +513,27 @@ public void setHealthReport(String healthReport) { this.writeLock.unlock(); } } + + @Override + public NodeStatus getNodeStatus() { + this.readLock.lock(); + + try { + return this.nodeStatus; + } finally { + this.readLock.unlock(); + } + } + + public void setNodeStatus(NodeStatus nodeStatus) { + this.writeLock.lock(); + + try { + this.nodeStatus = nodeStatus; + } finally { + this.writeLock.unlock(); + } + } public void setLastHealthReportTime(long lastHealthReportTime) { this.writeLock.lock(); @@ -945,6 +968,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; + rmNode.setNodeStatus(startEvent.getNodeStatus()); + NodeId nodeId = rmNode.nodeId; RMNode previousRMNode = rmNode.context.getInactiveRMNodes().remove(nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 5f5fe24d173c5..b42839062b80f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -47,6 +47,10 @@ public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, this.logAggregationReportsForApps = logAggregationReportsForApps; } + public NodeStatus getNodeStatus() { + return this.nodeStatus; + } + public NodeHealthStatus getNodeHealthStatus() { return this.nodeStatus.getNodeHealthStatus(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index c951ba2c641df..89dab877b219f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -265,6 +266,11 @@ public String getHealthReport() { return healthReport; } + @Override + public NodeStatus getNodeStatus() { + return null; + } + @Override public long getLastHealthReportTime() { return lastHealthReportTime; From 
a4a7f40ba52c1c196b62be11cd9b968475661b08 Mon Sep 17 00:00:00 2001 From: zhangyizhong Date: Wed, 23 Mar 2022 15:28:39 +0800 Subject: [PATCH 2/5] revert hadoop common --- .../main/java/org/apache/hadoop/fs/DF.java | 3 - .../org/apache/hadoop/util/IoTimeTracker.java | 109 -------------- .../org/apache/hadoop/util/NodeResource.java | 38 ----- .../java/org/apache/hadoop/util/SysInfo.java | 20 +-- .../org/apache/hadoop/util/SysInfoLinux.java | 135 +++--------------- .../apache/hadoop/util/SysInfoWindows.java | 16 +-- .../DummyResourceCalculatorPlugin.java | 24 +--- .../hadoop/yarn/sls/nodemanager/NodeInfo.java | 5 - .../yarn/sls/scheduler/RMNodeWrapper.java | 6 - 9 files changed, 28 insertions(+), 328 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java index 45f9311fc5a02..da4636b2c0fbe 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DF.java @@ -39,9 +39,6 @@ @InterfaceStability.Evolving public class DF extends Shell { - /** Default DF refresh interval. */ - public static final long DF_INTERVAL_DEFAULT = 3 * 1000; - private final String dirPath; private final File dirFile; private String filesystem; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java index a063f795e1d8d..e69de29bb2d1d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java @@ -1,109 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.util; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -import java.math.BigInteger; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class IoTimeTracker { - public static final int UNAVAILABLE = -1; - final long MINIMUM_UPDATE_INTERVAL; - - // IO used time since system is on (ms) - BigInteger cumulativeIoTime = BigInteger.ZERO; - - // IO used time read last time (ms) - BigInteger lastCumulativeIoTime = BigInteger.ZERO; - - // Unix timestamp while reading the IO time (ms) - long sampleTime; - long lastSampleTime; - float ioUsage; - BigInteger jiffyLengthInMillis; - - private ReadWriteLock rwLock = new ReentrantReadWriteLock(); - private Lock readLock = rwLock.readLock(); - public Lock writeLock = rwLock.writeLock(); - - public IoTimeTracker(long jiffyLengthInMillis) { - this.jiffyLengthInMillis = BigInteger.valueOf(jiffyLengthInMillis); - this.ioUsage = UNAVAILABLE; - this.sampleTime = UNAVAILABLE; - this.lastSampleTime = UNAVAILABLE; - MINIMUM_UPDATE_INTERVAL = 10 * jiffyLengthInMillis; - } - - /** - * Return percentage of io time spent over the time since last update. - * IO time spent is based on elapsed jiffies multiplied by amount of - * time for 1 disk. Thus, if you use 2 disks completely you would have spent - * twice the actual time between updates and this will return 200%. - * - * @return Return percentage of io usage since last update, {@link - * IoTimeTracker#UNAVAILABLE} if there haven't been 2 updates more than - * {@link IoTimeTracker#MINIMUM_UPDATE_INTERVAL} apart - */ - public float getIoTrackerUsagePercent() { - readLock.lock(); - try { - if (lastSampleTime == UNAVAILABLE || lastSampleTime > sampleTime) { - // lastSampleTime > sampleTime may happen when the system time is changed - lastSampleTime = sampleTime; - lastCumulativeIoTime = cumulativeIoTime; - return ioUsage; - } - // When lastSampleTime is sufficiently old, update ioUsage. - // Also take a sample of the current time and cumulative IO time for the - // use of the next calculation. 
- if (sampleTime > lastSampleTime + MINIMUM_UPDATE_INTERVAL) { - ioUsage = ((cumulativeIoTime.subtract(lastCumulativeIoTime)).floatValue()) - / ((float) (sampleTime - lastSampleTime)); - lastSampleTime = sampleTime; - lastCumulativeIoTime = cumulativeIoTime; - } - } finally { - readLock.unlock(); - } - return ioUsage; - } - - public void updateElapsedJiffies(BigInteger totalIoPerDisk, long sampleTime) { - this.cumulativeIoTime = this.cumulativeIoTime.add(totalIoPerDisk); - this.sampleTime = sampleTime; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("SampleTime " + this.sampleTime); - sb.append(" CummulativeIoTime " + this.cumulativeIoTime); - sb.append(" LastSampleTime " + this.lastSampleTime); - sb.append(" LastCummulativeIoTime " + this.lastCumulativeIoTime); - sb.append(" IoUsage " + this.ioUsage); - sb.append(" JiffyLengthMillisec " + this.jiffyLengthInMillis); - return sb.toString(); - } -} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java index 65655bb266817..e69de29bb2d1d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java @@ -1,38 +0,0 @@ -package org.apache.hadoop.util; - -public class NodeResource { - - float cpuUsage; - float ioUsage; - float memoryUsage; - - public NodeResource(float cpuUsage, float ioUsage, float memoryUsage) { - this.cpuUsage = cpuUsage; - this.ioUsage = ioUsage; - this.memoryUsage = memoryUsage; - } - - public float getCpuUsage() { - return this.cpuUsage; - } - - public float getIoUsage() { - return this.ioUsage; - } - - public float getMemoryUsage() { - return this.memoryUsage; - } - - public void setCpuUsage(float cpuUsage) { - this.cpuUsage = cpuUsage; - } - - public void setIoUsage(float ioUsage) { - this.ioUsage = ioUsage; - } - - public void setMemoryUsage(float memoryUsage) { - this.memoryUsage = memoryUsage; - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java index 2db0f825d157f..e8a571489e9ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfo.java @@ -20,7 +20,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; /** * Plugin to calculate resource information on the system. @@ -109,21 +108,6 @@ public static SysInfo newInstance() { */ public abstract float getCpuUsagePercentage(); - /** - * Obtain the IO usage % of the machine. Return -1 if it is unavailable - * - * @return IO usage in % - */ - public abstract float getIoUsagePercentage(String[] paths); - - /** - * Obtain the node resource of the machine. Return null if it is unavailable - * - * @return cpu & io & memory usage in % - */ - public abstract NodeResource getNodeResourceLastPeriod( - String[] localDirs, long millis); - /** * Obtain the number of VCores used. Return -1 if it is unavailable * @@ -148,13 +132,13 @@ public abstract NodeResource getNodeResourceLastPeriod( * * @return total number of bytes read. 
*/ - public abstract long getStorageBytesRead(String[] paths); + public abstract long getStorageBytesRead(); /** * Obtain the aggregated number of bytes written to disks. * * @return total number of bytes written. */ - public abstract long getStorageBytesWritten(String[] paths); + public abstract long getStorageBytesWritten(); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java index a9dea8a6e287c..38777d8f66465 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoLinux.java @@ -19,23 +19,14 @@ package org.apache.hadoop.util; import java.io.BufferedReader; -import java.io.File; -import java.io.FileInputStream; import java.io.InputStreamReader; import java.io.IOException; import java.math.BigInteger; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; import java.util.HashMap; import java.util.HashSet; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -43,14 +34,10 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.DF; import org.apache.hadoop.util.Shell.ShellCommandExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.fs.DF.DF_INTERVAL_DEFAULT; - /** * Plugin to calculate resource information on Linux systems. */ @@ -124,8 +111,6 @@ public class SysInfoLinux extends SysInfo { "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)" + "[ \t]*([0-9]+)[ \t]*([0-9]+)[ \t]*([0-9]+)"); - private IoTimeTracker ioTimeTracker; - /** * Pattern for parsing /sys/block/partition_name/queue/hw_sector_size. */ @@ -139,10 +124,6 @@ public class SysInfoLinux extends SysInfo { private String procfsDisksFile; private long jiffyLengthInMillis; - // cache mount path and disk mapping relation - private String[] lastPaths; - private List mountList; - private long ramSize = 0; private long swapSize = 0; private long ramSizeFree = 0; // free ram space on the machine (kB) @@ -158,8 +139,6 @@ public class SysInfoLinux extends SysInfo { private int numProcessors = 0; /* number of physical cores on the system. */ private int numCores = 0; - /* number of disks on the system. 
*/ - private int numDisks = 0; private long cpuFrequency = 0L; // CPU frequency on the system (kHz) private long numNetBytesRead = 0L; // aggregated bytes read from network private long numNetBytesWritten = 0L; // aggregated bytes written to network @@ -176,9 +155,6 @@ public class SysInfoLinux extends SysInfo { public static final long JIFFY_LENGTH_IN_MILLIS = Math.max(Math.round(1000D / getConf("CLK_TCK")), -1); - /* map for node load sampling */ - Map nodeResourceSampleMap = new ConcurrentHashMap<>(); - private static long getConf(String attr) { if(Shell.LINUX) { try { @@ -231,7 +207,6 @@ public SysInfoLinux(String procfsMemFile, this.jiffyLengthInMillis = jiffyLengthInMillis; this.cpuTimeTracker = new CpuTimeTracker(jiffyLengthInMillis); this.perDiskSectorSize = new HashMap(); - this.ioTimeTracker = new IoTimeTracker(jiffyLengthInMillis); } /** @@ -505,39 +480,28 @@ private void readProcNetInfoFile() { * Read /proc/diskstats file, parse and calculate amount * of bytes read and written from/to disks. */ - private void readProcDisksInfoFile(String[] paths) { + private void readProcDisksInfoFile() { numDisksBytesRead = 0L; numDisksBytesWritten = 0L; // Read "/proc/diskstats" file - Matcher mat; - try (BufferedReader in = new BufferedReader(new InputStreamReader( - new FileInputStream(procfsDisksFile), StandardCharsets.UTF_8))){ - numDisks = 0; - ioTimeTracker.cumulativeIoTime = BigInteger.ZERO; - if (paths != null && !Arrays.equals(lastPaths, paths) || mountList.size() != paths.length) { - mountList = new ArrayList<>(paths.length); - for (String path : paths) { - try { - DF df = new DF(new File(path), DF_INTERVAL_DEFAULT); - mountList.add(df.getFilesystem()); - } catch (Throwable t) { - // path may not exist for unhealthy disks etc. - if (LOG.isDebugEnabled()) { - LOG.debug("Read disk path " + path + " error.", t); - } - } - } - lastPaths = paths; - } + BufferedReader in; + try { + in = new BufferedReader(new InputStreamReader( + Files.newInputStream(Paths.get(procfsDisksFile)), + Charset.forName("UTF-8"))); + } catch (IOException f) { + return; + } + Matcher mat; + try { String str = in.readLine(); - long totalIoPerDisk = 0; while (str != null) { mat = PROCFS_DISKSFILE_FORMAT.matcher(str); if (mat.find()) { - String diskName = mat.group(3); + String diskName = mat.group(4); assert diskName != null; // ignore loop or ram partitions if (diskName.contains("loop") || diskName.contains("ram")) { @@ -545,9 +509,6 @@ private void readProcDisksInfoFile(String[] paths) { continue; } - numDisks++; - totalIoPerDisk += Long.parseLong(mat.group(13)); - Integer sectorSize; synchronized (perDiskSectorSize) { sectorSize = perDiskSectorSize.get(diskName); @@ -569,12 +530,15 @@ private void readProcDisksInfoFile(String[] paths) { } str = in.readLine(); } - - ioTimeTracker.updateElapsedJiffies( - BigInteger.valueOf(totalIoPerDisk), - getCurrentTime()); } catch (IOException e) { LOG.warn("Error reading the stream " + procfsDisksFile, e); + } finally { + // Close the streams + try { + in.close(); + } catch (IOException e) { + LOG.warn("Error closing the stream " + procfsDisksFile, e); + } } } @@ -696,59 +660,6 @@ public float getCpuUsagePercentage() { return overallCpuUsage; } - - /** {@inheritDoc} */ - @Override - public float getIoUsagePercentage(String[] paths) { - readProcDisksInfoFile(paths); - float overallIoUsage = ioTimeTracker.getIoTrackerUsagePercent(); - if (overallIoUsage != IoTimeTracker.UNAVAILABLE && numDisks != 0) { - overallIoUsage = overallIoUsage / numDisks; - } - return overallIoUsage; - 
} - - /** {@inheritDoc} */ - @Override - public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { - - // trigger collect and obtain the latest NodeResource - NodeResource nowNodeResource = new NodeResource( - getCpuUsagePercentage(), - getIoUsagePercentage(localDirs), - 1 - (float) getAvailablePhysicalMemorySize() / getPhysicalMemorySize()); - - int nums = 0; - NodeResource totalResource = new NodeResource(0F, 0F, 0F); - Iterator> it = nodeResourceSampleMap.entrySet().iterator(); - while (it.hasNext()) { - Map.Entry entry = it.next(); - long delta = System.currentTimeMillis() - millis; - NodeResource oldNodeResource = entry.getValue(); - if (delta <= entry.getKey()) { - nums++; - totalResource.setCpuUsage(totalResource.cpuUsage + oldNodeResource.cpuUsage); - totalResource.setIoUsage(totalResource.ioUsage + oldNodeResource.ioUsage); - totalResource.setMemoryUsage(totalResource.memoryUsage + oldNodeResource.memoryUsage); - } else { - // remove outdated records - it.remove(); - } - } - - if (nums > 1) { - totalResource.setCpuUsage(totalResource.cpuUsage / nums); - totalResource.setIoUsage(totalResource.ioUsage / nums); - totalResource.setMemoryUsage(totalResource.memoryUsage / nums); - } - - if (nowNodeResource.cpuUsage >= 0 && nowNodeResource.ioUsage >= 0 - && nowNodeResource.memoryUsage >= 0) { - nodeResourceSampleMap.put(ioTimeTracker.sampleTime, nowNodeResource); - } - return totalResource; - } - /** {@inheritDoc} */ @Override public float getNumVCoresUsed() { @@ -775,14 +686,14 @@ public long getNetworkBytesWritten() { } @Override - public long getStorageBytesRead(String[] paths) { - readProcDisksInfoFile(paths); + public long getStorageBytesRead() { + readProcDisksInfoFile(); return numDisksBytesRead; } @Override - public long getStorageBytesWritten(String[] paths) { - readProcDisksInfoFile(paths); + public long getStorageBytesWritten() { + readProcDisksInfoFile(); return numDisksBytesWritten; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java index 9ef7cdc8152fb..91ebc4c50bb19 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/SysInfoWindows.java @@ -206,20 +206,6 @@ public synchronized float getCpuUsagePercentage() { return ret; } - /** {@inheritDoc} */ - @Override - public float getIoUsagePercentage(String[] paths) { - return 0f; - } - - /** {@inheritDoc} */ - @Override - public NodeResource getNodeResourceLastPeriod( - String[] localDirs, long millis) { - // TODO support for windows - return null; - } - /** {@inheritDoc} */ @Override public synchronized float getNumVCoresUsed() { @@ -246,7 +232,7 @@ public long getNetworkBytesWritten() { } @Override - public long getStorageBytesRead(String[] paths) { + public long getStorageBytesRead() { refreshIfNeeded(); return storageBytesRead; } diff --git a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java index 797a183814fd7..528202fd7f402 100644 --- a/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java +++ 
b/hadoop-tools/hadoop-gridmix/src/test/java/org/apache/hadoop/mapred/gridmix/DummyResourceCalculatorPlugin.java @@ -19,7 +19,6 @@ package org.apache.hadoop.mapred.gridmix; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; /** @@ -53,10 +52,6 @@ public class DummyResourceCalculatorPlugin extends ResourceCalculatorPlugin { "mapred.tasktracker.cumulativecputime.testing"; /** CPU usage percentage for testing */ public static final String CPU_USAGE = "mapred.tasktracker.cpuusage.testing"; - /** Memory usage percentage for testing */ - public static final String MEMORY_USAGE = "mapred.tasktracker.memoryusage.testing"; - /** Disk IO usage percentage for testing */ - public static final String IO_USAGE = "mapred.tasktracker.iousage.testing"; /** cumulative number of bytes read over the network */ public static final String NETWORK_BYTES_READ = "mapred.tasktracker.networkread.testing"; @@ -133,21 +128,6 @@ public float getCpuUsagePercentage() { return getConf().getFloat(CPU_USAGE, -1); } - /** {@inheritDoc} */ - @Override - public float getIoUsagePercentage(String[] paths) { - return getConf().getFloat(IO_USAGE, -1); - } - - /** {@inheritDoc} */ - @Override - public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { - return new NodeResource(getConf().getFloat(CPU_USAGE, -1), - getConf().getFloat(MEMORY_USAGE, -1), - getConf().getFloat(IO_USAGE, -1)); - } - - /** {@inheritDoc} */ @Override public long getNetworkBytesRead() { @@ -162,13 +142,13 @@ public long getNetworkBytesWritten() { /** {@inheritDoc} */ @Override - public long getStorageBytesRead(String[] paths) { + public long getStorageBytesRead() { return getConf().getLong(STORAGE_BYTES_READ, -1); } /** {@inheritDoc} */ @Override - public long getStorageBytesWritten(String[] paths) { + public long getStorageBytesWritten() { return getConf().getLong(STORAGE_BYTES_WRITTEN, -1); } } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java index 9fb98083ada09..32567db666ef3 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java +++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java @@ -39,7 +39,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -117,10 +116,6 @@ public String getHealthReport() { return healthReport; } - public NodeStatus getNodeStatus() { - return null; - } - public long getLastHealthReportTime() { return 0; } diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java index 4a6047e0f6715..26d35ac897235 100644 --- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java +++ 
b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java @@ -30,7 +30,6 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; -import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -90,11 +89,6 @@ public String getHealthReport() { return node.getHealthReport(); } - @Override - public NodeStatus getNodeStatus() { - return node.getNodeStatus(); - } - @Override public long getLastHealthReportTime() { return node.getLastHealthReportTime(); From 35b408c763a661fa1facb6a6b4c70dacea8eae2e Mon Sep 17 00:00:00 2001 From: zhangyizhong Date: Wed, 23 Mar 2022 17:57:30 +0800 Subject: [PATCH 3/5] missed parts --- .../scheduler/fair/FairScheduler.java | 46 ++++++++++++++++++- .../fair/FairSchedulerConfiguration.java | 32 +++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a865d7543dd3d..ebbbaca6b62a6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; @@ -201,6 +202,11 @@ public class FairScheduler extends protected boolean assignMultiple; // Allocate multiple containers per // heartbeat + protected volatile boolean nodeLoadBasedAssignEnable; // node load based assign enabled or not + private volatile float nodeLoadMemoryLimit; // max memory ratio limit of a node to assign container + private volatile float nodeLoadCpuLimit; // max cpu ratio limit of a node to assign container + private volatile float nodeLoadDiskIoLimit; // max disk io ratio limit of a node to assign container + @VisibleForTesting boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat @@ -1013,8 +1019,14 @@ protected void nodeUpdate(RMNode nm) { long start = getClock().getTime(); super.nodeUpdate(nm); + NodeStatus nodeStatus = nm.getNodeStatus(); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); - attemptScheduling(fsNode); + if (nodeLoadBasedAssignEnable && nodeStatus != null && isNodeOverload(nodeStatus)) { + // not schedule this node + 
} else { + attemptScheduling(fsNode); + } long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); @@ -1023,6 +1035,34 @@ protected void nodeUpdate(RMNode nm) { } } + private boolean isNodeOverload(NodeStatus nodeStatus) { + float cpuUsage = nodeStatus.getCpuUsage(); + float diskIoUsage = nodeStatus.getIoUsage(); + float memoryUsage = nodeStatus.getMemUsage(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Node " + nodeStatus.getNodeId() + ", cpu usage is " + cpuUsage + + ", disk io usage is " + diskIoUsage + ", memory usage is " + memoryUsage); + } + + if (nodeLoadCpuLimit > 0 && cpuUsage > nodeLoadCpuLimit) { + LOG.warn("Node " + nodeStatus.getNodeId() + " cpu usage " + cpuUsage + " is over cpu limit " + + nodeLoadCpuLimit + ", skipping scheduling for this heartbeat."); + return true; + } + if (nodeLoadDiskIoLimit > 0 && diskIoUsage > nodeLoadDiskIoLimit) { + LOG.warn("Node " + nodeStatus.getNodeId() + " disk io usage " + diskIoUsage + " is over disk io limit " + + nodeLoadDiskIoLimit + ", skipping scheduling for this heartbeat."); + return true; + } + if (nodeLoadMemoryLimit > 0 && memoryUsage > nodeLoadMemoryLimit) { + LOG.warn("Node " + nodeStatus.getNodeId() + " memory usage " + memoryUsage + " is over memory limit " + + nodeLoadMemoryLimit + ", skipping scheduling for this heartbeat."); + return true; + } + return false; + } + @Deprecated void continuousSchedulingAttempt() throws InterruptedException { long start = getClock().getTime(); @@ -1438,6 +1478,10 @@ private void initScheduler(Configuration conf) throws IOException { sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); + nodeLoadBasedAssignEnable = this.conf.getNodeLoadBasedAssignEnabled(); + nodeLoadMemoryLimit = this.conf.getNodeLoadMemoryLimit(); + nodeLoadCpuLimit = this.conf.getNodeLoadCpuLimit(); + nodeLoadDiskIoLimit = this.conf.getNodeDiskIoLimit(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 2a74d56d925dc..8f6ab5e2e9d89 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -211,6 +211,22 @@ public class FairSchedulerConfiguration extends Configuration { public static final long DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000; + /** Whether to skip assigning containers on an overloaded node. */ + public static final String NODE_LOAD_BASED_ASSIGN_ENABLE = CONF_PREFIX + "node-load-based-assign-enabled"; + protected static final boolean DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false; + + /** The max ratio limit of (used memory / total memory). 
*/ + protected static final String NODE_LOAD_MEMORY_LIMIT = CONF_PREFIX + "node-load-memory-limit"; + protected static final float DEFAULT_NODE_LOAD_MEMORY_LIMIT = -1.0f; + + /** The max ratio limit of (used cpu / total cpu). */ + protected static final String NODE_LOAD_CPU_LIMIT = CONF_PREFIX + "node-load-cpu-limit"; + protected static final float DEFAULT_NODE_LOAD_CPU_LIMIT = -1.0f; + + /** The max ratio limit of disk io load. */ + protected static final String NODE_LOAD_DISK_IO_LIMIT = CONF_PREFIX + "node-load-disk-io-limit"; + protected static final float DEFAULT_NODE_LOAD_DISK_IO_LIMIT = -1.0f; + /** Whether to assign multiple containers in one check-in. */ public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; public static final boolean DEFAULT_ASSIGN_MULTIPLE = false; @@ -427,6 +443,22 @@ public float getPreemptionUtilizationThreshold() { return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD); } + public boolean getNodeLoadBasedAssignEnabled() { + return getBoolean(NODE_LOAD_BASED_ASSIGN_ENABLE, DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE); + } + + public float getNodeLoadMemoryLimit() { + return getFloat(NODE_LOAD_MEMORY_LIMIT, DEFAULT_NODE_LOAD_MEMORY_LIMIT); + } + + public float getNodeLoadCpuLimit() { + return getFloat(NODE_LOAD_CPU_LIMIT, DEFAULT_NODE_LOAD_CPU_LIMIT); + } + + public float getNodeDiskIoLimit() { + return getFloat(NODE_LOAD_DISK_IO_LIMIT, DEFAULT_NODE_LOAD_DISK_IO_LIMIT); + } + public boolean getAssignMultiple() { return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE); } From bdb66c827d441e75ede5636af1d9a156a0355595 Mon Sep 17 00:00:00 2001 From: Deegue Date: Wed, 23 Mar 2022 23:03:11 +0800 Subject: [PATCH 4/5] remove empty files --- .../src/main/java/org/apache/hadoop/util/IoTimeTracker.java | 0 .../src/main/java/org/apache/hadoop/util/NodeResource.java | 0 2 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IoTimeTracker.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/NodeResource.java deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 3d8ea4f394d3bb937a3f7bfbd6ede9947073a51c Mon Sep 17 00:00:00 2001 From: Deegue Date: Fri, 1 Apr 2022 22:43:00 +0800 Subject: [PATCH 5/5] Common implementation to support both CS and FS --- .../hadoop/yarn/conf/YarnConfiguration.java | 18 ++++++ .../scheduler/AbstractYarnScheduler.java | 56 +++++++++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 8 +++ .../scheduler/fair/FairScheduler.java | 40 +------------ .../fair/FairSchedulerConfiguration.java | 16 ------ .../scheduler/fifo/FifoScheduler.java | 9 ++- 6 files changed, 92 insertions(+), 55 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 750b52269dcc6..6a83d25a18e44 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -133,6 +133,8 @@ private static void addDeprecatedKeys() { public static final String YARN_PREFIX = "yarn."; + public static final String SCHEDULER_PREFIX = "yarn.scheduler."; + ///////////////////////////// // Resource types configs //////////////////////////// @@ -421,6 +423,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false; + /** Whether to skip assigning containers on an overloaded node. */ + public static final String NODE_LOAD_BASED_ASSIGN_ENABLE = SCHEDULER_PREFIX + "node-load-based-assign-enabled"; + public static final boolean DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false; + + /** The max ratio limit of (used memory / total memory). */ + public static final String NODE_LOAD_MEMORY_LIMIT = SCHEDULER_PREFIX + "node-load-memory-limit"; + public static final float DEFAULT_NODE_LOAD_MEMORY_LIMIT = -1.0f; + + /** The max ratio limit of (used cpu / total cpu). */ + public static final String NODE_LOAD_CPU_LIMIT = SCHEDULER_PREFIX + "node-load-cpu-limit"; + public static final float DEFAULT_NODE_LOAD_CPU_LIMIT = -1.0f; + + /** The max ratio limit of disk io load. */ + public static final String NODE_LOAD_DISK_IO_LIMIT = SCHEDULER_PREFIX + "node-load-disk-io-limit"; + public static final float DEFAULT_NODE_LOAD_DISK_IO_LIMIT = -1.0f; + /** * Maximum number of opportunistic containers to be allocated in * AM heartbeat. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8772ddb44e6b3..18135a51d902e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -34,6 +34,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +138,11 @@ public abstract class AbstractYarnScheduler protected SchedulerHealth schedulerHealth = new SchedulerHealth(); protected volatile long lastNodeUpdateTime; + protected static volatile boolean nodeLoadBasedAssignEnable; // whether node-load-based assignment is enabled + protected static volatile float nodeLoadMemoryLimit; // max memory usage ratio a node may report and still be assigned containers + protected static volatile float nodeLoadCpuLimit; // max cpu usage ratio a node may report and still be assigned containers + protected static volatile float nodeLoadDiskIoLimit; // max disk io usage ratio a node may report and still be assigned containers + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -226,6 +232,19 @@ public void serviceInit(Configuration conf) throws Exception { 
conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); + nodeLoadBasedAssignEnable = + conf.getBoolean(YarnConfiguration.NODE_LOAD_BASED_ASSIGN_ENABLE, + YarnConfiguration.DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE); + nodeLoadMemoryLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_MEMORY_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_MEMORY_LIMIT); + nodeLoadCpuLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_CPU_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_CPU_LIMIT); + nodeLoadDiskIoLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_DISK_IO_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_DISK_IO_LIMIT); + if (updateInterval > 0) { updateThread = new UpdateThread(); updateThread.setName("SchedulerUpdateThread"); @@ -503,6 +522,43 @@ public void setEntitlement(String queue, QueueEntitlement entitlement) + " does not support this operation"); } + protected static boolean isNodeOverload(NodeStatus nodeStatus, boolean printVerboseLog) { + float cpuUsage = nodeStatus.getCpuUsage(); + float diskIoUsage = nodeStatus.getIoUsage(); + float memoryUsage = nodeStatus.getMemUsage(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Node " + nodeStatus.getNodeId() + ", cpu usage is " + cpuUsage + + ", disk io usage is " + diskIoUsage + ", memory usage is " + memoryUsage); + } + + if (nodeLoadCpuLimit > 0 && cpuUsage > nodeLoadCpuLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + nodeStatus.getNodeId() + + " because cpu usage:" + cpuUsage + " is over cpu limit " + + nodeLoadCpuLimit); + } + return true; + } + if (nodeLoadDiskIoLimit > 0 && diskIoUsage > nodeLoadDiskIoLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + nodeStatus.getNodeId() + + " because disk io usage:" + diskIoUsage + " is over disk io limit " + + nodeLoadDiskIoLimit); + } + return true; + } + if (nodeLoadMemoryLimit > 0 && memoryUsage > nodeLoadMemoryLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + nodeStatus.getNodeId() + + " because memory usage:" + memoryUsage + " is over memory limit " + + nodeLoadMemoryLimit); + } + return true; + } + return false; + } + private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { if (!container.getContainerState().equals(ContainerState.COMPLETE)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 44e80a6c23487..516ebb638dc60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import 
org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; @@ -521,6 +522,13 @@ public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, } return true; } + + NodeStatus nodeStatus = node.getRMNode().getNodeStatus(); + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, printVerboseLog)) { + return true; + } + return false; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index ebbbaca6b62a6..de7623cd95f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -202,11 +202,6 @@ public class FairScheduler extends protected boolean assignMultiple; // Allocate multiple containers per // heartbeat - protected volatile boolean nodeLoadBasedAssignEnable; // node load based assign enabled or not - private volatile float nodeLoadMemoryLimit; // max memory ratio limit of a node to assign container - private volatile float nodeLoadCpuLimit; // max cpu ratio limit of a node to assign container - private volatile float nodeLoadDiskIoLimit; // max disk io ratio limit of a node to assign container - @VisibleForTesting boolean maxAssignDynamic; protected int maxAssign; // Max containers to assign per heartbeat @@ -1022,7 +1017,8 @@ protected void nodeUpdate(RMNode nm) { NodeStatus nodeStatus = nm.getNodeStatus(); FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); - if (nodeLoadBasedAssignEnable && nodeStatus != null && isNodeOverload(nodeStatus)) { + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, true)) { // not schedule this node } else { attemptScheduling(fsNode); @@ -1035,34 +1031,6 @@ } } - private boolean isNodeOverload(NodeStatus nodeStatus) { - float cpuUsage = nodeStatus.getCpuUsage(); - float diskIoUsage = nodeStatus.getIoUsage(); - float memoryUsage = nodeStatus.getMemUsage(); - - if (LOG.isDebugEnabled()) { - LOG.debug("Node " + nodeStatus.getNodeId() + ", cpu usage is " + cpuUsage - + ", disk io usage is " + diskIoUsage + ", memory usage is " + memoryUsage); - } - - if (nodeLoadCpuLimit > 0 && cpuUsage > nodeLoadCpuLimit) { - LOG.warn("Node " + nodeStatus.getNodeId() + " cpu usage " + cpuUsage + " is over cpu limit " - + nodeLoadCpuLimit + ", skipping scheduling for this heartbeat."); - return true; - } - if (nodeLoadDiskIoLimit > 0 && diskIoUsage > nodeLoadDiskIoLimit) { - LOG.warn("Node " + nodeStatus.getNodeId() + " disk io usage " + diskIoUsage + " is over disk io limit " - + nodeLoadDiskIoLimit + ", skipping scheduling for this heartbeat."); - return true; - } - if (nodeLoadMemoryLimit > 0 && memoryUsage > nodeLoadMemoryLimit) { - LOG.warn("Node " + nodeStatus.getNodeId() + " memory usage " + memoryUsage + " is over memory limit " - + nodeLoadMemoryLimit + ", skipping scheduling for this heartbeat."); - return true; - } - return false; - } - @Deprecated void continuousSchedulingAttempt() throws InterruptedException { long start = 
getClock().getTime(); @@ -1478,10 +1446,6 @@ private void initScheduler(Configuration conf) throws IOException { sizeBasedWeight = this.conf.getSizeBasedWeight(); usePortForNodeName = this.conf.getUsePortForNodeName(); reservableNodesRatio = this.conf.getReservableNodes(); - nodeLoadBasedAssignEnable = this.conf.getNodeLoadBasedAssignEnabled(); - nodeLoadMemoryLimit = this.conf.getNodeLoadMemoryLimit(); - nodeLoadCpuLimit = this.conf.getNodeLoadCpuLimit(); - nodeLoadDiskIoLimit = this.conf.getNodeDiskIoLimit(); updateInterval = this.conf.getUpdateInterval(); if (updateInterval < 0) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 8f6ab5e2e9d89..f0a5f612a40b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -443,22 +443,6 @@ public float getPreemptionUtilizationThreshold() { return getFloat(PREEMPTION_THRESHOLD, DEFAULT_PREEMPTION_THRESHOLD); } - public boolean getNodeLoadBasedAssignEnabled() { - return getBoolean(NODE_LOAD_BASED_ASSIGN_ENABLE, DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE); - } - - public float getNodeLoadMemoryLimit() { - return getFloat(NODE_LOAD_MEMORY_LIMIT, DEFAULT_NODE_LOAD_MEMORY_LIMIT); - } - - public float getNodeLoadCpuLimit() { - return getFloat(NODE_LOAD_CPU_LIMIT, DEFAULT_NODE_LOAD_CPU_LIMIT); - } - - public float getNodeDiskIoLimit() { - return getFloat(NODE_LOAD_DISK_IO_LIMIT, DEFAULT_NODE_LOAD_DISK_IO_LIMIT); - } - public boolean getAssignMultiple() { return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 42b1ec32c099c..185582aaddeec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -974,7 +975,13 @@ protected synchronized void nodeUpdate(RMNode 
nm) { LOG.debug("Node heartbeat " + nm.getNodeID() + " available resource = " + node.getUnallocatedResource()); - assignContainers(node); + NodeStatus nodeStatus = nm.getNodeStatus(); + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, true)) { + // not schedule this node + } else { + assignContainers(node); + } LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + node.getUnallocatedResource());
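
Note for reviewers: the sketch below shows how the gate added by this series could be switched on, using only the YarnConfiguration keys introduced in PATCH 5/5. The helper class name and the 0.9f thresholds are illustrative assumptions, not part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Illustrative only: builds a Configuration that enables node-load-based
// assignment. The keys are the constants added to YarnConfiguration in this
// series; the 0.9f values are example thresholds, not recommendations.
public class NodeLoadGateConfigSketch {
  public static Configuration nodeLoadGateConf() {
    Configuration conf = new YarnConfiguration();
    // The gate is off by default (DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false).
    conf.setBoolean(YarnConfiguration.NODE_LOAD_BASED_ASSIGN_ENABLE, true);
    // A limit left at its -1.0f default disables that dimension, because
    // isNodeOverload only compares usages against limits > 0.
    conf.setFloat(YarnConfiguration.NODE_LOAD_CPU_LIMIT, 0.9f);
    conf.setFloat(YarnConfiguration.NODE_LOAD_MEMORY_LIMIT, 0.9f);
    conf.setFloat(YarnConfiguration.NODE_LOAD_DISK_IO_LIMIT, 0.9f);
    return conf;
  }
}

Because AbstractYarnScheduler#serviceInit reads these keys once at startup, the same settings gate the capacity, fair, and fifo schedulers alike.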
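The gating rule itself is small enough to restate on its own. The following is a review aid rather than the patch's code: a non-positive limit disables its dimension, and a node is skipped as soon as any enabled dimension exceeds its limit.

// Standalone restatement of AbstractYarnScheduler#isNodeOverload, for review.
public final class OverloadRuleSketch {
  // A dimension participates only when its configured limit is positive.
  static boolean over(float usage, float limit) {
    return limit > 0 && usage > limit;
  }

  public static boolean isOverloaded(float cpu, float io, float mem,
      float cpuLimit, float ioLimit, float memLimit) {
    return over(cpu, cpuLimit) || over(io, ioLimit) || over(mem, memLimit);
  }

  public static void main(String[] args) {
    // With every limit at its -1.0f default, nothing is ever skipped.
    System.out.println(isOverloaded(0.99f, 0.99f, 0.99f, -1f, -1f, -1f)); // false
    // With a 0.9 cpu limit, a node reporting 0.95 cpu usage is skipped.
    System.out.println(isOverloaded(0.95f, 0.1f, 0.1f, 0.9f, -1f, -1f)); // true
  }
}

The usage figures are whatever the NodeManager reports through NodeStatus in its heartbeat, and every call site also checks nodeStatus != null, so a node that has not yet reported load is scheduled normally.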
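All three call sites apply the same guard-then-schedule shape. Below is a deliberately simplified sketch of that shape; NodeStatusView and assignContainers are stand-ins rather than YARN types, and it reuses OverloadRuleSketch from the previous sketch.

// Hypothetical shape of the per-heartbeat gate added to FairScheduler and
// FifoScheduler nodeUpdate and to CapacityScheduler#shouldSkipNodeSchedule.
interface NodeStatusView {
  float cpuUsage();
  float ioUsage();
  float memUsage();
}

final class HeartbeatGateSketch {
  private final boolean gateEnabled;
  private final float cpuLimit, ioLimit, memLimit;

  HeartbeatGateSketch(boolean gateEnabled,
      float cpuLimit, float ioLimit, float memLimit) {
    this.gateEnabled = gateEnabled;
    this.cpuLimit = cpuLimit;
    this.ioLimit = ioLimit;
    this.memLimit = memLimit;
  }

  void onNodeHeartbeat(NodeStatusView status) {
    if (gateEnabled && status != null
        && OverloadRuleSketch.isOverloaded(status.cpuUsage(),
            status.ioUsage(), status.memUsage(),
            cpuLimit, ioLimit, memLimit)) {
      return; // skip this heartbeat only; the node stays in the cluster
    }
    assignContainers(status);
  }

  private void assignContainers(NodeStatusView status) {
    // Placeholder for the scheduler's normal allocation path.
  }
}

Since the check runs on every heartbeat, an overloaded node is only passed over temporarily: it becomes schedulable again as soon as its reported usage drops back under the limits.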