Skip to content

Commit

Permalink
YARN-8175. Add support for Node Labels in SLS. Contributed by Abhishek Modi.
Browse files Browse the repository at this point in the history
  • Loading branch information
Inigo Goiri committed Jul 31, 2018
1 parent b28bdc7 commit 9fea5c9
Show file tree
Hide file tree
Showing 12 changed files with 301 additions and 138 deletions.
Expand Up @@ -60,6 +60,7 @@
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.ReservationId; import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
Expand Down Expand Up @@ -298,42 +299,32 @@ private void startNM() throws YarnException, IOException,
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO, SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO,
SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT); SLSConfiguration.NM_RESOURCE_UTILIZATION_RATIO_DEFAULT);
// nm information (fetch from topology file, or from sls/rumen json file) // nm information (fetch from topology file, or from sls/rumen json file)
Map<String, Resource> nodeResourceMap = new HashMap<>(); Set<NodeDetails> nodeSet = null;
Set<? extends String> nodeSet;
if (nodeFile.isEmpty()) { if (nodeFile.isEmpty()) {
for (String inputTrace : inputTraces) { for (String inputTrace : inputTraces) {
switch (inputType) { switch (inputType) {
case SLS: case SLS:
nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace); nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
case RUMEN: case RUMEN:
nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace); nodeSet = SLSUtils.parseNodesFromRumenTrace(inputTrace);
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
case SYNTH: case SYNTH:
stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0])); stjp = new SynthTraceJobProducer(getConf(), new Path(inputTraces[0]));
nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(), nodeSet = SLSUtils.generateNodes(stjp.getNumNodes(),
stjp.getNumNodes()/stjp.getNodesPerRack()); stjp.getNumNodes()/stjp.getNodesPerRack());
for (String node : nodeSet) {
nodeResourceMap.put(node, null);
}
break; break;
default: default:
throw new YarnException("Input configuration not recognized, " throw new YarnException("Input configuration not recognized, "
+ "trace type should be SLS, RUMEN, or SYNTH"); + "trace type should be SLS, RUMEN, or SYNTH");
} }
} }
} else { } else {
nodeResourceMap = SLSUtils.parseNodesFromNodeFile(nodeFile, nodeSet = SLSUtils.parseNodesFromNodeFile(nodeFile,
nodeManagerResource); nodeManagerResource);
} }


if (nodeResourceMap.size() == 0) { if (nodeSet == null || nodeSet.isEmpty()) {
throw new YarnException("No node! Please configure nodes."); throw new YarnException("No node! Please configure nodes.");
} }


Expand All @@ -344,20 +335,21 @@ private void startNM() throws YarnException, IOException,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
ExecutorService executorService = Executors. ExecutorService executorService = Executors.
newFixedThreadPool(threadPoolSize); newFixedThreadPool(threadPoolSize);
for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) { for (NodeDetails nodeDetails : nodeSet) {
executorService.submit(new Runnable() { executorService.submit(new Runnable() {
@Override public void run() { @Override public void run() {
try { try {
// we randomize the heartbeat start time from zero to 1 interval // we randomize the heartbeat start time from zero to 1 interval
NMSimulator nm = new NMSimulator(); NMSimulator nm = new NMSimulator();
Resource nmResource = nodeManagerResource; Resource nmResource = nodeManagerResource;
String hostName = entry.getKey(); String hostName = nodeDetails.getHostname();
if (entry.getValue() != null) { if (nodeDetails.getNodeResource() != null) {
nmResource = entry.getValue(); nmResource = nodeDetails.getNodeResource();
} }
Set<NodeLabel> nodeLabels = nodeDetails.getLabels();
nm.init(hostName, nmResource, nm.init(hostName, nmResource,
random.nextInt(heartbeatInterval), random.nextInt(heartbeatInterval),
heartbeatInterval, rm, resourceUtilizationRatio); heartbeatInterval, rm, resourceUtilizationRatio, nodeLabels);
nmMap.put(nm.getNode().getNodeID(), nm); nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm); runner.schedule(nm);
rackSet.add(nm.getNode().getRackName()); rackSet.add(nm.getNode().getRackName());
Expand Down Expand Up @@ -452,6 +444,11 @@ private void createAMForJob(Map jsonJob) throws YarnException {
jsonJob.get(SLSConfiguration.JOB_END_MS).toString()); jsonJob.get(SLSConfiguration.JOB_END_MS).toString());
} }


String jobLabelExpr = null;
if (jsonJob.containsKey(SLSConfiguration.JOB_LABEL_EXPR)) {
jobLabelExpr = jsonJob.get(SLSConfiguration.JOB_LABEL_EXPR).toString();
}

String user = (String) jsonJob.get(SLSConfiguration.JOB_USER); String user = (String) jsonJob.get(SLSConfiguration.JOB_USER);
if (user == null) { if (user == null) {
user = "default"; user = "default";
Expand Down Expand Up @@ -481,7 +478,8 @@ private void createAMForJob(Map jsonJob) throws YarnException {


for (int i = 0; i < jobCount; i++) { for (int i = 0; i < jobCount; i++) {
runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime, runNewAM(amType, user, queue, oldAppId, jobStartTime, jobFinishTime,
getTaskContainers(jsonJob), getAMContainerResource(jsonJob)); getTaskContainers(jsonJob), getAMContainerResource(jsonJob),
jobLabelExpr);
} }
} }


Expand Down Expand Up @@ -730,7 +728,7 @@ private void startAMFromSynthGenerator() throws YarnException, IOException {


runNewAM(job.getType(), user, jobQueue, oldJobId, runNewAM(job.getType(), user, jobQueue, oldJobId,
jobStartTimeMS, jobFinishTimeMS, containerList, reservationId, jobStartTimeMS, jobFinishTimeMS, containerList, reservationId,
job.getDeadline(), getAMContainerResource(null), job.getDeadline(), getAMContainerResource(null), null,
job.getParams()); job.getParams());
} }
} }
Expand Down Expand Up @@ -775,15 +773,24 @@ private void runNewAM(String jobType, String user,
Resource amContainerResource) { Resource amContainerResource) {
runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS, runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1, jobFinishTimeMS, containerList, null, -1,
amContainerResource, null); amContainerResource, null, null);
} }


