Skip to content

Commit

Permalink
YARN-1645. ContainerManager implementation to support container resiz…
Browse files Browse the repository at this point in the history
…ing. Contributed by Meng Ding & Wangda Tan
  • Loading branch information
jian-he authored and wangdatan committed Sep 23, 2015
1 parent 83a18ad commit ffd820c
Show file tree
Hide file tree
Showing 10 changed files with 486 additions and 36 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -206,6 +206,9 @@ Release 2.8.0 - UNRELEASED
YARN-1449. AM-NM protocol changes to support container resizing.
(Meng Ding & Wangda Tan via jianhe)

YARN-1645. ContainerManager implementation to support container resizing.
(Meng Ding & Wangda Tan via jianhe)

IMPROVEMENTS

YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
@@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager;

import org.apache.hadoop.yarn.api.records.Container;
import java.util.List;

public class CMgrDecreaseContainersResourceEvent extends ContainerManagerEvent {

private final List<Container> containersToDecrease;

public CMgrDecreaseContainersResourceEvent(List<Container>
containersToDecrease) {
super(ContainerManagerEventType.DECREASE_CONTAINERS_RESOURCE);
this.containersToDecrease = containersToDecrease;
}

public List<Container> getContainersToDecrease() {
return this.containersToDecrease;
}
}
Expand Up @@ -21,4 +21,5 @@
public enum ContainerManagerEventType {
FINISH_APPS,
FINISH_CONTAINERS,
DECREASE_CONTAINERS_RESOURCE
}
Expand Up @@ -74,6 +74,7 @@
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SerializedException;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
Expand All @@ -95,6 +96,7 @@
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
import org.apache.hadoop.yarn.server.nodemanager.Context;
Expand All @@ -113,6 +115,7 @@
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
Expand Down Expand Up @@ -141,6 +144,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;
import org.apache.hadoop.yarn.util.resource.Resources;

public class ContainerManagerImpl extends CompositeService implements
ServiceStateChangeListener, ContainerManagementProtocol,
Expand Down Expand Up @@ -681,33 +685,45 @@ protected void authorizeUser(UserGroupInformation remoteUgi,

/**
* @param containerTokenIdentifier
* of the container to be started
* of the container whose resource is to be started or increased
* @throws YarnException
*/
@Private
@VisibleForTesting
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
boolean startRequest)
throws YarnException {
if (nmTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
}
if (containerTokenIdentifier == null) {
throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG);
}
/*
* Check the following:
* 1. The request comes from the same application attempt
* 2. The request possess a container token that has not expired
* 3. The request possess a container token that is granted by a known RM
*/
ContainerId containerId = containerTokenIdentifier.getContainerID();
String containerIDStr = containerId.toString();
boolean unauthorized = false;
StringBuilder messageBuilder =
new StringBuilder("Unauthorized request to start container. ");
new StringBuilder("Unauthorized request to " + (startRequest ?
"start container." : "increase container resource."));
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
equals(containerId.getApplicationAttemptId().getApplicationId())) {
unauthorized = true;
messageBuilder.append("\nNMToken for application attempt : ")
.append(nmTokenIdentifier.getApplicationAttemptId())
.append(" was used for starting container with container token")
.append(" was used for "
+ (startRequest ? "starting " : "increasing resource of ")
+ "container with container token")
.append(" issued for application attempt : ")
.append(containerId.getApplicationAttemptId());
} else if (!this.context.getContainerTokenSecretManager()
} else if (startRequest && !this.context.getContainerTokenSecretManager()
.isValidStartContainerRequest(containerTokenIdentifier)) {
// Is the container being relaunched? Or RPC layer let startCall with
// tokens generated off old-secret through?
Expand All @@ -729,6 +745,14 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
LOG.error(msg);
throw RPCUtil.getRemoteException(msg);
}
if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
.getRMIdentifier()) {
// Is the container coming from unknown RM
StringBuilder sb = new StringBuilder("\nContainer ");
sb.append(containerTokenIdentifier.getContainerID().toString())
.append(" rejected as it is allocated by a previous RM");
throw new InvalidContainerException(sb.toString());
}
}

/**
Expand All @@ -745,7 +769,7 @@ protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi,nmTokenIdentifier);
authorizeUser(remoteUgi, nmTokenIdentifier);
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
Expand Down Expand Up @@ -844,16 +868,8 @@ private void startContainerInternal(NMTokenIdentifier nmTokenIdentifier,
* belongs to correct Node Manager (part of retrieve password). c) It has
* correct RMIdentifier. d) It is not expired.
*/
authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);

if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
.getRMIdentifier()) {
// Is the container coming from unknown RM
StringBuilder sb = new StringBuilder("\nContainer ");
sb.append(containerTokenIdentifier.getContainerID().toString())
.append(" rejected as it is allocated by a previous RM");
throw new InvalidContainerException(sb.toString());
}
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, true);
// update NMToken
updateNMTokenIdentifier(nmTokenIdentifier);

