Skip to content

Commit

Permalink
YARN-3116. RM notifies NM whether a container is an AM container or n…
Browse files Browse the repository at this point in the history
…ormal task container. Contributed by Giovanni Matteo Fumarola.
  • Loading branch information
zjshen14 committed Jul 11, 2015
1 parent 47f4c54 commit 1ea3629
Show file tree
Hide file tree
Showing 18 changed files with 253 additions and 22 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -134,6 +134,9 @@ Release 2.8.0 - UNRELEASED
YARN-1012. Report NM aggregated container resource utilization in heartbeat.
(Inigo Goiri via kasha)

YARN-3116. RM notifies NM whether a container is an AM container or normal
task container. (Giovanni Matteo Fumarola via zjshen)

IMPROVEMENTS

YARN-644. Basic null check is not performed on passed in arguments before
Expand Down
Expand Up @@ -35,14 +35,23 @@ public class ContainerContext {
private final String user;
private final ContainerId containerId;
private final Resource resource;
private final ContainerType containerType;

@Private
@Unstable
public ContainerContext(String user, ContainerId containerId,
Resource resource) {
this(user, containerId, resource, ContainerType.TASK);
}

@Private
@Unstable
public ContainerContext(String user, ContainerId containerId,
Resource resource, ContainerType containerType) {
this.user = user;
this.containerId = containerId;
this.resource = resource;
this.containerType = containerType;
}

/**
Expand Down Expand Up @@ -72,4 +81,14 @@ public ContainerId getContainerId() {
public Resource getResource() {
return resource;
}

/**
* Get {@link ContainerType} the type of the container
* being initialized or stopped.
*
* @return the type of the container
*/
public ContainerType getContainerType() {
return containerType;
}
}
Expand Up @@ -41,4 +41,11 @@ public ContainerInitializationContext(String user, ContainerId containerId,
super(user, containerId, resource);
}

@Private
@Unstable
public ContainerInitializationContext(String user, ContainerId containerId,
Resource resource, ContainerType containerType) {
super(user, containerId, resource, containerType);
}

}
Expand Up @@ -41,4 +41,11 @@ public ContainerTerminationContext(String user, ContainerId containerId,
super(user, containerId, resource);
}

@Private
@Unstable
public ContainerTerminationContext(String user, ContainerId containerId,
Resource resource, ContainerType containerType) {
super(user, containerId, resource, containerType);
}

}
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.api;

/**
* Container property encoding allocation and execution semantics.
*
* <p>
* The container types are the following:
* <ul>
* <li>{@link #APPLICATION_MASTER}
* <li>{@link #TASK}
* </ul>
* </p>
*/
public enum ContainerType {
APPLICATION_MASTER, TASK
}
Expand Up @@ -264,6 +264,11 @@ message NodeLabelProto {
optional bool isExclusive = 2 [default = true];
}

enum ContainerTypeProto {
APPLICATION_MASTER = 1;
TASK = 2;
}

////////////////////////////////////////////////////////////////////////
////// From AM_RM_Protocol /////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////
Expand Down
Expand Up @@ -53,7 +53,9 @@
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationRequestInterpreterProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationAttemptStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.YarnApplicationStateProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.server.api.ContainerType;

import com.google.protobuf.ByteString;

Expand Down Expand Up @@ -270,4 +272,14 @@ public static LogAggregationStatus convertFromProtoFormat(
return LogAggregationStatus.valueOf(e.name().replace(
LOG_AGGREGATION_STATUS_PREFIX, ""));
}

/*
* ContainerType
*/
public static ContainerTypeProto convertToProtoFormat(ContainerType e) {
return ContainerTypeProto.valueOf(e.name());
}
public static ContainerType convertFromProtoFormat(ContainerTypeProto e) {
return ContainerType.valueOf(e.name());
}
}
Expand Up @@ -39,13 +39,15 @@
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.PriorityPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerTypeProto;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
import org.apache.hadoop.yarn.server.api.ContainerType;

