Skip to content

Commit

Permalink
YARN-8137. Parallelize node addition in SLS. Contributed by Abhishek …
Browse files Browse the repository at this point in the history
…Modi.
  • Loading branch information
Inigo Goiri committed Apr 20, 2018
1 parent 860cc28 commit fd24fd0
Showing 1 changed file with 39 additions and 18 deletions.
Expand Up @@ -33,6 +33,9 @@
import java.util.Set; import java.util.Set;
import java.util.Collections; import java.util.Collections;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;


import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -85,6 +88,7 @@
import org.apache.hadoop.yarn.util.UTCClock; import org.apache.hadoop.yarn.util.UTCClock;
import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources; import org.apache.hadoop.yarn.util.resource.Resources;
import org.eclipse.jetty.util.ConcurrentHashSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -96,9 +100,10 @@ public class SLSRunner extends Configured implements Tool {
private static TaskRunner runner = new TaskRunner(); private static TaskRunner runner = new TaskRunner();
private String[] inputTraces; private String[] inputTraces;
private Map<String, Integer> queueAppNumMap; private Map<String, Integer> queueAppNumMap;
private int poolSize;


// NM simulator // NM simulator
private HashMap<NodeId, NMSimulator> nmMap; private Map<NodeId, NMSimulator> nmMap;
private Resource nodeManagerResource; private Resource nodeManagerResource;
private String nodeFile; private String nodeFile;


Expand Down Expand Up @@ -158,7 +163,7 @@ public void setConf(Configuration conf) {
} }


private void init(Configuration tempConf) throws ClassNotFoundException { private void init(Configuration tempConf) throws ClassNotFoundException {
nmMap = new HashMap<>(); nmMap = new ConcurrentHashMap<>();
queueAppNumMap = new HashMap<>(); queueAppNumMap = new HashMap<>();
amMap = new ConcurrentHashMap<>(); amMap = new ConcurrentHashMap<>();
amClassMap = new HashMap<>(); amClassMap = new HashMap<>();
Expand All @@ -167,7 +172,7 @@ private void init(Configuration tempConf) throws ClassNotFoundException {
setConf(tempConf); setConf(tempConf);


// runner // runner
int poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE, poolSize = tempConf.getInt(SLSConfiguration.RUNNER_POOL_SIZE,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT); SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
SLSRunner.runner.setQueueSize(poolSize); SLSRunner.runner.setQueueSize(poolSize);
// <AMType, Class> map // <AMType, Class> map
Expand Down Expand Up @@ -283,7 +288,8 @@ protected ApplicationMasterLauncher createAMLauncher() {
rm.start(); rm.start();
} }


private void startNM() throws YarnException, IOException { private void startNM() throws YarnException, IOException,
InterruptedException {
// nm configuration // nm configuration
int heartbeatInterval = getConf().getInt( int heartbeatInterval = getConf().getInt(
SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS, SLSConfiguration.NM_HEARTBEAT_INTERVAL_MS,
Expand Down Expand Up @@ -333,21 +339,36 @@ private void startNM() throws YarnException, IOException {


// create NM simulators // create NM simulators
Random random = new Random(); Random random = new Random();
Set<String> rackSet = new HashSet<String>(); Set<String> rackSet = new ConcurrentHashSet<>();
int threadPoolSize = Math.max(poolSize,
SLSConfiguration.RUNNER_POOL_SIZE_DEFAULT);
ExecutorService executorService = Executors.
newFixedThreadPool(threadPoolSize);
for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) { for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
// we randomize the heartbeat start time from zero to 1 interval executorService.submit(new Runnable() {
NMSimulator nm = new NMSimulator(); @Override public void run() {
Resource nmResource = nodeManagerResource; try {
String hostName = entry.getKey(); // we randomize the heartbeat start time from zero to 1 interval
if (entry.getValue() != null) { NMSimulator nm = new NMSimulator();
nmResource = entry.getValue(); Resource nmResource = nodeManagerResource;
} String hostName = entry.getKey();
nm.init(hostName, nmResource, random.nextInt(heartbeatInterval), if (entry.getValue() != null) {
heartbeatInterval, rm, resourceUtilizationRatio); nmResource = entry.getValue();
nmMap.put(nm.getNode().getNodeID(), nm); }
runner.schedule(nm); nm.init(hostName, nmResource,
rackSet.add(nm.getNode().getRackName()); random.nextInt(heartbeatInterval),
heartbeatInterval, rm, resourceUtilizationRatio);
nmMap.put(nm.getNode().getNodeID(), nm);
runner.schedule(nm);
rackSet.add(nm.getNode().getRackName());
} catch (IOException | YarnException e) {
LOG.error("Got an error while adding node", e);
}
}
});
} }
executorService.shutdown();
executorService.awaitTermination(10, TimeUnit.MINUTES);
numRacks = rackSet.size(); numRacks = rackSet.size();
numNMs = nmMap.size(); numNMs = nmMap.size();
} }
Expand Down Expand Up @@ -839,7 +860,7 @@ private void printSimulationInfo() {
(long)(Math.ceil(maxRuntime / 1000.0))); (long)(Math.ceil(maxRuntime / 1000.0)));
} }


public HashMap<NodeId, NMSimulator> getNmMap() { public Map<NodeId, NMSimulator> getNmMap() {
return nmMap; return nmMap;
} }


Expand Down

0 comments on commit fd24fd0

Please sign in to comment.