Skip to content

Commit

Permalink
YARN-1643. Make ContainersMonitor support changing monitoring size of…
Browse files Browse the repository at this point in the history
… an allocated container. Contributed by Meng Ding and Wangda Tan
  • Loading branch information
jian-he authored and wangdatan committed Sep 23, 2015
1 parent 5f5a968 commit c59ae4e
Show file tree
Hide file tree
Showing 7 changed files with 615 additions and 76 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ Release 2.8.0 - UNRELEASED
YARN-3867. ContainerImpl changes to support container resizing. (Meng Ding
via jianhe)

YARN-1643. Make ContainersMonitor support changing monitoring size of an
allocated container. (Meng Ding and Wangda Tan)

IMPROVEMENTS

YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,26 @@

package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;

import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.server.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
Expand All @@ -56,16 +56,16 @@ public class ContainersMonitorImpl extends AbstractService implements
private boolean containerMetricsEnabled;
private long containerMetricsPeriodMs;

final List<ContainerId> containersToBeRemoved;
final Map<ContainerId, ProcessTreeInfo> containersToBeAdded;
Map<ContainerId, ProcessTreeInfo> trackingContainers =
new HashMap<ContainerId, ProcessTreeInfo>();
@VisibleForTesting
final Map<ContainerId, ProcessTreeInfo> trackingContainers =
new ConcurrentHashMap<>();

final ContainerExecutor containerExecutor;
private final ContainerExecutor containerExecutor;
private final Dispatcher eventDispatcher;
private final Context context;
private ResourceCalculatorPlugin resourceCalculatorPlugin;
private Configuration conf;
private static float vmemRatio;
private Class<? extends ResourceCalculatorProcessTree> processTreeClass;

private long maxVmemAllottedForContainers = UNKNOWN_MEMORY_LIMIT;
Expand All @@ -82,6 +82,8 @@ public class ContainersMonitorImpl extends AbstractService implements

private ResourceUtilization containersUtilization;

private volatile boolean stopped = false;