import com.google.protobuf.TextFormat;


/**
* TokenIdentifier for a container. Encodes {@link ContainerId},
* {@link Resource} needed by the container and the target NMs host-address.
Expand All @@ -66,14 +68,24 @@ public ContainerTokenIdentifier(ContainerId containerID,
int masterKeyId, long rmIdentifier, Priority priority, long creationTime) {
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, null,
CommonNodeLabelsManager.NO_LABEL);
CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);
}

public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression) {
ContainerTokenIdentifierProto.Builder builder =
this(containerID, hostName, appSubmitter, r, expiryTimeStamp, masterKeyId,
rmIdentifier, priority, creationTime, logAggregationContext,
nodeLabelExpression, ContainerType.TASK);
}

public ContainerTokenIdentifier(ContainerId containerID, String hostName,
String appSubmitter, Resource r, long expiryTimeStamp, int masterKeyId,
long rmIdentifier, Priority priority, long creationTime,
LogAggregationContext logAggregationContext, String nodeLabelExpression,
ContainerType containerType) {
ContainerTokenIdentifierProto.Builder builder =
ContainerTokenIdentifierProto.newBuilder();
if (containerID != null) {
builder.setContainerId(((ContainerIdPBImpl)containerID).getProto());
Expand All @@ -99,7 +111,8 @@ public ContainerTokenIdentifier(ContainerId containerID, String hostName,
if (nodeLabelExpression != null) {
builder.setNodeLabelExpression(nodeLabelExpression);
}

builder.setContainerType(convertToProtoFormat(containerType));

proto = builder.build();
}

Expand Down Expand Up @@ -156,7 +169,18 @@ public long getCreationTime() {
public long getRMIdentifier() {
return proto.getRmIdentifier();
}


/**
* Get the ContainerType of container to allocate
* @return ContainerType
*/
public ContainerType getContainerType(){
if (!proto.hasContainerType()) {
return null;
}
return convertFromProtoFormat(proto.getContainerType());
}

public ContainerTokenIdentifierProto getProto() {
return proto;
}
Expand Down Expand Up @@ -232,4 +256,13 @@ public boolean equals(Object other) {
public String toString() {
return TextFormat.shortDebugString(getProto());
}

private ContainerTypeProto convertToProtoFormat(ContainerType containerType) {
return ProtoUtils.convertToProtoFormat(containerType);
}

private ContainerType convertFromProtoFormat(
ContainerTypeProto containerType) {
return ProtoUtils.convertFromProtoFormat(containerType);
}
}
Expand Up @@ -50,6 +50,7 @@ message ContainerTokenIdentifierProto {
optional int64 creationTime = 9;
optional LogAggregationContextProto logAggregationContext = 10;
optional string nodeLabelExpression = 11;
optional ContainerTypeProto containerType = 12;
}

message ClientToAMTokenIdentifierProto {
Expand Down
Expand Up @@ -33,10 +33,12 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.YARNDelegationTokenIdentifierProto;
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -201,6 +203,12 @@ public void testContainerTokenIdentifier() throws IOException {
anotherToken.getCreationTime(), creationTime);

Assert.assertNull(anotherToken.getLogAggregationContext());

Assert.assertEquals(CommonNodeLabelsManager.NO_LABEL,
anotherToken.getNodeLabelExpression());

Assert.assertEquals(ContainerType.TASK,
anotherToken.getContainerType());
}

@Test
Expand Down Expand Up @@ -347,4 +355,49 @@ public void testParseTimelineDelegationTokenIdentifierRenewer() throws IOExcepti
Assert.assertEquals(new Text("yarn"), token.getRenewer());
}

