Skip to content

Commit

Permalink
YARN-5938. Refactoring OpportunisticContainerAllocator to use Schedul…
Browse files Browse the repository at this point in the history
…erRequestKey instead of Priority and other misc fixes (asuresh)
  • Loading branch information
xslogic committed Dec 27, 2016
1 parent c3973e7 commit ac1e5d4
Show file tree
Hide file tree
Showing 39 changed files with 378 additions and 322 deletions.
Expand Up @@ -158,6 +158,17 @@ public int hashCode() {
return result;
}

@Override
public String toString() {
return "UpdateReq{" +
"containerId=" + getContainerId() + ", " +
"containerVersion=" + getContainerVersion() + ", " +
"targetExecType=" + getExecutionType() + ", " +
"targetCapability=" + getCapability() + ", " +
"updateType=" + getContainerUpdateType() + ", " +
"}";
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
Expand Down
Expand Up @@ -282,8 +282,8 @@ public synchronized void setAllocatedContainers(
final List<Container> containers) {
if (containers == null)
return;
// this looks like a bug because it results in append and not set
initLocalNewContainerList();
allocatedContainers.clear();
allocatedContainers.addAll(containers);
}

Expand All @@ -299,6 +299,7 @@ public synchronized void setUpdatedContainers(
if (containers == null)
return;
initLocalUpdatedContainerList();
updatedContainers.clear();
updatedContainers.addAll(containers);
}

Expand All @@ -315,6 +316,7 @@ public synchronized void setCompletedContainersStatuses(
if (containers == null)
return;
initLocalFinishedContainerList();
completedContainersStatuses.clear();
completedContainersStatuses.addAll(containers);
}

Expand Down
Expand Up @@ -87,4 +87,11 @@ public static RemoteNode newInstance(NodeId nodeId, String httpAddress) {
public int compareTo(RemoteNode other) {
return this.getNodeId().compareTo(other.getNodeId());
}

@Override
public String toString() {
return "RemoteNode{" +
"nodeId=" + getNodeId() + ", " +
"httpAddress=" + getHttpAddress() + "}";
}
}
Expand Up @@ -22,8 +22,15 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.*;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
Expand Down Expand Up @@ -192,40 +199,33 @@ public OpportunisticContainerAllocator(

/**
* Allocate OPPORTUNISTIC containers.
* @param request AllocateRequest
* @param blackList Resource BlackList Request
* @param oppResourceReqs Opportunistic Resource Requests
* @param applicationAttemptId ApplicationAttemptId
* @param opportContext App specific OpportunisticContainerContext
* @param rmIdentifier RM Identifier
* @param appSubmitter App Submitter
* @return List of Containers.
* @throws YarnException YarnException
*/
public List<Container> allocateContainers(
AllocateRequest request, ApplicationAttemptId applicationAttemptId,
public List<Container> allocateContainers(ResourceBlacklistRequest blackList,
List<ResourceRequest> oppResourceReqs,
ApplicationAttemptId applicationAttemptId,
OpportunisticContainerContext opportContext, long rmIdentifier,
String appSubmitter) throws YarnException {
// Update released containers.
List<ContainerId> releasedContainers = request.getReleaseList();
int numReleasedContainers = releasedContainers.size();
if (numReleasedContainers > 0) {
LOG.info("AttemptID: " + applicationAttemptId + " released: "
+ numReleasedContainers);
opportContext.getContainersAllocated().removeAll(releasedContainers);
}

// Update black list.
ResourceBlacklistRequest rbr = request.getResourceBlacklistRequest();
if (rbr != null) {
opportContext.getBlacklist().removeAll(rbr.getBlacklistRemovals());
opportContext.getBlacklist().addAll(rbr.getBlacklistAdditions());
if (blackList != null) {
opportContext.getBlacklist().removeAll(blackList.getBlacklistRemovals());
opportContext.getBlacklist().addAll(blackList.getBlacklistAdditions());
}

// Add OPPORTUNISTIC requests to the outstanding ones.
opportContext.addToOutstandingReqs(request.getAskList());
opportContext.addToOutstandingReqs(oppResourceReqs);

// Satisfy the outstanding OPPORTUNISTIC requests.
List<Container> allocatedContainers = new ArrayList<>();
for (Priority priority :
for (SchedulerRequestKey schedulerKey :
opportContext.getOutstandingOpReqs().descendingKeySet()) {
// Allocated containers :
// Key = Requested Capability,
Expand All @@ -234,7 +234,7 @@ public List<Container> allocateContainers(
// we need the requested capability (key) to match against
// the outstanding reqs)
Map<Resource, List<Container>> allocated = allocate(rmIdentifier,
opportContext, priority, applicationAttemptId, appSubmitter);
opportContext, schedulerKey, applicationAttemptId, appSubmitter);
for (Map.Entry<Resource, List<Container>> e : allocated.entrySet()) {
opportContext.matchAllocationToOutstandingRequest(
e.getKey(), e.getValue());
Expand All @@ -246,19 +246,22 @@ public List<Container> allocateContainers(
}

private Map<Resource, List<Container>> allocate(long rmIdentifier,
OpportunisticContainerContext appContext, Priority priority,
OpportunisticContainerContext appContext, SchedulerRequestKey schedKey,
ApplicationAttemptId appAttId, String userName) throws YarnException {
Map<Resource, List<Container>> containers = new HashMap<>();
for (ResourceRequest anyAsk :
appContext.getOutstandingOpReqs().get(priority).values()) {
appContext.getOutstandingOpReqs().get(schedKey).values()) {
allocateContainersInternal(rmIdentifier, appContext.getAppParams(),
appContext.getContainerIdGenerator(), appContext.getBlacklist(),
appAttId, appContext.getNodeMap(), userName, containers, anyAsk);
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.get(anyAsk.getCapability()).size());
if (!containers.isEmpty()) {
LOG.info("Opportunistic allocation requested for ["
+ "priority=" + anyAsk.getPriority()
+ ", allocationRequestId=" + anyAsk.getAllocationRequestId()
+ ", num_containers=" + anyAsk.getNumContainers()
+ ", capability=" + anyAsk.getCapability() + "]"
+ " allocated = " + containers.keySet());
}
}
return containers;
}
Expand All @@ -282,7 +285,9 @@ private void allocateContainersInternal(long rmIdentifier,
nodesForScheduling.add(nodeEntry.getValue());
}
if (nodesForScheduling.isEmpty()) {
LOG.warn("No nodes available for allocating opportunistic containers.");
LOG.warn("No nodes available for allocating opportunistic containers. [" +
"allNodes=" + allNodes + ", " +
"blacklist=" + blacklist + "]");
return;
}
int numAllocated = 0;
Expand Down
Expand Up @@ -18,12 +18,7 @@

package org.apache.hadoop.yarn.server.scheduler;

import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
Expand Down Expand Up @@ -52,9 +47,6 @@ public class OpportunisticContainerContext {
private static final Logger LOG = LoggerFactory
.getLogger(OpportunisticContainerContext.class);

// Currently just used to keep track of allocated containers.
// Can be used for reporting stats later.
private Set<ContainerId> containersAllocated = new HashSet<>();
private AllocationParams appParams =
new AllocationParams();
private ContainerIdGenerator containerIdGenerator =
Expand All @@ -69,13 +61,9 @@ public class OpportunisticContainerContext {
// Resource Name (host/rack/any) and capability. This mapping is required
// to match a received Container to an outstanding OPPORTUNISTIC
// ResourceRequest (ask).
private final TreeMap<Priority, Map<Resource, ResourceRequest>>
private final TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
outstandingOpReqs = new TreeMap<>();

public Set<ContainerId> getContainersAllocated() {
return containersAllocated;
}

public AllocationParams getAppParams() {
return appParams;
}
Expand Down Expand Up @@ -119,20 +107,11 @@ public Set<String> getBlacklist() {
return blacklist;
}

public TreeMap<Priority, Map<Resource, ResourceRequest>>
public TreeMap<SchedulerRequestKey, Map<Resource, ResourceRequest>>
getOutstandingOpReqs() {
return outstandingOpReqs;
}

public void updateCompletedContainers(AllocateResponse allocateResponse) {
for (ContainerStatus cs :
allocateResponse.getCompletedContainersStatuses()) {
if (cs.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
containersAllocated.remove(cs.getContainerId());
}
}
}

/**
* Takes a list of ResourceRequests (asks), extracts the key information viz.
* (Priority, ResourceName, Capability) and adds to the outstanding
Expand All @@ -144,7 +123,7 @@ public void updateCompletedContainers(AllocateResponse allocateResponse) {
*/
public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
for (ResourceRequest request : resourceAsks) {
Priority priority = request.getPriority();
SchedulerRequestKey schedulerKey = SchedulerRequestKey.create(request);

// TODO: Extend for Node/Rack locality. We only handle ANY requests now
if (!ResourceRequest.isAnyLocation(request.getResourceName())) {
Expand All @@ -156,10 +135,10 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
}

Map<Resource, ResourceRequest> reqMap =
outstandingOpReqs.get(priority);
outstandingOpReqs.get(schedulerKey);
if (reqMap == null) {
reqMap = new HashMap<>();
outstandingOpReqs.put(priority, reqMap);
outstandingOpReqs.put(schedulerKey, reqMap);
}

ResourceRequest resourceRequest = reqMap.get(request.getCapability());
Expand All @@ -171,7 +150,8 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
resourceRequest.getNumContainers() + request.getNumContainers());
}
if (ResourceRequest.isAnyLocation(request.getResourceName())) {
LOG.info("# of outstandingOpReqs in ANY (at priority = " + priority
LOG.info("# of outstandingOpReqs in ANY (at" +
"priority = "+ schedulerKey.getPriority()
+ ", with capability = " + request.getCapability() + " ) : "
+ resourceRequest.getNumContainers());
}
Expand All @@ -187,9 +167,9 @@ public void addToOutstandingReqs(List<ResourceRequest> resourceAsks) {
public void matchAllocationToOutstandingRequest(Resource capability,
List<Container> allocatedContainers) {
for (Container c : allocatedContainers) {
containersAllocated.add(c.getId());
SchedulerRequestKey schedulerKey = SchedulerRequestKey.extractFrom(c);
Map<Resource, ResourceRequest> asks =
outstandingOpReqs.get(c.getPriority());
outstandingOpReqs.get(schedulerKey);

if (asks == null) {
continue;
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
package org.apache.hadoop.yarn.server.scheduler;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
Expand Down Expand Up @@ -53,7 +53,7 @@ public static SchedulerRequestKey extractFrom(Container container) {
container.getAllocationRequestId());
}

private SchedulerRequestKey(Priority priority, long allocationRequestId) {
SchedulerRequestKey(Priority priority, long allocationRequestId) {
this.priority = priority;
this.allocationRequestId = allocationRequestId;
}
Expand Down Expand Up @@ -119,4 +119,12 @@ public int hashCode() {
getAllocationRequestId() >>> 32));
return result;
}

@Override
public String toString() {
return "SchedulerRequestKey{" +
"priority=" + priority +
", allocationRequestId=" + allocationRequestId +
'}';
}
}
Expand Up @@ -227,10 +227,10 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
.partitionAskList(request.getAllocateRequest().getAskList());

// Allocate OPPORTUNISTIC containers.
request.getAllocateRequest().setAskList(partitionedAsks.getOpportunistic());
List<Container> allocatedContainers =
containerAllocator.allocateContainers(
request.getAllocateRequest(), applicationAttemptId,
request.getAllocateRequest().getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), applicationAttemptId,
oppContainerContext, rmIdentifier, appSubmitter);

// Prepare request for sending to RM for scheduling GUARANTEED containers.
Expand All @@ -252,18 +252,11 @@ public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
nodeTokens.put(nmToken.getNodeId(), nmToken);
}

oppContainerContext.updateCompletedContainers(dsResp.getAllocateResponse());

// Check if we have NM tokens for all the allocated containers. If not
// generate one and update the response.
updateAllocateResponse(
dsResp.getAllocateResponse(), nmTokens, allocatedContainers);

if (LOG.isDebugEnabled()) {
LOG.debug("Number of opportunistic containers currently" +
"allocated by application: " + oppContainerContext
.getContainersAllocated().size());
}
return dsResp;
}
}

0 comments on commit ac1e5d4

Please sign in to comment.