Skip to content

Commit

Permalink
[TACHYON-964] Format before starting Tachyon master
Browse files Browse the repository at this point in the history
  • Loading branch information
apc999 committed Sep 18, 2015
1 parent d80aab4 commit 7e54551
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 37 deletions.
2 changes: 1 addition & 1 deletion integration/bin/tachyon-yarn.sh
Expand Up @@ -8,4 +8,4 @@ APP_MASTER_JAR=hdfs://localhost:9000/tachyon-assemblies-0.8.0-SNAPSHOT-jar-with-
${HADOOP_HOME}/bin/hadoop fs -put -f ${TACHYON_HOME}/assembly/target/tachyon-assemblies-0.8.0-SNAPSHOT-jar-with-dependencies.jar ${APP_MASTER_JAR} ${HADOOP_HOME}/bin/hadoop fs -put -f ${TACHYON_HOME}/assembly/target/tachyon-assemblies-0.8.0-SNAPSHOT-jar-with-dependencies.jar ${APP_MASTER_JAR}


${HADOOP_HOME}/bin/yarn jar ${TACHYON_HOME}/assembly/target/tachyon-assemblies-0.8.0-SNAPSHOT-jar-with-dependencies.jar tachyon.yarn.Client -jar ${APP_MASTER_JAR} \ ${HADOOP_HOME}/bin/yarn jar ${TACHYON_HOME}/assembly/target/tachyon-assemblies-0.8.0-SNAPSHOT-jar-with-dependencies.jar tachyon.yarn.Client -jar ${APP_MASTER_JAR} \
-num_workers 1 -tachyon_home ${TACHYON_HOME} -num_workers 1 -tachyon_home ${TACHYON_HOME} -master_address localhost
49 changes: 33 additions & 16 deletions integration/src/main/java/tachyon/yarn/ApplicationMaster.java
Expand Up @@ -16,6 +16,7 @@
package tachyon.yarn; package tachyon.yarn;


import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
Expand Down Expand Up @@ -59,6 +60,7 @@ public final class ApplicationMaster implements AMRMClientAsync.CallbackHandler
private final int mNumWorkers; private final int mNumWorkers;


private final String mTachyonHome; private final String mTachyonHome;
private final String mMasterAddress;
private final YarnConfiguration mYarnConf = new YarnConfiguration(); private final YarnConfiguration mYarnConf = new YarnConfiguration();
private final TachyonConf mTachyonConf = new TachyonConf(); private final TachyonConf mTachyonConf = new TachyonConf();
/** Client to talk to Resource Manager */ /** Client to talk to Resource Manager */
Expand All @@ -72,13 +74,14 @@ public final class ApplicationMaster implements AMRMClientAsync.CallbackHandler
/** Network address of the container allocated for Tachyon master */ /** Network address of the container allocated for Tachyon master */
private String mMasterContainerNetAddress; private String mMasterContainerNetAddress;