@Test
public void testAMContainerTokenIdentifier() throws IOException {
ContainerId containerID = ContainerId.newContainerId(
ApplicationAttemptId.newInstance(ApplicationId.newInstance(
1, 1), 1), 1);
String hostName = "host0";
String appSubmitter = "usr0";
Resource r = Resource.newInstance(1024, 1);
long expiryTimeStamp = 1000;
int masterKeyId = 1;
long rmIdentifier = 1;
Priority priority = Priority.newInstance(1);
long creationTime = 1000;

ContainerTokenIdentifier token =
new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.APPLICATION_MASTER);

ContainerTokenIdentifier anotherToken = new ContainerTokenIdentifier();

byte[] tokenContent = token.getBytes();
DataInputBuffer dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);

Assert.assertEquals(ContainerType.APPLICATION_MASTER,
anotherToken.getContainerType());

token =
new ContainerTokenIdentifier(containerID, hostName, appSubmitter, r,
expiryTimeStamp, masterKeyId, rmIdentifier, priority, creationTime,
null, CommonNodeLabelsManager.NO_LABEL, ContainerType.TASK);

anotherToken = new ContainerTokenIdentifier();

tokenContent = token.getBytes();
dib = new DataInputBuffer();
dib.reset(tokenContent, tokenContent.length);
anotherToken.readFields(dib);

Assert.assertEquals(ContainerType.TASK,
anotherToken.getContainerType());
}

}
Expand Up @@ -225,7 +225,8 @@ public void handle(AuxServicesEvent event) {
try {
serv.initializeContainer(new ContainerInitializationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
event.getContainer().getResource(), event.getContainer()
.getContainerTokenIdentifier().getContainerType()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_INIT, th);
Expand All @@ -237,7 +238,8 @@ public void handle(AuxServicesEvent event) {
try {
serv.stopContainer(new ContainerTerminationContext(
event.getUser(), event.getContainer().getContainerId(),
event.getContainer().getResource()));
event.getContainer().getResource(), event.getContainer()
.getContainerTokenIdentifier().getContainerType()));
} catch (Throwable th) {
logWarningWhenAuxServiceThrowExceptions(serv,
AuxServicesEventType.CONTAINER_STOP, th);
Expand Down
Expand Up @@ -45,6 +45,7 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.ContainerType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
Expand All @@ -61,7 +62,6 @@
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultiset;
Expand Down Expand Up @@ -467,13 +467,26 @@ public List<NMToken> getNMTokenList() {
.hasNext();) {
RMContainer rmContainer = i.next();
Container container = rmContainer.getContainer();
ContainerType containerType = ContainerType.TASK;
// The working knowledge is that masterContainer for AM is null as it
// itself is the master container.
RMAppAttempt appAttempt =
rmContext
.getRMApps()
.get(
container.getId().getApplicationAttemptId()
.getApplicationId()).getCurrentAppAttempt();
if (appAttempt.getMasterContainer() == null
&& appAttempt.getSubmissionContext().getUnmanagedAM() == false) {
containerType = ContainerType.APPLICATION_MASTER;
}
try {
// create container token and NMToken altogether.
container.setContainerToken(rmContext.getContainerTokenSecretManager()
.createContainerToken(container.getId(), container.getNodeId(),
getUser(), container.getResource(), container.getPriority(),
rmContainer.getCreationTime(), this.logAggregationContext,
rmContainer.getNodeLabelExpression()));
rmContainer.getNodeLabelExpression(), containerType));
NMToken nmToken =
rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
getApplicationAttemptId(), container);
Expand Down
Expand Up @@ -829,7 +829,8 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
RMAppAttempt rmAppAttempt =
csContext.getRMContext().getRMApps()
.get(application.getApplicationId()).getCurrentAppAttempt();
if (null == rmAppAttempt.getMasterContainer()) {
if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
&& null == rmAppAttempt.getMasterContainer()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip allocating AM container to app_attempt="
+ application.getApplicationAttemptId()
Expand Down

0 comments on commit 1ea3629

Please sign in to comment.