Expand Down Expand Up @@ -960,9 +976,118 @@ protected ContainerTokenIdentifier verifyAndGetContainerTokenIdentifier(
@Override
public IncreaseContainersResourceResponse increaseContainersResource(
IncreaseContainersResourceRequest requests)
throws YarnException, IOException {
// To be implemented in YARN-1645
return null;
throws YarnException, IOException {
if (blockNewContainerRequests.get()) {
throw new NMNotYetReadyException(
"Rejecting container resource increase as NodeManager has not"
+ " yet connected with ResourceManager");
}
UserGroupInformation remoteUgi = getRemoteUgi();
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
authorizeUser(remoteUgi, nmTokenIdentifier);
List<ContainerId> successfullyIncreasedContainers
= new ArrayList<ContainerId>();
Map<ContainerId, SerializedException> failedContainers =
new HashMap<ContainerId, SerializedException>();
// Process container resource increase requests
for (org.apache.hadoop.yarn.api.records.Token token :
requests.getContainersToIncrease()) {
ContainerId containerId = null;
try {
if (token.getIdentifier() == null) {
throw new IOException(INVALID_CONTAINERTOKEN_MSG);
}
ContainerTokenIdentifier containerTokenIdentifier =
BuilderUtils.newContainerTokenIdentifier(token);
verifyAndGetContainerTokenIdentifier(token,
containerTokenIdentifier);
authorizeStartAndResourceIncreaseRequest(
nmTokenIdentifier, containerTokenIdentifier, false);
containerId = containerTokenIdentifier.getContainerID();
// Reuse the startContainer logic to update NMToken,
// as container resource increase request will have come with
// an updated NMToken.
updateNMTokenIdentifier(nmTokenIdentifier);
Resource resource = containerTokenIdentifier.getResource();
changeContainerResourceInternal(containerId, resource, true);
successfullyIncreasedContainers.add(containerId);
} catch (YarnException | InvalidToken e) {
failedContainers.put(containerId, SerializedException.newInstance(e));
} catch (IOException e) {
throw RPCUtil.getRemoteException(e);
}
}
return IncreaseContainersResourceResponse.newInstance(
successfullyIncreasedContainers, failedContainers);
}

@SuppressWarnings("unchecked")
private void changeContainerResourceInternal(
ContainerId containerId, Resource targetResource, boolean increase)
throws YarnException, IOException {
Container container = context.getContainers().get(containerId);
// Check container existence
if (container == null) {
if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " was recently stopped on node manager.");
} else {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is not handled by this NodeManager");
}
}
// Check container state
org.apache.hadoop.yarn.server.nodemanager.
containermanager.container.ContainerState currentState =
container.getContainerState();
if (currentState != org.apache.hadoop.yarn.server.
nodemanager.containermanager.container.ContainerState.RUNNING) {
throw RPCUtil.getRemoteException("Container " + containerId.toString()
+ " is in " + currentState.name() + " state."
+ " Resource can only be changed when a container is in"
+ " RUNNING state");
}
// Check validity of the target resource.
Resource currentResource = container.getResource();
if (currentResource.equals(targetResource)) {
LOG.warn("Unable to change resource for container "
+ containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is the same as the current resource");
return;
}
if (increase && !Resources.fitsIn(currentResource, targetResource)) {
throw RPCUtil.getRemoteException("Unable to increase resource for "
+ "container " + containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is smaller than the current resource "
+ currentResource.toString());
}
if (!increase &&
(!Resources.fitsIn(Resources.none(), targetResource)
|| !Resources.fitsIn(targetResource, currentResource))) {
throw RPCUtil.getRemoteException("Unable to decrease resource for "
+ "container " + containerId.toString()
+ ". The target resource "
+ targetResource.toString()
+ " is not smaller than the current resource "
+ currentResource.toString());
}
this.readLock.lock();
try {
if (!serviceStopped) {
dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
containerId, targetResource));
} else {
throw new YarnException(
"Unable to change container resource as the NodeManager is "
+ "in the process of shutting down");
}
} finally {
this.readLock.unlock();
}
}

@Private
Expand Down Expand Up @@ -1182,6 +1307,21 @@ public void handle(ContainerManagerEvent event) {
"Container Killed by ResourceManager"));
}
break;
case DECREASE_CONTAINERS_RESOURCE:
CMgrDecreaseContainersResourceEvent containersDecreasedEvent =
(CMgrDecreaseContainersResourceEvent) event;
for (org.apache.hadoop.yarn.api.records.Container container
: containersDecreasedEvent.getContainersToDecrease()) {
try {
changeContainerResourceInternal(container.getId(),
container.getResource(), false);
} catch (YarnException e) {
LOG.error("Unable to decrease container resource", e);
} catch (IOException e) {
LOG.error("Unable to update container resource in store", e);
}
}
break;
default:
throw new YarnRuntimeException(
"Got an unknown ContainerManagerEvent type: " + event.getType());
Expand Down
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.nodemanager.containermanager.container;

import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;

public class ChangeContainerResourceEvent extends ContainerEvent {

private Resource resource;

public ChangeContainerResourceEvent(ContainerId c, Resource resource) {
super(c, ContainerEventType.CHANGE_CONTAINER_RESOURCE);
this.resource = resource;
}

public Resource getResource() {
return this.resource;
}
}
Expand Up @@ -25,6 +25,10 @@ public enum ContainerEventType {
KILL_CONTAINER,
UPDATE_DIAGNOSTICS_MSG,
CONTAINER_DONE,
CHANGE_CONTAINER_RESOURCE,

// Producer: ContainerMonitor
CONTAINER_RESOURCE_CHANGED,

// DownloadManager
CONTAINER_INITED,
Expand Down
Expand Up @@ -191,8 +191,10 @@ public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
}

@Override
protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
protected void authorizeStartAndResourceIncreaseRequest(
NMTokenIdentifier nmTokenIdentifier,
ContainerTokenIdentifier containerTokenIdentifier,
boolean startRequest) throws YarnException {
// do nothing
}

Expand Down

0 comments on commit ffd820c

Please sign in to comment.