public ApplicationMaster(int numWorkers, String tachyonHome) { public ApplicationMaster(int numWorkers, String tachyonHome, String masterAddress) {
mMasterCpu = mTachyonConf.getInt(Constants.MASTER_RESOURCE_CPU); mMasterCpu = mTachyonConf.getInt(Constants.MASTER_RESOURCE_CPU);
mMasterMem = (int) mTachyonConf.getBytes(Constants.MASTER_RESOURCE_MEM) / Constants.MB; mMasterMem = (int) mTachyonConf.getBytes(Constants.MASTER_RESOURCE_MEM) / Constants.MB;
mWorkerCpu = mTachyonConf.getInt(Constants.WORKER_RESOURCE_CPU); mWorkerCpu = mTachyonConf.getInt(Constants.WORKER_RESOURCE_CPU);
mWorkerMem = (int) mTachyonConf.getBytes(Constants.WORKER_RESOURCE_MEM) / Constants.MB; mWorkerMem = (int) mTachyonConf.getBytes(Constants.WORKER_RESOURCE_MEM) / Constants.MB;
mNumWorkers = numWorkers; mNumWorkers = numWorkers;
mTachyonHome = tachyonHome; mTachyonHome = tachyonHome;
mMasterAddress = masterAddress;
mMasterContainerAllocated = false; mMasterContainerAllocated = false;
mNumAllocatedWorkerContainers = 0; mNumAllocatedWorkerContainers = 0;
} }
Expand All @@ -91,7 +94,9 @@ public static void main(String[] args) {
LOG.info("Starting Application Master with args " + Arrays.toString(args)); LOG.info("Starting Application Master with args " + Arrays.toString(args));
final int numWorkers = Integer.valueOf(args[0]); final int numWorkers = Integer.valueOf(args[0]);
final String tachyonHome = args[1]; final String tachyonHome = args[1];
ApplicationMaster applicationMaster = new ApplicationMaster(numWorkers, tachyonHome); final String masterAddress = args[2];
ApplicationMaster applicationMaster =
new ApplicationMaster(numWorkers, tachyonHome, masterAddress);
applicationMaster.start(); applicationMaster.start();
applicationMaster.requestContainers(); applicationMaster.requestContainers();
applicationMaster.stop(); applicationMaster.stop();
Expand Down Expand Up @@ -157,9 +162,11 @@ public void requestContainers() throws Exception {
masterResource.setMemory(mMasterMem); masterResource.setMemory(mMasterMem);
masterResource.setVirtualCores(mMasterCpu); masterResource.setVirtualCores(mMasterCpu);


String[] nodes = {mMasterAddress};

// Make container request for Tachyon master to ResourceManager // Make container request for Tachyon master to ResourceManager
ContainerRequest masterContainerAsk = ContainerRequest masterContainerAsk =
new ContainerRequest(masterResource, null /* any hosts */, null /* any racks */, priority); new ContainerRequest(masterResource, nodes, null /* any racks */, priority);
LOG.info("Making resource request for Tachyon master"); LOG.info("Making resource request for Tachyon master");
mRMClient.addContainerRequest(masterContainerAsk); mRMClient.addContainerRequest(masterContainerAsk);


Expand Down Expand Up @@ -203,19 +210,29 @@ public void stop() {
} }


private void launchTachyonMasterContainers(List<Container> containers) { private void launchTachyonMasterContainers(List<Container> containers) {
String startScript = PathUtils.concatPath(mTachyonHome, "bin", "tachyon-start.sh"); final String formatCommand =
String command = new CommandBuilder(PathUtils.concatPath(mTachyonHome, "bin", "tachyon"))
startScript + " master" + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" .addArg("format").addArg("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout")
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"; .addArg("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr").toString();

final String startCommand =
new CommandBuilder(PathUtils.concatPath(mTachyonHome, "bin", "tachyon-start.sh"))
.addArg("master").addArg("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout")
.addArg("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr").toString();

List<String> commands = new ArrayList<String>();
commands.add(formatCommand);
commands.add("&&");
commands.add(startCommand);


for (Container container : containers) { for (Container container : containers) {
try { try {
// Launch container by create ContainerLaunchContext // Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setCommands(Collections.singletonList(command)); ctx.setCommands(commands);
LOG.info("Launching container {} for Tachyon master on {} ", LOG.info("Launching container {} for Tachyon master on {} ", container.getId(),
container.getId(), container.getNodeHttpAddress()); container.getNodeHttpAddress());
LOG.info("--------- with master command: " + command); LOG.info("--------- with master command: " + commands);
mNMClient.startContainer(container, ctx); mNMClient.startContainer(container, ctx);
String containerUri = container.getNodeHttpAddress(); // in the form of 1.2.3.4:8042 String containerUri = container.getNodeHttpAddress(); // in the form of 1.2.3.4:8042
mMasterContainerNetAddress = containerUri.split(":")[0]; mMasterContainerNetAddress = containerUri.split(":")[0];
Expand All @@ -240,19 +257,19 @@ private void launchTachyonWorkerContainers(List<Container> containers) {
FormatUtils.getSizeFromBytes((long) mWorkerMem * Constants.MB)); FormatUtils.getSizeFromBytes((long) mWorkerMem * Constants.MB));


for (Container container : containers) { for (Container container : containers) {
if (mNumAllocatedWorkerContainers >= mNumWorkers) {
break;
}
try { try {
// Launch container by create ContainerLaunchContext // Launch container by create ContainerLaunchContext
ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
ctx.setCommands(Collections.singletonList(command)); ctx.setCommands(Collections.singletonList(command));
ctx.setEnvironment(environmentMap); ctx.setEnvironment(environmentMap);
LOG.info("Launching container {} for Tachyon worker {} on {} ", LOG.info("Launching container {} for Tachyon worker {} on {} ", container.getId(),
container.getId(), mNumAllocatedWorkerContainers, container.getNodeHttpAddress()); mNumAllocatedWorkerContainers, container.getNodeHttpAddress());
LOG.info("--------- with worker command: " + command); LOG.info("--------- with worker command: " + command);
mNMClient.startContainer(container, ctx); mNMClient.startContainer(container, ctx);
mNumAllocatedWorkerContainers ++; mNumAllocatedWorkerContainers ++;
if (mNumAllocatedWorkerContainers >= mNumWorkers) {
return;
}
} catch (Exception ex) { } catch (Exception ex) {
LOG.error("Error launching container " + container.getId() + " " + ex); LOG.error("Error launching container " + container.getId() + " " + ex);
} }
Expand Down
37 changes: 17 additions & 20 deletions integration/src/main/java/tachyon/yarn/Client.java
Expand Up @@ -20,7 +20,6 @@
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.Vector;


import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.GnuParser;
Expand Down Expand Up @@ -107,7 +106,10 @@ public final class Client {
private long mClientTimeout; private long mClientTimeout;
/** Number of tachyon workers. */ /** Number of tachyon workers. */
private int mNumWorkers; private int mNumWorkers;
/** Tachyon home path on YARN containers. */
private String mTachyonHome; private String mTachyonHome;
/** Address to run Tachyon master. */
private String mMasterAddress;
/** Id of the application */ /** Id of the application */
private ApplicationId mAppId; private ApplicationId mAppId;
/** Command line options */ /** Command line options */
Expand All @@ -126,6 +128,7 @@ public Client() {
+ "application master"); + "application master");
mOptions.addOption("jar", true, "Jar file containing the application master"); mOptions.addOption("jar", true, "Jar file containing the application master");
mOptions.addOption("tachyon_home", true, "Path to Tachyon home dir on YARN slave machines"); mOptions.addOption("tachyon_home", true, "Path to Tachyon home dir on YARN slave machines");
mOptions.addOption("master_address", true, "Address to run Tachyon master");
mOptions.addOption("debug", false, "Dump out debug information"); mOptions.addOption("debug", false, "Dump out debug information");
mOptions.addOption("help", false, "Print usage"); mOptions.addOption("help", false, "Print usage");
mOptions.addOption("num_workers", true, "Number of tachyon workers to launch"); mOptions.addOption("num_workers", true, "Number of tachyon workers to launch");
Expand Down Expand Up @@ -178,13 +181,16 @@ public boolean parseArgs(String[] args) throws ParseException {
printUsage(); printUsage();
return false; return false;
} }
if (!cliParser.hasOption("jar") || !cliParser.hasOption("tachyon_home")) { if (!cliParser.hasOption("jar") || !cliParser.hasOption("tachyon_home")
|| !cliParser.hasOption("master_address")) {
printUsage(); printUsage();
return false; return false;
} }


mHDFSAppMasterJar = cliParser.getOptionValue("jar"); mHDFSAppMasterJar = cliParser.getOptionValue("jar");
mTachyonHome = cliParser.getOptionValue("tachyon_home"); mTachyonHome = cliParser.getOptionValue("tachyon_home");
mMasterAddress = cliParser.getOptionValue("master_address");

mAppName = cliParser.getOptionValue("appname", "Tachyon"); mAppName = cliParser.getOptionValue("appname", "Tachyon");
mAmPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); mAmPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0"));
mAmQueue = cliParser.getOptionValue("queue", "default"); mAmQueue = cliParser.getOptionValue("queue", "default");
Expand Down Expand Up @@ -265,24 +271,15 @@ private void checkClusterResource(GetNewApplicationResponse appResponse) {
} }


private void setupContainerLaunchContext() throws IOException { private void setupContainerLaunchContext() throws IOException {
Vector<CharSequence> vargs = new Vector<CharSequence>(30); final String amCommand =
vargs.add(Environment.JAVA_HOME.$$() + "/bin/java"); new CommandBuilder(Environment.JAVA_HOME.$$() + "/bin/java").addArg("-Xmx256M")
vargs.add("-Xmx256M"); .addArg(mAppMasterMainClass).addArg(mNumWorkers).addArg(mTachyonHome)
vargs.add(mAppMasterMainClass); .addArg(mMasterAddress)
vargs.add(String.valueOf(mNumWorkers)); .addArg("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout")
vargs.add(mTachyonHome); .addArg("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr").toString();
vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout");
vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"); System.out.println("AM command: " + amCommand);

mAmContainer.setCommands(Collections.singletonList(amCommand));
// Get final command
StringBuilder commandBuilder = new StringBuilder();
for (CharSequence str : vargs) {
commandBuilder.append(str).append(" ");
}
String command = commandBuilder.toString();

System.out.println("AM command: " + command);
mAmContainer.setCommands(Collections.singletonList(command));


// Setup jar for ApplicationMaster // Setup jar for ApplicationMaster
LocalResource appMasterJar = Records.newRecord(LocalResource.class); LocalResource appMasterJar = Records.newRecord(LocalResource.class);
Expand Down

0 comments on commit 7e54551

Please sign in to comment.