private void runNewAM(String jobType, String user, private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS, String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList, long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource, Resource amContainerResource, String labelExpr) {
Map<String, String> params) { runNewAM(jobType, user, jobQueue, oldJobId, jobStartTimeMS,
jobFinishTimeMS, containerList, null, -1,
amContainerResource, labelExpr, null);
}


@SuppressWarnings("checkstyle:parameternumber")
private void runNewAM(String jobType, String user,
String jobQueue, String oldJobId, long jobStartTimeMS,
long jobFinishTimeMS, List<ContainerSimulator> containerList,
ReservationId reservationId, long deadline, Resource amContainerResource,
String labelExpr, Map<String, String> params) {
AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance( AMSimulator amSim = (AMSimulator) ReflectionUtils.newInstance(
amClassMap.get(jobType), new Configuration()); amClassMap.get(jobType), new Configuration());


Expand All @@ -799,7 +806,7 @@ private void runNewAM(String jobType, String user,
AM_ID++; AM_ID++;
amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS, amSim.init(heartbeatInterval, containerList, rm, this, jobStartTimeMS,
jobFinishTimeMS, user, jobQueue, isTracked, oldJobId, jobFinishTimeMS, user, jobQueue, isTracked, oldJobId,
runner.getStartTimeMS(), amContainerResource, params); runner.getStartTimeMS(), amContainerResource, labelExpr, params);
if(reservationId != null) { if(reservationId != null) {
// if we have a ReservationId, delegate reservation creation to // if we have a ReservationId, delegate reservation creation to
// AMSim (reservation shape is impl specific) // AMSim (reservation shape is impl specific)
Expand Down Expand Up @@ -985,4 +992,42 @@ static void printUsage() {
System.err.println(); System.err.println();
} }


/**
 * Class to encapsulate all details about the node.
 *
 * <p>Holds the node's hostname together with an optional per-node
 * {@link Resource} and an optional set of {@link NodeLabel}s. The resource
 * and the labels stay {@code null} until they are assigned through the
 * setters.
 *
 * <p>Instances are gathered in a {@code Set<NodeDetails>} when the simulated
 * node managers are started, so equality is value based: two instances are
 * equal when hostname, resource and labels are all equal. Without this a
 * hash-based set would compare by identity and the same host read twice
 * would yield two entries. Because the fields are mutable, do not modify an
 * instance after it has been added to a hash-based collection.
 */
@Private
@Unstable
public static class NodeDetails {
  private String hostname;
  private Resource nodeResource;
  private Set<NodeLabel> labels;

  /**
   * Creates the details for a node that has no explicit resource or labels.
   *
   * @param nodeHostname hostname of the node
   */
  public NodeDetails(String nodeHostname) {
    this.hostname = nodeHostname;
  }

  /**
   * @return hostname of the node
   */
  public String getHostname() {
    return hostname;
  }

  /**
   * @param hostname hostname of the node
   */
  public void setHostname(String hostname) {
    this.hostname = hostname;
  }

  /**
   * @return resource configured for this node, or {@code null} when none
   *         was set
   */
  public Resource getNodeResource() {
    return nodeResource;
  }

  /**
   * @param nodeResource resource to associate with this node
   */
  public void setNodeResource(Resource nodeResource) {
    this.nodeResource = nodeResource;
  }

  /**
   * @return labels configured for this node, or {@code null} when none
   *         were set
   */
  public Set<NodeLabel> getLabels() {
    return labels;
  }

  /**
   * @param labels labels to associate with this node
   */
  public void setLabels(Set<NodeLabel> labels) {
    this.labels = labels;
  }

  /**
   * Compares hostname, resource and labels; {@code null} fields are equal
   * only to {@code null} fields.
   */
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof NodeDetails)) {
      return false;
    }
    NodeDetails that = (NodeDetails) o;
    return java.util.Objects.equals(hostname, that.hostname)
        && java.util.Objects.equals(nodeResource, that.nodeResource)
        && java.util.Objects.equals(labels, that.labels);
  }

  /**
   * Hash over the same fields that {@link #equals(Object)} compares.
   */
  @Override
  public int hashCode() {
    return java.util.Objects.hash(hostname, nodeResource, labels);
  }
}
} }
Expand Up @@ -88,6 +88,8 @@ public abstract class AMSimulator extends TaskRunner.Task {
private int responseId = 0; private int responseId = 0;
// user name // user name
private String user; private String user;
// nodelabel expression
private String nodeLabelExpression;
// queue name // queue name
protected String queue; protected String queue;
// am type // am type
Expand Down Expand Up @@ -123,7 +125,8 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager resourceManager, List<ContainerSimulator> containerList, ResourceManager resourceManager,
SLSRunner slsRunnner, long startTime, long finishTime, String simUser, SLSRunner slsRunnner, long startTime, long finishTime, String simUser,
String simQueue, boolean tracked, String oldApp, long baseTimeMS, String simQueue, boolean tracked, String oldApp, long baseTimeMS,
Resource amResource, Map<String, String> params) { Resource amResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(startTime, startTime + 1000000L * heartbeatInterval, super.init(startTime, startTime + 1000000L * heartbeatInterval,
heartbeatInterval); heartbeatInterval);
this.user = simUser; this.user = simUser;
Expand All @@ -136,6 +139,7 @@ public void init(int heartbeatInterval,
this.traceStartTimeMS = startTime; this.traceStartTimeMS = startTime;
this.traceFinishTimeMS = finishTime; this.traceFinishTimeMS = finishTime;
this.amContainerResource = amResource; this.amContainerResource = amResource;
this.nodeLabelExpression = nodeLabelExpr;
} }


