diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 28c96de8291a3..6a83d25a18e44 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -133,6 +133,8 @@ private static void addDeprecatedKeys() { public static final String YARN_PREFIX = "yarn."; + public static final String SCHEDULER_PREFIX = "yarn.scheduler."; + ///////////////////////////// // Resource types configs //////////////////////////// @@ -421,6 +423,22 @@ public static boolean isAclEnabled(Configuration conf) { public static final boolean DEFAULT_OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED = false; + /** Whether to skip overloaded nodes when assigning containers. */ + public static final String NODE_LOAD_BASED_ASSIGN_ENABLE = SCHEDULER_PREFIX + "node-load-based-assign-enabled"; + public static final boolean DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false; + + /** The max ratio limit of (used memory / total memory); not enforced unless positive. */ + public static final String NODE_LOAD_MEMORY_LIMIT = SCHEDULER_PREFIX + "node-load-memory-limit"; + public static final float DEFAULT_NODE_LOAD_MEMORY_LIMIT = -1.0f; + + /** The max ratio limit of (used cpu / total cpu); not enforced unless positive. */ + public static final String NODE_LOAD_CPU_LIMIT = SCHEDULER_PREFIX + "node-load-cpu-limit"; + public static final float DEFAULT_NODE_LOAD_CPU_LIMIT = -1.0f; + + /** The max ratio limit of disk io load; not enforced unless positive. */ + public static final String NODE_LOAD_DISK_IO_LIMIT = SCHEDULER_PREFIX + "node-load-disk-io-limit"; + public static final float DEFAULT_NODE_LOAD_DISK_IO_LIMIT = -1.0f; + /** * Maximum number of opportunistic containers to be allocated in * AM heartbeat. 
@@ -1974,6 +1992,11 @@ public static boolean isAclEnabled(Configuration conf) { @Deprecated public final static int DEFAULT_NM_CONTAINER_MON_INTERVAL_MS = 3000; + /** Length of the NM resource sample period, in milliseconds.*/ + public final static String NM_RESOURCE_SAMPLE_PERIOD_MS = + NM_PREFIX + "resource.sample.period-ms"; + public final static long DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS = 10000; + /** Class that calculates current resource utilization.*/ public static final String NM_MON_RESOURCE_CALCULATOR = NM_PREFIX + "resource-calculator.class"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java index 5fcc474652fb8..72392a8583492 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/ResourceCalculatorPlugin.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.util; +import org.apache.hadoop.util.NodeResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -129,6 +130,24 @@ public float getCpuUsagePercentage() { return sys.getCpuUsagePercentage(); } + /** + * Obtain the IO usage % of the machine. Return -1 if it is unavailable. + * + * @return IO usage in % + */ + public float getIoUsagePercentage(String[] paths) { + return sys.getIoUsagePercentage(paths); + } + + /** + * Obtain the node resource of the machine. Return null if it is unavailable. + * + * @return cpu, io and memory usage in % + */ + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return sys.getNodeResourceLastPeriod(localDirs, millis); + } + /** * Obtain the number of VCores used. Return -1 if it is unavailable. 
* @@ -159,8 +178,8 @@ public long getNetworkBytesWritten() { * * @return total number of bytes read. */ - public long getStorageBytesRead() { - return sys.getStorageBytesRead(); + public long getStorageBytesRead(String[] paths) { + return sys.getStorageBytesRead(paths); } /** @@ -168,8 +187,8 @@ public long getStorageBytesRead() { * * @return total number of bytes written. */ - public long getStorageBytesWritten() { - return sys.getStorageBytesWritten(); + public long getStorageBytesWritten(String[] paths) { + return sys.getStorageBytesWritten(paths); } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java index 440cd0a290294..99dfdd8bedb10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java @@ -59,7 +59,8 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, NodeHealthStatus nodeHealthStatus, ResourceUtilization containersUtilization, ResourceUtilization nodeUtilization, - List increasedContainers) { + List increasedContainers, + float cpuUsage, float ioUsage, float memUsage) { NodeStatus nodeStatus = Records.newRecord(NodeStatus.class); nodeStatus.setResponseId(responseId); nodeStatus.setNodeId(nodeId); @@ -69,6 +70,9 @@ public static NodeStatus newInstance(NodeId nodeId, int responseId, nodeStatus.setContainersUtilization(containersUtilization); nodeStatus.setNodeUtilization(nodeUtilization); nodeStatus.setIncreasedContainers(increasedContainers); + nodeStatus.setCpuUsage(cpuUsage); + nodeStatus.setIoUsage(ioUsage); + 
nodeStatus.setMemUsage(memUsage); return nodeStatus; } @@ -132,4 +136,28 @@ public abstract void setIncreasedContainers( @Unstable public abstract void setOpportunisticContainersStatus( OpportunisticContainersStatus opportunisticContainersStatus); + + @Private + @Unstable + public abstract void setCpuUsage(float cpuUsage); + + @Private + @Unstable + public abstract void setIoUsage(float ioUsage); + + @Private + @Unstable + public abstract void setMemUsage(float memUsage); + + @Private + @Unstable + public abstract float getCpuUsage(); + + @Private + @Unstable + public abstract float getIoUsage(); + + @Private + @Unstable + public abstract float getMemUsage(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java index 8aebc6fa913ce..9f3d6ef21ea5f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java @@ -487,4 +487,38 @@ private ContainerProto convertToProtoFormat( Container c) { return ((ContainerPBImpl)c).getProto(); } + + @Override + public synchronized float getCpuUsage() { + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getCpuUsage(); + } + @Override + public synchronized void setCpuUsage(float cpuUsage) { + maybeInitBuilder(); + builder.setCpuUsage(cpuUsage); + } + + @Override + public synchronized float getIoUsage() { + NodeStatusProtoOrBuilder p = viaProto ? 
proto : builder; + return p.getIoUsage(); + } + @Override + public synchronized void setIoUsage(float ioUsage) { + maybeInitBuilder(); + builder.setIoUsage(ioUsage); + } + + @Override + public synchronized float getMemUsage() { + NodeStatusProtoOrBuilder p = viaProto ? proto : builder; + return p.getMemUsage(); + } + + @Override + public synchronized void setMemUsage(float memUsage) { + maybeInitBuilder(); + builder.setMemUsage(memUsage); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto index ea8df4fb800dc..d745dc2061cb4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto @@ -41,6 +41,9 @@ message NodeStatusProto { optional ResourceUtilizationProto node_utilization = 7; repeated ContainerProto increased_containers = 8; optional OpportunisticContainersStatusProto opportunistic_containers_status = 9; + optional float cpu_usage = 10; + optional float io_usage = 11; + optional float mem_usage = 12; } message OpportunisticContainersStatusProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java index 4dadf9c62e4c5..88a1b4a84589a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java @@ -42,6 +42,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.VersionUtil; import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest; @@ -93,6 +94,7 @@ import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher; import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils; import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils; +import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.Resources; @@ -149,6 +151,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private final NodeHealthCheckerService healthChecker; private final NodeManagerMetrics metrics; + private Resource lastCapability; + private Runnable statusUpdaterRunnable; private Thread statusUpdater; private boolean failedToConnect = false; @@ -156,6 +160,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements private boolean registeredWithRM = false; Set pendingContainersToRemove = new HashSet(); + private final ResourceCalculatorPlugin resourceCalculatorPlugin = + ResourceCalculatorPlugin.getResourceCalculatorPlugin(null, null); + private NMNodeLabelsHandler nodeLabelsHandler; private NMNodeAttributesHandler nodeAttributesHandler; private NodeLabelsProvider nodeLabelsProvider; @@ -512,6 +519,27 @@ private List createKeepAliveApplicationList() { @VisibleForTesting protected NodeStatus getNodeStatus(int responseId) throws IOException { + float 
cpuUsage = 0, ioUsage = 0, memUsage = 0; + Configuration conf = getConfig(); + long millis = conf.getLong( + YarnConfiguration.NM_RESOURCE_SAMPLE_PERIOD_MS, + YarnConfiguration.DEFAULT_NM_RESOURCE_SAMPLE_PERIOD_MS); + + if (resourceCalculatorPlugin != null) { + NodeResource nodeResource = resourceCalculatorPlugin.getNodeResourceLastPeriod( + conf.getTrimmedStrings(YarnConfiguration.NM_LOCAL_DIRS), millis); + if (nodeResource != null) { + cpuUsage = nodeResource.getCpuUsage() / 100F; + ioUsage = nodeResource.getIoUsage(); + memUsage = nodeResource.getMemoryUsage(); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Node " + nodeId + ", cpu usage is " + cpuUsage + + ", disk io usage is " + ioUsage + ", memory usage is " + memUsage); + } + NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus(); nodeHealthStatus.setHealthReport(healthChecker.getHealthReport()); nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy()); @@ -528,7 +556,8 @@ protected NodeStatus getNodeStatus(int responseId) throws IOException { NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId, containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus, - containersUtilization, nodeUtilization, increasedContainers); + containersUtilization, nodeUtilization, increasedContainers, + cpuUsage, ioUsage, memUsage); nodeStatus.setOpportunisticContainersStatus( getOpportunisticContainersStatus()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java index 64d117a5a5f80..08fae3ef208ad 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/MockResourceCalculatorPlugin.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; public class MockResourceCalculatorPlugin extends ResourceCalculatorPlugin { @@ -71,6 +72,16 @@ public float getCpuUsagePercentage() { return 0; } + @Override + public float getIoUsagePercentage(String[] paths) { + return 0; + } + + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return new NodeResource(0, 0, 0); + } + @Override public float getNumVCoresUsed() { return 0; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java index 767c308aeb656..4bfe300a86814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/util/TestNodeManagerHardwareUtils.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.nodemanager.util; +import org.apache.hadoop.util.NodeResource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import 
org.apache.hadoop.yarn.util.ResourceCalculatorPlugin; import org.junit.Assert; @@ -77,6 +78,16 @@ public float getCpuUsagePercentage() { return 0; } + @Override + public float getIoUsagePercentage(String[] paths) { + return 0; + } + + @Override + public NodeResource getNodeResourceLastPeriod(String[] localDirs, long millis) { + return new NodeResource(0, 0, 0); + } + @Override public int getNumCores() { return 4; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java index 5d60b4fbe0678..b7e2aa8e1fc8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.util.resource.Resources; @@ -87,6 +88,12 @@ public interface RMNode { */ public String getHealthReport(); + /** + * the latest node status report received from this node. + * @return the latest node status report received from this node. + */ + public NodeStatus getNodeStatus(); + /** * the time of the latest health report received from this node. 
* @return the time of the latest health report received from this node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java index b8aaea5de330c..3ec44b74ce032 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java @@ -150,6 +150,8 @@ public class RMNodeImpl implements RMNode, EventHandler { /** Physical resources in the node. */ private volatile Resource physicalResource; + private NodeStatus nodeStatus; + /* Container Queue Information for the node.. 
Used by Distributed Scheduler */ private OpportunisticContainersStatus opportunisticContainersStatus; @@ -511,6 +513,27 @@ public void setHealthReport(String healthReport) { this.writeLock.unlock(); } } + + @Override + public NodeStatus getNodeStatus() { + this.readLock.lock(); + + try { + return this.nodeStatus; + } finally { + this.readLock.unlock(); + } + } + + public void setNodeStatus(NodeStatus nodeStatus) { + this.writeLock.lock(); + + try { + this.nodeStatus = nodeStatus; + } finally { + this.writeLock.unlock(); + } + } public void setLastHealthReportTime(long lastHealthReportTime) { this.writeLock.lock(); @@ -945,6 +968,8 @@ public NodeState transition(RMNodeImpl rmNode, RMNodeEvent event) { RMNodeStartedEvent startEvent = (RMNodeStartedEvent) event; List containers = null; + rmNode.setNodeStatus(startEvent.getNodeStatus()); + NodeId nodeId = rmNode.nodeId; RMNode previousRMNode = rmNode.context.getInactiveRMNodes().remove(nodeId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java index 5f5fe24d173c5..b42839062b80f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java @@ -47,6 +47,10 @@ public RMNodeStatusEvent(NodeId nodeId, NodeStatus nodeStatus, this.logAggregationReportsForApps = logAggregationReportsForApps; } + public NodeStatus getNodeStatus() { + return this.nodeStatus; + } + public NodeHealthStatus getNodeHealthStatus() { 
return this.nodeStatus.getNodeHealthStatus(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 8772ddb44e6b3..18135a51d902e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -34,6 +34,7 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,6 +138,11 @@ public abstract class AbstractYarnScheduler protected SchedulerHealth schedulerHealth = new SchedulerHealth(); protected volatile long lastNodeUpdateTime; + protected volatile static boolean nodeLoadBasedAssignEnable; // node load based assign enabled or not + protected volatile static float nodeLoadMemoryLimit; // max memory ratio limit of a node to assign container + protected volatile static float nodeLoadCpuLimit; // max cpu ratio limit of a node to assign container + protected volatile static float nodeLoadDiskIoLimit; // max disk io ratio limit of a node to assign container + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -226,6 +232,19 @@ public void serviceInit(Configuration conf) throws Exception { conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS, 
YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS); + nodeLoadBasedAssignEnable = + conf.getBoolean(YarnConfiguration.NODE_LOAD_BASED_ASSIGN_ENABLE, + YarnConfiguration.DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE); + nodeLoadMemoryLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_MEMORY_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_MEMORY_LIMIT); + nodeLoadCpuLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_CPU_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_CPU_LIMIT); + nodeLoadDiskIoLimit = + conf.getFloat(YarnConfiguration.NODE_LOAD_DISK_IO_LIMIT, + YarnConfiguration.DEFAULT_NODE_LOAD_DISK_IO_LIMIT); + if (updateInterval > 0) { updateThread = new UpdateThread(); updateThread.setName("SchedulerUpdateThread"); @@ -503,6 +522,51 @@ public void setEntitlement(String queue, QueueEntitlement entitlement) + " does not support this operation"); } + /** + * Checks whether the load reported by a node exceeds any enabled limit. + * A limit is enforced only when its configured value is greater than zero. + * + * @param nodeStatus latest status reported by the node, must not be null + * @param printVerboseLog whether to debug-log which limit was exceeded + * @return true if cpu, disk io or memory usage is over its limit + */ + protected static boolean isNodeOverload(NodeStatus nodeStatus, boolean printVerboseLog) { + float cpuUsage = nodeStatus.getCpuUsage(); + float diskIoUsage = nodeStatus.getIoUsage(); + float memoryUsage = nodeStatus.getMemUsage(); + + if (LOG.isDebugEnabled()) { + LOG.debug("Node " + nodeStatus.getNodeId() + ", cpu usage is " + cpuUsage + + ", disk io usage is " + diskIoUsage + ", memory usage is " + memoryUsage); + } + + if (nodeLoadCpuLimit > 0 && cpuUsage > nodeLoadCpuLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + nodeStatus.getNodeId() + + " because cpu usage:" + cpuUsage + " is over cpu limit " + + nodeLoadCpuLimit); + } + return true; + } + if (nodeLoadDiskIoLimit > 0 && diskIoUsage > nodeLoadDiskIoLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + nodeStatus.getNodeId() + + " because disk I/Os:" + diskIoUsage + " is over disk I/Os limit " + + nodeLoadDiskIoLimit); + } + return true; + } + if (nodeLoadMemoryLimit > 0 && memoryUsage > nodeLoadMemoryLimit) { + if (printVerboseLog && LOG.isDebugEnabled()) { + LOG.debug("Skip this node " + 
nodeStatus.getNodeId() + + " because memory usage:" + memoryUsage + " is over memory limit " + + nodeLoadMemoryLimit); + } + return true; + } + return false; + } + private void killOrphanContainerOnNode(RMNode node, NMContainerStatus container) { if (!container.getContainerState().equals(ContainerState.COMPLETE)) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 44e80a6c23487..516ebb638dc60 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -36,6 +36,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; @@ -521,6 +522,13 @@ public static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node, } return true; } + + NodeStatus nodeStatus = node.getRMNode().getNodeStatus(); + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, printVerboseLog)) { + return true; + } + return false; } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index a865d7543dd3d..de7623cd95f78 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMCriticalThreadUncaughtExceptionHandler; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; @@ -1013,8 +1014,15 @@ protected void nodeUpdate(RMNode nm) { long start = getClock().getTime(); super.nodeUpdate(nm); + NodeStatus nodeStatus = nm.getNodeStatus(); + FSSchedulerNode fsNode = getFSSchedulerNode(nm.getNodeID()); - attemptScheduling(fsNode); + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, true)) { + // not schedule this node + } else { + attemptScheduling(fsNode); + } long duration = getClock().getTime() - start; fsOpDurations.addNodeUpdateDuration(duration); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java index 2a74d56d925dc..f0a5f612a40b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java @@ -211,6 +211,22 @@ public class FairSchedulerConfiguration extends Configuration { public static final long DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000; + /** Whether to skip overloaded nodes when assigning containers. NOTE(review): this patch shows no code reading the FairSchedulerConfiguration node-load keys; confirm they are needed. */ + public static final String NODE_LOAD_BASED_ASSIGN_ENABLE = CONF_PREFIX + "node-load-based-assign-enabled"; + protected static final boolean DEFAULT_NODE_LOAD_BASED_ASSIGN_ENABLE = false; + + /** The max ratio limit of (used memory / total memory). */ + protected static final String NODE_LOAD_MEMORY_LIMIT = CONF_PREFIX + "node-load-memory-limit"; + protected static final float DEFAULT_NODE_LOAD_MEMORY_LIMIT = -1.0f; + + /** The max ratio limit of (used cpu / total cpu). */ + protected static final String NODE_LOAD_CPU_LIMIT = CONF_PREFIX + "node-load-cpu-limit"; + protected static final float DEFAULT_NODE_LOAD_CPU_LIMIT = -1.0f; + + /** The max ratio limit of disk io load. */ + protected static final String NODE_LOAD_DISK_IO_LIMIT = CONF_PREFIX + "node-load-disk-io-limit"; + protected static final float DEFAULT_NODE_LOAD_DISK_IO_LIMIT = -1.0f; + /** Whether to assign multiple containers in one check-in. 
*/ public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple"; public static final boolean DEFAULT_ASSIGN_MULTIPLE = false; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java index 42b1ec32c099c..185582aaddeec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; @@ -974,7 +975,13 @@ protected synchronized void nodeUpdate(RMNode nm) { LOG.debug("Node heartbeat " + nm.getNodeID() + " available resource = " + node.getUnallocatedResource()); - assignContainers(node); + NodeStatus nodeStatus = nm.getNodeStatus(); + if (nodeLoadBasedAssignEnable && nodeStatus != null + && isNodeOverload(nodeStatus, true)) { + // not schedule this node + } else { + assignContainers(node); + } LOG.debug("Node after allocation " + nm.getNodeID() + " resource = " + node.getUnallocatedResource()); diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java index c951ba2c641df..89dab877b219f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java @@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse; +import org.apache.hadoop.yarn.server.api.records.NodeStatus; import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo; @@ -265,6 +266,11 @@ public String getHealthReport() { return healthReport; } + @Override + public NodeStatus getNodeStatus() { + return null; + } + @Override public long getLastHealthReportTime() { return lastHealthReportTime;