Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Complete.

  • Loading branch information...
commit c8bdc373c12bd36d8956aa140628734a503cd8dd 1 parent f70be14
@acmurthy acmurthy authored
View
5 .gitignore
@@ -4,3 +4,8 @@
*.jar
*.war
*.ear
+
+.classpath
+.project
+.settings
+target
View
21 pom.xml
@@ -0,0 +1,21 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>com.hortonworks</groupId>
+ <artifactId>simple-yarn-app</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <name>simple-yarn-app</name>
+ <url>http://maven.apache.org</url>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ <version>2.1.1-SNAPSHOT</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>2.1.1-SNAPSHOT</version>
+ </dependency>
+ </dependencies>
+</project>
View
92 src/main/java/com/hortonworks/simpleyarnapp/ApplicationMaster.java
@@ -0,0 +1,92 @@
+package com.hortonworks.simpleyarnapp;
+
+import java.util.Collections;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.util.Records;
+
+public class ApplicationMaster {
+
+ public static void main(String[] args) throws Exception {
+
+ final String command = args[0];
+ final int n = Integer.valueOf(args[1]);
+
+ // Initialize clients to ResourceManager and NodeManagers
+ Configuration conf = new Configuration();
+
+ AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
+ rmClient.init(conf);
+ rmClient.start();
+
+ NMClient nmClient = NMClient.createNMClient();
+ nmClient.init(conf);
+ nmClient.start();
+
+ // Register with ResourceManager
+ rmClient.registerApplicationMaster("", 0, "");
+
+ // Priority for worker containers - priorities are intra-application
+ Priority priority = Records.newRecord(Priority.class);
+ priority.setPriority(0);
+
+ // Resource requirements for worker containers
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(128);
+ capability.setVirtualCores(1);
+
+ // Make container requests to ResourceManager
+ for (int i = 0; i < n; ++i) {
+ ContainerRequest containerAsk = new ContainerRequest(capability, null, null, priority);
+ rmClient.addContainerRequest(containerAsk);
+ }
+
+ // Obtain allocated containers and launch
+ int allocatedContainers = 0;
+ while (allocatedContainers < n) {
+ AllocateResponse response = rmClient.allocate(0);
+ for (Container container : response.getAllocatedContainers()) {
+ ++allocatedContainers;
+
+ // Launch container by create ContainerLaunchContext
+ ContainerLaunchContext ctx =
+ Records.newRecord(ContainerLaunchContext.class);
+ ctx.setCommands(
+ Collections.singletonList(
+ command +
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+ ));
+ nmClient.startContainer(container, ctx);
+ }
+ Thread.sleep(100);
+ }
+
+ // Now wait for containers to complete
+ int completedContainers = 0;
+ while (completedContainers < n) {
+ AllocateResponse response = rmClient.allocate(completedContainers/n);
+ for (ContainerStatus status : response.getCompletedContainersStatuses()) {
+ if (status.getExitStatus() == 0) {
+ ++completedContainers;
+ }
+ }
+ Thread.sleep(100);
+ }
+
+ // Un-register with ResourceManager
+ rmClient.unregisterApplicationMaster(
+ FinalApplicationStatus.SUCCEEDED, "", "");
+ }
+}
View
136 src/main/java/com/hortonworks/simpleyarnapp/Client.java
@@ -0,0 +1,136 @@
+package com.hortonworks.simpleyarnapp;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+public class Client {
+
+ Configuration conf = new YarnConfiguration();
+
+ public void run(String[] args) throws Exception {
+ final String command = args[0];
+ final int n = Integer.valueOf(args[1]);
+ final Path jarPath = new Path(args[2]);
+
+ // Create yarnClient
+ YarnConfiguration conf = new YarnConfiguration();
+ YarnClient yarnClient = YarnClient.createYarnClient();
+ yarnClient.init(conf);
+ yarnClient.start();
+
+ // Create application via yarnClient
+ YarnClientApplication app = yarnClient.createApplication();
+
+ // Set up the container launch context for the application master
+ ContainerLaunchContext amContainer =
+ Records.newRecord(ContainerLaunchContext.class);
+ amContainer.setCommands(
+ Collections.singletonList(
+ "$JAVA_HOME/bin/java" +
+ " -Xmx256M" +
+ " com.hortonworks.simpleyarnapp.ApplicationMaster" +
+ " " + command +
+ " " + String.valueOf(n) +
+ " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" +
+ " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr"
+ )
+ );
+
+ // Setup jar for ApplicationMaster
+ LocalResource appMasterJar = Records.newRecord(LocalResource.class);
+ setupAppMasterJar(jarPath, appMasterJar);
+ amContainer.setLocalResources(
+ Collections.singletonMap("simpleapp.jar", appMasterJar));
+
+ // Setup CLASSPATH for ApplicationMaster
+ Map<String, String> appMasterEnv = new HashMap<String, String>();
+ setupAppMasterEnv(appMasterEnv);
+ amContainer.setEnvironment(appMasterEnv);
+
+ // Set up resource type requirements for ApplicationMaster
+ Resource capability = Records.newRecord(Resource.class);
+ capability.setMemory(256);
+ capability.setVirtualCores(1);
+
+ // Finally, set-up ApplicationSubmissionContext for the application
+ ApplicationSubmissionContext appContext =
+ app.getApplicationSubmissionContext();
+ appContext.setApplicationName("simple-yarn-app"); // application name
+ appContext.setAMContainerSpec(amContainer);
+ appContext.setResource(capability);
+ appContext.setQueue("default"); // queue
+
+ // Submit application
+ ApplicationId appId = appContext.getApplicationId();
+ System.out.println("Submitting application " + appId);
+ yarnClient.submitApplication(appContext);
+
+ ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+ YarnApplicationState appState = appReport.getYarnApplicationState();
+ while (appState != YarnApplicationState.FINISHED &&
+ appState != YarnApplicationState.KILLED &&
+ appState != YarnApplicationState.FAILED) {
+ Thread.sleep(100);
+ appReport = yarnClient.getApplicationReport(appId);
+ appState = appReport.getYarnApplicationState();
+ }
+
+ System.out.println(
+ "Application " + appId + " finished with" +
+ " state " + appState +
+ " at " + appReport.getFinishTime());
+
+ }
+
+ private void setupAppMasterJar(Path jarPath, LocalResource appMasterJar) throws IOException {
+ FileStatus jarStat = FileSystem.get(conf).getFileStatus(jarPath);
+ appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath));
+ appMasterJar.setSize(jarStat.getLen());
+ appMasterJar.setTimestamp(jarStat.getModificationTime());
+ appMasterJar.setType(LocalResourceType.FILE);
+ appMasterJar.setVisibility(LocalResourceVisibility.PUBLIC);
+ }
+
+ private void setupAppMasterEnv(Map<String, String> appMasterEnv) {
+ for (String c : conf.getStrings(
+ YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+ Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(),
+ c.trim());
+ }
+ Apps.addToEnvironment(appMasterEnv,
+ Environment.CLASSPATH.name(),
+ Environment.PWD.$() + File.separator + "*");
+ }
+
+ public static void main(String[] args) throws Exception {
+ Client c = new Client();
+ c.run(args);
+ }
+}
Please sign in to comment.
Something went wrong with that request. Please try again.