/** /**
Expand Down Expand Up @@ -327,6 +331,9 @@ private void submitApp(ReservationId reservationId)
conLauContext.setServiceData(new HashMap<>()); conLauContext.setServiceData(new HashMap<>());
appSubContext.setAMContainerSpec(conLauContext); appSubContext.setAMContainerSpec(conLauContext);
appSubContext.setResource(amContainerResource); appSubContext.setResource(amContainerResource);
if (nodeLabelExpression != null) {
appSubContext.setNodeLabelExpression(nodeLabelExpression);
}


if(reservationId != null) { if(reservationId != null) {
appSubContext.setReservationID(reservationId); appSubContext.setReservationID(reservationId);
Expand Down
Expand Up @@ -126,10 +126,11 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(heartbeatInterval, containerList, rm, se, super.init(heartbeatInterval, containerList, rm, se,
traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId, traceStartTime, traceFinishTime, user, queue, isTracked, oldAppId,
baselineStartTimeMS, amContainerResource, params); baselineStartTimeMS, amContainerResource, nodeLabelExpr, params);
amtype = "mapreduce"; amtype = "mapreduce";


// get map/reduce tasks // get map/reduce tasks
Expand Down
Expand Up @@ -96,10 +96,11 @@ public void init(int heartbeatInterval,
List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se, List<ContainerSimulator> containerList, ResourceManager rm, SLSRunner se,
long traceStartTime, long traceFinishTime, String user, String queue, long traceStartTime, long traceFinishTime, String user, String queue,
boolean isTracked, String oldAppId, long baselineStartTimeMS, boolean isTracked, String oldAppId, long baselineStartTimeMS,
Resource amContainerResource, Map<String, String> params) { Resource amContainerResource, String nodeLabelExpr,
Map<String, String> params) {
super.init(heartbeatInterval, containerList, rm, se, traceStartTime, super.init(heartbeatInterval, containerList, rm, se, traceStartTime,
traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS, traceFinishTime, user, queue, isTracked, oldAppId, baselineStartTimeMS,
amContainerResource, params); amContainerResource, nodeLabelExpr, params);
amtype = "stream"; amtype = "stream";


allStreams.addAll(containerList); allStreams.addAll(containerList);
Expand Down
Expand Up @@ -104,6 +104,7 @@ public static Resource getAMContainerResource(Configuration conf) {
public static final String JOB_START_MS = JOB_PREFIX + "start.ms"; public static final String JOB_START_MS = JOB_PREFIX + "start.ms";
public static final String JOB_END_MS = JOB_PREFIX + "end.ms"; public static final String JOB_END_MS = JOB_PREFIX + "end.ms";
public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name"; public static final String JOB_QUEUE_NAME = JOB_PREFIX + "queue.name";
public static final String JOB_LABEL_EXPR = JOB_PREFIX + "label.expression";
public static final String JOB_USER = JOB_PREFIX + "user"; public static final String JOB_USER = JOB_PREFIX + "user";
public static final String JOB_COUNT = JOB_PREFIX + "count"; public static final String JOB_COUNT = JOB_PREFIX + "count";
public static final String JOB_TASKS = JOB_PREFIX + "tasks"; public static final String JOB_TASKS = JOB_PREFIX + "tasks";
Expand Down
Expand Up @@ -23,6 +23,7 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue; import java.util.concurrent.DelayQueue;


Expand All @@ -35,6 +36,7 @@
import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState; import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceUtilization; import org.apache.hadoop.yarn.api.records.ResourceUtilization;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
Expand Down Expand Up @@ -78,7 +80,7 @@ public class NMSimulator extends TaskRunner.Task {


public void init(String nodeIdStr, Resource nodeResource, int dispatchTime, public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
int heartBeatInterval, ResourceManager pRm, int heartBeatInterval, ResourceManager pRm,
float pResourceUtilizationRatio) float pResourceUtilizationRatio, Set<NodeLabel> labels)
throws IOException, YarnException { throws IOException, YarnException {
super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval, super.init(dispatchTime, dispatchTime + 1000000L * heartBeatInterval,
heartBeatInterval); heartBeatInterval);
Expand All @@ -102,13 +104,22 @@ public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
Records.newRecord(RegisterNodeManagerRequest.class); Records.newRecord(RegisterNodeManagerRequest.class);
req.setNodeId(node.getNodeID()); req.setNodeId(node.getNodeID());
req.setResource(node.getTotalCapability()); req.setResource(node.getTotalCapability());
req.setNodeLabels(labels);
req.setHttpPort(80); req.setHttpPort(80);
RegisterNodeManagerResponse response = this.rm.getResourceTrackerService() RegisterNodeManagerResponse response = this.rm.getResourceTrackerService()
.registerNodeManager(req); .registerNodeManager(req);
masterKey = response.getNMTokenMasterKey(); masterKey = response.getNMTokenMasterKey();
this.resourceUtilizationRatio = pResourceUtilizationRatio; this.resourceUtilizationRatio = pResourceUtilizationRatio;
} }


/**
 * Initializes this simulated node manager without any node labels.
 *
 * <p>Convenience overload that forwards every argument to the label-aware
 * {@code init} and passes {@code null} for the labels.
 *
 * @param nodeIdStr identifier string of the node
 * @param nodeResource resource of the node
 * @param dispatchTime passed through unchanged
 * @param heartBeatInterval passed through unchanged
 * @param pRm resource manager the node is initialized against
 * @param pResourceUtilizationRatio passed through unchanged
 * @throws IOException propagated from the label-aware overload
 * @throws YarnException propagated from the label-aware overload
 */
public void init(String nodeIdStr, Resource nodeResource, int dispatchTime,
    int heartBeatInterval, ResourceManager pRm,
    float pResourceUtilizationRatio)
    throws IOException, YarnException {
  // No labels supplied by the caller: hand null to the full overload.
  final Set<NodeLabel> noLabels = null;
  init(nodeIdStr, nodeResource, dispatchTime, heartBeatInterval, pRm,
      pResourceUtilizationRatio, noLabels);
}

@Override @Override
public void firstStep() { public void firstStep() {
// do nothing // do nothing
Expand Down

0 comments on commit 9fea5c9

Please sign in to comment.