public ContainersMonitorImpl(ContainerExecutor exec,
AsyncDispatcher dispatcher, Context context) {
super("containers-monitor");
Expand All @@ -90,8 +92,6 @@ public ContainersMonitorImpl(ContainerExecutor exec,
this.eventDispatcher = dispatcher;
this.context = context;

this.containersToBeAdded = new HashMap<ContainerId, ProcessTreeInfo>();
this.containersToBeRemoved = new ArrayList<ContainerId>();
this.monitoringThread = new MonitoringThread();

this.containersUtilization = ResourceUtilization.newInstance(0, 0, 0.0f);
Expand Down Expand Up @@ -140,7 +140,7 @@ protected void serviceInit(Configuration conf) throws Exception {
this.maxVCoresAllottedForContainers = configuredVCoresForContainers;

// ///////// Virtual memory configuration //////
float vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
vmemRatio = conf.getFloat(YarnConfiguration.NM_VMEM_PMEM_RATIO,
YarnConfiguration.DEFAULT_NM_VMEM_PMEM_RATIO);
Preconditions.checkArgument(vmemRatio > 0.99f,
YarnConfiguration.NM_VMEM_PMEM_RATIO + " should be at least 1.0");
Expand Down Expand Up @@ -218,6 +218,7 @@ protected void serviceStart() throws Exception {
@Override
protected void serviceStop() throws Exception {
if (containersMonitorEnabled) {
stopped = true;
this.monitoringThread.interrupt();
try {
this.monitoringThread.join();
Expand All @@ -228,7 +229,8 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}

private static class ProcessTreeInfo {
@VisibleForTesting
static class ProcessTreeInfo {
private ContainerId containerId;
private String pid;
private ResourceCalculatorProcessTree pTree;
Expand Down Expand Up @@ -267,26 +269,43 @@ public void setProcessTree(ResourceCalculatorProcessTree pTree) {
this.pTree = pTree;
}

public long getVmemLimit() {
/**
* @return Virtual memory limit for the process tree in bytes
*/
public synchronized long getVmemLimit() {
return this.vmemLimit;
}

/**
* @return Physical memory limit for the process tree in bytes
*/
public long getPmemLimit() {
public synchronized long getPmemLimit() {
return this.pmemLimit;
}

/**
* Return the number of cpu vcores assigned
* @return
* @return Number of cpu vcores assigned
*/
public int getCpuVcores() {
public synchronized int getCpuVcores() {
return this.cpuVcores;
}
}

/**
 * Replaces the three limits enforced for this process tree in a single
 * call. The method holds this object's monitor while the fields are
 * written.
 *
 * @param pmemLimit physical memory limit for the process tree, in bytes
 * @param vmemLimit virtual memory limit for the process tree, in bytes
 * @param cpuVcores number of cpu vcores assigned
 */
public synchronized void setResourceLimit(
    long pmemLimit, long vmemLimit, int cpuVcores) {
  this.cpuVcores = cpuVcores;
  this.vmemLimit = vmemLimit;
  this.pmemLimit = pmemLimit;
}
}

/**
* Check whether a container's process tree's current memory usage is over
Expand Down Expand Up @@ -359,8 +378,7 @@ public MonitoringThread() {
@Override
public void run() {

while (true) {

while (!stopped && !Thread.currentThread().isInterrupted()) {
// Print the processTrees for debugging.
if (LOG.isDebugEnabled()) {
StringBuilder tmp = new StringBuilder("[ ");
Expand All @@ -372,31 +390,6 @@ public void run() {
+ tmp.substring(0, tmp.length()) + "]");
}

// Add new containers
synchronized (containersToBeAdded) {
for (Entry<ContainerId, ProcessTreeInfo> entry : containersToBeAdded
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo processTreeInfo = entry.getValue();
LOG.info("Starting resource-monitoring for " + containerId);
trackingContainers.put(containerId, processTreeInfo);
}
containersToBeAdded.clear();
}

// Remove finished containers
synchronized (containersToBeRemoved) {
for (ContainerId containerId : containersToBeRemoved) {
if (containerMetricsEnabled) {
ContainerMetrics.forContainer(
containerId, containerMetricsPeriodMs).finished();
}
trackingContainers.remove(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
}
containersToBeRemoved.clear();
}

// Temporary structure to calculate the total resource utilization of
// the containers
ResourceUtilization trackedContainersUtilization =
Expand All @@ -408,10 +401,8 @@ public void run() {
long pmemByAllContainers = 0;
long cpuUsagePercentPerCoreByAllContainers = 0;
long cpuUsageTotalCoresByAllContainers = 0;
for (Iterator<Map.Entry<ContainerId, ProcessTreeInfo>> it =
trackingContainers.entrySet().iterator(); it.hasNext();) {

Map.Entry<ContainerId, ProcessTreeInfo> entry = it.next();
for (Entry<ContainerId, ProcessTreeInfo> entry : trackingContainers
.entrySet()) {
ContainerId containerId = entry.getKey();
ProcessTreeInfo ptInfo = entry.getValue();
try {
Expand All @@ -435,11 +426,6 @@ public void run() {
if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
int cpuVcores = ptInfo.getCpuVcores();
final int vmemLimit = (int) (ptInfo.getVmemLimit() >> 20);
final int pmemLimit = (int) (ptInfo.getPmemLimit() >> 20);
usageMetrics.recordResourceLimit(
vmemLimit, pmemLimit, cpuVcores);
usageMetrics.recordProcessId(pId);
}
}
Expand Down Expand Up @@ -548,7 +534,7 @@ && isProcessTreeOverLimit(containerId.toString(),
eventDispatcher.getEventHandler().handle(
new ContainerKillEvent(containerId,
containerExitStatus, msg));
it.remove();
trackingContainers.remove(containerId);
LOG.info("Removed ProcessTree with root " + pId);
}
} catch (Exception e) {
Expand Down Expand Up @@ -605,6 +591,60 @@ private String formatUsageString(long currentVmemUsage, long vmemLimit,
}
}

/**
 * Applies a new resource allocation to a container registered in the
 * node manager context.
 *
 * The container is looked up by id in {@code context.getContainers()}.
 * If no entry is found, a warning is logged and nothing is changed;
 * otherwise the resource is handed to {@code Container#setResource}.
 *
 * @param containerId id of the container whose resource should change
 * @param resource the resource to set on the container
 */
private void changeContainerResource(
    ContainerId containerId, Resource resource) {
  Container container = context.getContainers().get(containerId);
  // Check container existence
  if (container == null) {
    // Leading space keeps the id from running into "does" in the log line.
    LOG.warn("Container " + containerId.toString() + " does not exist");
    return;
  }
  container.setResource(resource);
}

/**
 * Pushes the information carried by a monitoring event into the
 * {@code ContainerMetrics} instance of the affected container.
 *
 * Does nothing when container metrics are disabled or the event is
 * {@code null}. Start events record the state-change durations and the
 * initial limits, stop events mark the metrics as finished, and
 * resource-change events record the new limits. Any other event type is
 * ignored.
 *
 * @param monitoringEvent the event to translate into metrics; may be null
 */
private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
  if (monitoringEvent == null || !containerMetricsEnabled) {
    return;
  }

  ContainerMetrics metrics = ContainerMetrics.forContainer(
      monitoringEvent.getContainerId(), containerMetricsPeriodMs);

  switch (monitoringEvent.getType()) {
  case START_MONITORING_CONTAINER: {
    ContainerStartMonitoringEvent started =
        (ContainerStartMonitoringEvent) monitoringEvent;
    metrics.recordStateChangeDurations(
        started.getLaunchDuration(),
        started.getLocalizationDuration());
    int vcores = started.getCpuVcores();
    // The event's limits are shifted right by 20 bits before being
    // recorded as MB values.
    int vmemMBs = (int) (started.getVmemLimit() >> 20);
    int pmemMBs = (int) (started.getPmemLimit() >> 20);
    metrics.recordResourceLimit(vmemMBs, pmemMBs, vcores);
    break;
  }
  case STOP_MONITORING_CONTAINER:
    metrics.finished();
    break;
  case CHANGE_MONITORING_CONTAINER_RESOURCE: {
    ChangeMonitoringContainerResourceEvent changed =
        (ChangeMonitoringContainerResourceEvent) monitoringEvent;
    Resource target = changed.getResource();
    // Physical memory is used as returned by the resource; the virtual
    // limit is derived from it with the configured vmem/pmem ratio.
    int pmemMBs = target.getMemory();
    int vmemMBs = (int) (pmemMBs * vmemRatio);
    int vcores = target.getVirtualCores();
    metrics.recordResourceLimit(vmemMBs, pmemMBs, vcores);
    break;
  }
  default:
    break;
  }
}

@Override
public long getVmemAllocatedForContainers() {
return this.maxVmemAllottedForContainers;
Expand Down Expand Up @@ -650,38 +690,53 @@ public void setContainersUtilization(ResourceUtilization utilization) {
}

@Override
@SuppressWarnings("unchecked")
public void handle(ContainersMonitorEvent monitoringEvent) {

ContainerId containerId = monitoringEvent.getContainerId();
if (!containersMonitorEnabled) {
if (monitoringEvent.getType() == ContainersMonitorEventType
.CHANGE_MONITORING_CONTAINER_RESOURCE) {
// Nothing to enforce. Update container resource immediately.
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
changeContainerResource(containerId, changeEvent.getResource());
}
return;
}

ContainerId containerId = monitoringEvent.getContainerId();
switch (monitoringEvent.getType()) {
case START_MONITORING_CONTAINER:
ContainerStartMonitoringEvent startEvent =
(ContainerStartMonitoringEvent) monitoringEvent;

if (containerMetricsEnabled) {
ContainerMetrics usageMetrics = ContainerMetrics
.forContainer(containerId, containerMetricsPeriodMs);
usageMetrics.recordStateChangeDurations(
startEvent.getLaunchDuration(),
startEvent.getLocalizationDuration());
}

synchronized (this.containersToBeAdded) {
ProcessTreeInfo processTreeInfo =
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores());
this.containersToBeAdded.put(containerId, processTreeInfo);
}
LOG.info("Starting resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.put(containerId,
new ProcessTreeInfo(containerId, null, null,
startEvent.getVmemLimit(), startEvent.getPmemLimit(),
startEvent.getCpuVcores()));
break;
case STOP_MONITORING_CONTAINER:
synchronized (this.containersToBeRemoved) {
this.containersToBeRemoved.add(containerId);
LOG.info("Stopping resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
trackingContainers.remove(containerId);
break;
case CHANGE_MONITORING_CONTAINER_RESOURCE:
ChangeMonitoringContainerResourceEvent changeEvent =
(ChangeMonitoringContainerResourceEvent) monitoringEvent;
ProcessTreeInfo processTreeInfo = trackingContainers.get(containerId);
if (processTreeInfo == null) {
LOG.warn("Failed to track container "
+ containerId.toString()
+ ". It may have already completed.");
break;
}
LOG.info("Changing resource-monitoring for " + containerId);
updateContainerMetrics(monitoringEvent);
long pmemLimit = changeEvent.getResource().getMemory() * 1024L * 1024L;
long vmemLimit = (long) (pmemLimit * vmemRatio);
int cpuVcores = changeEvent.getResource().getVirtualCores();
processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
changeContainerResource(containerId, changeEvent.getResource());
break;
default:
// TODO: Wrong event.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,17 @@ public void testIncreaseContainerResourceWithInvalidResource() throws Exception
super.testIncreaseContainerResourceWithInvalidResource();
}

@Override
public void testChangeContainerResource() throws Exception {
  // Only delegate to the inherited test when shouldRunTest() reports
  // that the test can run; otherwise log the skip and do nothing.
  if (shouldRunTest()) {
    LOG.info("Running testChangeContainerResource");
    super.testChangeContainerResource();
  } else {
    LOG.info("LCE binary path is not passed. Not running the test");
  }
}

private boolean shouldRunTest() {
return System
.getProperty(YarnConfiguration.NM_LINUX_CONTAINER_EXECUTOR_PATH) != null;
Expand Down

0 comments on commit c59ae4e

Please sign in to comment.