Merge pull request #18 from flyicewolf/master

Delete DistributeCache & add basic skeleton of LocalJobRunner
commit f51db820afcf7f61fe3021a6e181135b32196797 2 parents b6676d2 + 46bcc13
@flyicewolf authored
Showing with 891 additions and 449 deletions.
  1. +6 −2 hadoop-dragon-core/pom.xml
  2. +103 −275 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonApps.java
  3. +2 −0  hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonConfig.java
  4. +4 −4 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/ResourceMgrDelegate.java
  5. +38 −124 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/YarnRunner.java
  6. +57 −40 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Job.java
  7. +6 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mission.java
  8. +34 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Task.java
  9. +27 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/TaskAttempt.java
  10. +83 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJob.java
  11. +256 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJobRunner.java
  12. +72 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJobRunnerMetrics.java
  13. +18 −4 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/JobId.java
  14. +21 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/TaskAttemptId.java
  15. +17 −0 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/TaskId.java
  16. +147 −0 hadoop-dragon-core/src/test/java/org/apache/hadoop/realtime/TestYarnRunner.java
8 hadoop-dragon-core/pom.xml
@@ -45,6 +45,12 @@
<artifactId>guava</artifactId>
<version>11.0.2</version>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.8.5</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -72,7 +78,6 @@
</execution>
</executions>
</plugin>
-
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
@@ -115,7 +120,6 @@
</execution>
</executions>
</plugin>
-
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
378 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonApps.java
@@ -18,33 +18,36 @@
package org.apache.hadoop.realtime;
-import java.io.BufferedReader;
-import java.io.File;
import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URL;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Vector;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.ContainerLogAppender;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.Apps;
-import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
/**
* Helper class for Dragon applications
@@ -52,318 +55,143 @@
@Private
@Unstable
public class DragonApps extends Apps {
- public static final String JOB = "job";
- public static final String TASK = "task";
- public static final String ATTEMPT = "attempt";
- public static void setupDistributedCache(Configuration conf,
- Map<String, LocalResource> localResources) throws IOException {
-
- // Cache archives
- parseDistributedCache(conf, localResources, LocalResourceType.ARCHIVE,
- getCacheArchives(conf), getArchiveTimestamps(conf),
- getFileSizes(conf, DragonJobConfig.CACHE_ARCHIVES_SIZES),
- getArchiveVisibilities(conf), getArchiveClassPaths(conf));
-
- // Cache files
- parseDistributedCache(conf, localResources, LocalResourceType.FILE,
- getCacheFiles(conf), getFileTimestamps(conf),
- getFileSizes(conf, DragonJobConfig.CACHE_FILES_SIZES),
- getFileVisibilities(conf), getFileClassPaths(conf));
- }
-
- private static void parseDistributedCache(Configuration conf,
- Map<String, LocalResource> localResources, LocalResourceType type,
- URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[],
- Path[] pathsToPutOnClasspath) throws IOException {
-
- if (uris != null) {
- // Sanity check
- if ((uris.length != timestamps.length) || (uris.length != sizes.length)
- || (uris.length != visibilities.length)) {
- throw new IllegalArgumentException("Invalid specification for "
- + "distributed-cache artifacts of type " + type + " :" + " #uris="
- + uris.length + " #timestamps=" + timestamps.length
- + " #visibilities=" + visibilities.length);
- }
-
- Map<String, Path> classPaths = new HashMap<String, Path>();
- if (pathsToPutOnClasspath != null) {
- for (Path p : pathsToPutOnClasspath) {
- FileSystem remoteFS = p.getFileSystem(conf);
- p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory()));
- classPaths.put(p.toUri().getPath().toString(), p);
- }
- }
- for (int i = 0; i < uris.length; ++i) {
- URI u = uris[i];
- Path p = new Path(u);
- FileSystem remoteFS = p.getFileSystem(conf);
- p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
- remoteFS.getWorkingDirectory()));
- // Add URI fragment or just the filename
- Path name = new Path((null == u.getFragment()) ? p.getName()
- : u.getFragment());
- if (name.isAbsolute()) {
- throw new IllegalArgumentException("Resource name must be relative");
- }
- String linkName = name.toUri().getPath();
- localResources.put(linkName, BuilderUtils.newLocalResource(p.toUri(),
- type, visibilities[i] ? LocalResourceVisibility.PUBLIC
- : LocalResourceVisibility.PRIVATE, sizes[i], timestamps[i]));
- }
- }
- }
-
- private static void setDragonFrameworkClasspath(
- Map<String, String> environment, Configuration conf) throws IOException {
- InputStream classpathFileStream = null;
- BufferedReader reader = null;
- try {
- // Get yarn mapreduce-app classpath from generated classpath
- // Works if compile time env is same as runtime. Mainly tests.
- ClassLoader thisClassLoader = Thread.currentThread()
- .getContextClassLoader();
- String mrAppGeneratedClasspathFile = "mrapp-generated-classpath";
- classpathFileStream = thisClassLoader
- .getResourceAsStream(mrAppGeneratedClasspathFile);
-
- // Put the file itself on classpath for tasks.
- URL classpathResource = thisClassLoader
- .getResource(mrAppGeneratedClasspathFile);
- if (classpathResource != null) {
- String classpathElement = classpathResource.getFile();
- if (classpathElement.contains("!")) {
- classpathElement = classpathElement.substring(0,
- classpathElement.indexOf("!"));
- } else {
- classpathElement = new File(classpathElement).getParent();
- }
- Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
- classpathElement);
- }
-
- if (classpathFileStream != null) {
- reader = new BufferedReader(new InputStreamReader(classpathFileStream));
- String cp = reader.readLine();
- if (cp != null) {
- Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
- cp.trim());
- }
- }
-
- // Add standard Hadoop classes
- for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
- .split(",")) {
- Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
- c.trim());
- }
- } finally {
- if (classpathFileStream != null) {
- classpathFileStream.close();
- }
- if (reader != null) {
- reader.close();
- }
- }
- // TODO: Remove duplicates.
- }
+ private static final RecordFactory recordFactory = RecordFactoryProvider
+ .getRecordFactory(null);
public static void setClasspath(Map<String, String> environment,
Configuration conf) throws IOException {
- boolean userClassesTakesPrecedence = conf.getBoolean(
- DragonJobConfig.DRAGON_JOB_USER_CLASSPATH_FIRST, false);
-
- if (!userClassesTakesPrecedence) {
- DragonApps.setDragonFrameworkClasspath(environment, conf);
+ // Add standard Hadoop classes
+ for (String c : conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH)
+ .split(",")) {
+ Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c.trim());
}
+ // Add the job jar
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
DragonJobConfig.JOB_JAR);
+ // Add current path
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
Environment.PWD.$() + Path.SEPARATOR + "*");
- if (userClassesTakesPrecedence) {
- DragonApps.setDragonFrameworkClasspath(environment, conf);
- }
- }
-
- private static final String STAGING_CONSTANT = ".staging";
-
- public static Path getStagingAreaDir(Configuration conf, String user) {
- return new Path(conf.get(DragonJobConfig.DRAGON_AM_STAGING_DIR)
- + Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
- }
-
- /**
- * Add the JVM system properties necessary to configure
- * {@link ContainerLogAppender}.
- *
- * @param logLevel
- * the desired log level (eg INFO/WARN/DEBUG)
- * @param logSize
- * See {@link ContainerLogAppender#setTotalLogFileSize(long)}
- * @param vargs
- * the argument list to append to
- */
- public static void addLog4jSystemProperties(String logLevel, long logSize,
- List<String> vargs) {
- vargs.add("-Dlog4j.configuration=container-log4j.properties");
- vargs.add("-D" + DragonJobConfig.TASK_LOG_DIR + "="
- + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
- vargs.add("-D" + DragonJobConfig.TASK_LOG_SIZE + "=" + logSize);
- vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
}
/**
- * Get cache archives set in the Configuration.
+ * Build the ApplicationSubmissionContext used to launch the AM.
*
+ * @param applicationId the id assigned by the ResourceManager
* @param conf
- * The configuration which contains the archives
- * @return A URI array of the caches set in the Configuration
+ * the job configuration supplying user, queue and application name
+ * @param amContainer the launch context for the AM container
+ * @return the populated submission context
* @throws IOException
*/
- public static URI[] getCacheArchives(Configuration conf) throws IOException {
- return StringUtils.stringToURI(conf
- .getStrings(DragonJobConfig.CACHE_ARCHIVES));
+ public static ApplicationSubmissionContext newApplicationSubmissionContext(
+ ApplicationId applicationId, Configuration conf,
+ ContainerLaunchContext amContainer) throws IOException {
+ ApplicationSubmissionContext appContext = recordFactory
+ .newRecordInstance(ApplicationSubmissionContext.class);
+ appContext.setApplicationId(applicationId);
+ appContext
+ .setUser(UserGroupInformation.getCurrentUser().getShortUserName());
+ appContext.setQueue(conf.get(DragonJobConfig.QUEUE_NAME,
+ YarnConfiguration.DEFAULT_QUEUE_NAME));
+ appContext.setApplicationName(conf.get(DragonJobConfig.JOB_NAME,
+ YarnConfiguration.DEFAULT_APPLICATION_NAME));
+ appContext.setAMContainerSpec(amContainer);
+ return appContext;
}
/**
- * Get cache files set in the Configuration.
+ * Build the view/modify ACL map for the application.
*
* @param conf
- * The configuration which contains the files
- * @return A URI array of the files set in the Configuration
- * @throws IOException
+ * the job configuration holding the ACL settings
+ * @return the ACLs, keyed by access type
*/
- public static URI[] getCacheFiles(Configuration conf) throws IOException {
- return StringUtils
- .stringToURI(conf.getStrings(DragonJobConfig.CACHE_FILES));
+ public static Map<ApplicationAccessType, String>
+ setupACLs(Configuration conf) {
+ Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(
+ 2);
+ acls.put(ApplicationAccessType.VIEW_APP, conf.get(
+ DragonJobConfig.JOB_ACL_VIEW_JOB,
+ DragonJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+ acls.put(ApplicationAccessType.MODIFY_APP, conf.get(
+ DragonJobConfig.JOB_ACL_MODIFY_JOB,
+ DragonJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+ return acls;
}
/**
- * Get the timestamps of the archives.
+ * Create a FILE-typed, application-visible LocalResource for a path.
*
- * @param conf
- * The configuration which stored the timestamps
- * @return a string array of timestamps
+ * @param fs the file context used to resolve the path
+ * @param p the path to localize
+ * @return the LocalResource record for the file
* @throws IOException
*/
- public static long[] getArchiveTimestamps(Configuration conf) {
- return parseTimeStamps(conf
- .getStrings(DragonJobConfig.CACHE_ARCHIVES_TIMESTAMPS));
+ public static LocalResource createApplicationResource(FileContext fs, Path p)
+ throws IOException {
+ LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
+ FileStatus rsrcStat = fs.getFileStatus(p);
+ rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
+ .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
+ rsrc.setSize(rsrcStat.getLen());
+ rsrc.setTimestamp(rsrcStat.getModificationTime());
+ rsrc.setType(LocalResourceType.FILE);
+ rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
+ return rsrc;
}
/**
- * Get the timestamps of the files.
+ * Set up the resource capability (memory) for the ApplicationMaster.
*
* @param conf
- * The configuration which stored the timestamps
- * @return a string array of timestamps
- * @throws IOException
+ * the job configuration holding the AM memory setting
+ * @return the resource capability for the AM container
*/
- public static long[] getFileTimestamps(Configuration conf) {
- return parseTimeStamps(conf
- .getStrings(DragonJobConfig.CACHE_FILE_TIMESTAMPS));
+ public static Resource setupResources(Configuration conf) {
+ Resource capability = recordFactory.newRecordInstance(Resource.class);
+ capability.setMemory(conf.getInt(DragonJobConfig.DRAGON_AM_VMEM_MB,
+ DragonJobConfig.DEFAULT_DRAGON_AM_VMEM_MB));
+ return capability;
}
/**
- * Get the file entries in classpath as an array of Path. Used by internal
- * DistributedCache code.
+ * Set up the command line used to launch the ApplicationMaster.
*
* @param conf
- * Configuration that contains the classpath setting
+ * the job configuration holding the AM JVM options
+ * @return the merged launch command as a single-element list
*/
- public static Path[] getFileClassPaths(Configuration conf) {
- ArrayList<String> list = (ArrayList<String>) conf
- .getStringCollection(DragonJobConfig.CLASSPATH_FILES);
- if (list.size() == 0) {
- return null;
- }
- Path[] paths = new Path[list.size()];
- for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path(list.get(i));
+ public static List<String> setupCommands(Configuration conf) {
+ Vector<CharSequence> vargs = new Vector<CharSequence>(8);
+ vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+ addLog4jSystemProperties(conf, vargs);
+ vargs.add(conf.get(DragonJobConfig.DRAGON_AM_COMMAND_OPTS,
+ DragonJobConfig.DEFAULT_DRAGON_AM_COMMAND_OPTS));
+ vargs.add(DragonJobConfig.APPLICATION_MASTER_CLASS);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
+ + Path.SEPARATOR + ApplicationConstants.STDERR);
+ // Final command
+ StringBuilder mergedCommand = new StringBuilder();
+ for (CharSequence str : vargs) {
+ mergedCommand.append(str).append(" ");
}
- return paths;
+ List<String> vargsFinal = new ArrayList<String>(8);
+ vargsFinal.add(mergedCommand.toString());
+ return vargsFinal;
}
/**
- * Get the archive entries in classpath as an array of Path. Used by internal
- * DistributedCache code.
- *
- * @param conf
- * Configuration that contains the classpath setting
- */
- public static Path[] getArchiveClassPaths(Configuration conf) {
- ArrayList<String> list = (ArrayList<String>) conf
- .getStringCollection(DragonJobConfig.CLASSPATH_ARCHIVES);
- if (list.size() == 0) {
- return null;
- }
- Path[] paths = new Path[list.size()];
- for (int i = 0; i < list.size(); i++) {
- paths[i] = new Path(list.get(i));
- }
- return paths;
- }
-
- private static long[] getFileSizes(Configuration conf, String key) {
- String[] strs = conf.getStrings(key);
- if (strs == null) {
- return null;
- }
- long[] result = new long[strs.length];
- for (int i = 0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
- /**
- * Get the booleans on whether the files are public or not. Used by internal
- * DistributedCache and MapReduce code.
- *
- * @param conf
- * The configuration which stored the timestamps
- * @return a string array of booleans
- * @throws IOException
- */
- public static boolean[] getFileVisibilities(Configuration conf) {
- return parseBooleans(conf
- .getStrings(DragonJobConfig.CACHE_FILE_VISIBILITIES));
- }
-
- /**
- * Get the booleans on whether the archives are public or not. Used by
- * internal DistributedCache and MapReduce code.
+ * Add the JVM system properties necessary to configure
+ * {@link ContainerLogAppender}.
*
* @param conf
- * The configuration which stored the timestamps
- * @return a string array of booleans
+ * the configuration holding the log size and level settings
+ * @param vargs
+ * the argument list to append to
*/
- public static boolean[] getArchiveVisibilities(Configuration conf) {
- return parseBooleans(conf
- .getStrings(DragonJobConfig.CACHE_ARCHIVES_VISIBILITIES));
- }
-
- private static long[] parseTimeStamps(String[] strs) {
- if (null == strs) {
- return null;
- }
- long[] result = new long[strs.length];
- for (int i = 0; i < strs.length; ++i) {
- result[i] = Long.parseLong(strs[i]);
- }
- return result;
- }
-
- private static boolean[] parseBooleans(String[] strs) {
- if (null == strs) {
- return null;
- }
- boolean[] result = new boolean[strs.length];
- for (int i = 0; i < strs.length; ++i) {
- result[i] = Boolean.parseBoolean(strs[i]);
- }
- return result;
+ public static void addLog4jSystemProperties(Configuration conf,
+ Vector<CharSequence> vargs) {
+ long logSize = conf.getLong(DragonJobConfig.TASK_USERLOG_LIMIT, 0) * 1024;
+ String logLevel = conf.get(DragonJobConfig.DRAGON_AM_LOG_LEVEL,
+ DragonJobConfig.DEFAULT_DRAGON_AM_LOG_LEVEL);
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ vargs.add("-D" + DragonJobConfig.TASK_LOG_DIR + "="
+ + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + DragonJobConfig.TASK_LOG_SIZE + "=" + logSize);
+ vargs.add("-Dhadoop.root.logger=" + logLevel + ",CLA");
}
}
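
Taken together, these helpers replace the deleted distributed-cache plumbing with a short submission pipeline. A minimal sketch of how a caller can compose them, mirroring the calls YarnRunner makes below (conf, applicationId, jobSubmitDir and defaultFileContext are assumed to be supplied by the caller; this is not a method of the class):

    // Sketch only: composes the new DragonApps helpers into an AM submission.
    ApplicationSubmissionContext buildAMContext(Configuration conf,
        ApplicationId applicationId, String jobSubmitDir,
        FileContext defaultFileContext) throws IOException {
      Resource capability = DragonApps.setupResources(conf);        // AM memory
      Map<String, LocalResource> localResources =
          new HashMap<String, LocalResource>();
      localResources.put(DragonJobConfig.JOB_CONF_FILE,             // ship job.xml
          DragonApps.createApplicationResource(defaultFileContext,
              new Path(jobSubmitDir, DragonJobConfig.JOB_CONF_FILE)));
      Map<String, String> environment = new HashMap<String, String>();
      DragonApps.setClasspath(environment, conf);                   // CLASSPATH env
      List<String> vargs = DragonApps.setupCommands(conf);          // launch command
      Map<ApplicationAccessType, String> acls = DragonApps.setupACLs(conf);
      ContainerLaunchContext amContainer =
          BuilderUtils.newContainerLaunchContext(null,
              UserGroupInformation.getCurrentUser().getShortUserName(),
              capability, localResources, environment, vargs, null,
              null /* securityTokens */, acls);
      return DragonApps.newApplicationSubmissionContext(applicationId, conf,
          amContainer);
    }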
2  hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/DragonConfig.java
@@ -41,5 +41,7 @@
*/
public static final String LOCAL_CACHE_KEEP_AROUND_PCT =
"dragon.cache.local.keep.pct";
+ // Assumed key names: these must be real configuration keys, not null,
+ // for the conf.get() lookups in LocalJobRunner to work.
+ public static final String SYSTEM_DIR = "dragon.system.dir";
+ public static final String STAGING_AREA_ROOT = "dragon.staging.area.root";
}
8 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/ResourceMgrDelegate.java
@@ -136,7 +136,6 @@ public JobId getNewJobID() throws IOException, InterruptedException {
public String getSystemDir() throws IOException, InterruptedException {
Path sysDir = new Path(DragonJobConfig.JOB_SUBMIT_DIR);
- // FileContext.getFileContext(conf).delete(sysDir, true);
return sysDir.toString();
}
@@ -175,11 +174,12 @@ public ApplicationReport getApplicationReport(ApplicationId appId)
public ApplicationId getApplicationId() {
return applicationId;
}
-
+ private static final String STAGING_CONSTANT = ".staging";
public String getStagingAreaDir() throws IOException, InterruptedException {
- // Path path = new Path(MRJobConstants.JOB_SUBMIT_DIR);
String user = UserGroupInformation.getCurrentUser().getShortUserName();
- Path path = DragonApps.getStagingAreaDir(conf, user);
+ Path path = new Path(
+     conf.get(DragonJobConfig.DRAGON_AM_STAGING_DIR) +
+     Path.SEPARATOR + user + Path.SEPARATOR + STAGING_CONSTANT);
LOG.debug("getStagingAreaDir: dir=" + path);
return path.toString();
}
162 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/YarnRunner.java
@@ -1,53 +1,55 @@
/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
*
*/
package org.apache.hadoop.realtime;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Vector;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.realtime.protocol.records.GetJobReportRequest;
import org.apache.hadoop.realtime.records.JobId;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.YarnException;
-import org.apache.hadoop.yarn.api.ApplicationConstants;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
-import org.apache.hadoop.yarn.util.ConverterUtils;
public class YarnRunner implements DragonJobService {
private static final Log LOG = LogFactory.getLog(YarnRunner.class);
-
- private final RecordFactory recordFactory = RecordFactoryProvider
- .getRecordFactory(null);
private ResourceMgrDelegate resMgrDelegate;
private Configuration conf;
private final FileContext defaultFileContext;
@@ -96,13 +98,11 @@ public boolean submitJob(JobId jobId, String jobSubmitDir, Credentials ts)
} catch (IOException e) {
throw new YarnException(e);
}
- // Construct necessary information to start the MR AM
+ // Construct necessary information to start the Dragon AM
ApplicationSubmissionContext appContext = createApplicationSubmissionContext(
- conf, jobSubmitDir, ts);
-
+ jobSubmitDir, ts);
// Submit to ResourceManager
ApplicationId applicationId = resMgrDelegate.submitApplication(appContext);
-
ApplicationReport appMaster = resMgrDelegate
.getApplicationReport(applicationId);
String diagnostics = (appMaster == null ? "application report is null"
@@ -112,7 +112,6 @@ public boolean submitJob(JobId jobId, String jobSubmitDir, Credentials ts)
|| appMaster.getYarnApplicationState() == YarnApplicationState.KILLED) {
throw new IOException("Failed to run job : " + diagnostics);
}
-
return true;
}
@@ -127,51 +126,30 @@ public String getStagingAreaDir() throws IOException, InterruptedException {
}
public ApplicationSubmissionContext createApplicationSubmissionContext(
- Configuration jobConf, String jobSubmitDir, Credentials ts)
- throws IOException {
+ String jobSubmitDir, Credentials ts) throws IOException {
ApplicationId applicationId = resMgrDelegate.getApplicationId();
- // Setup resource requirements
- Resource capability = recordFactory.newRecordInstance(Resource.class);
- capability.setMemory(conf.getInt(DragonJobConfig.DRAGON_AM_VMEM_MB,
- DragonJobConfig.DEFAULT_DRAGON_AM_VMEM_MB));
- LOG.debug("AppMaster capability = " + capability);
+ // Setup capability
+ Resource capability = DragonApps.setupResources(conf);
// Setup LocalResources
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+ // Job conf file
Path jobConfPath = new Path(jobSubmitDir, DragonJobConfig.JOB_CONF_FILE);
-
- URL yarnUrlForJobSubmitDir = ConverterUtils
- .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem()
- .resolvePath(
- defaultFileContext.makeQualified(new Path(jobSubmitDir))));
- LOG.debug("Creating setup context, jobSubmitDir url is "
- + yarnUrlForJobSubmitDir);
-
localResources.put(DragonJobConfig.JOB_CONF_FILE,
- createApplicationResource(defaultFileContext, jobConfPath));
- if (jobConf.get(DragonJobConfig.JAR) != null) {
- localResources.put(
- DragonJobConfig.JOB_JAR,
- createApplicationResource(defaultFileContext, new Path(jobSubmitDir,
+ DragonApps.createApplicationResource(defaultFileContext, jobConfPath));
+
+ // JobJar
+ if (conf.get(DragonJobConfig.JAR) != null) {
+ localResources.put(DragonJobConfig.JOB_JAR, DragonApps
+ .createApplicationResource(defaultFileContext, new Path(jobSubmitDir,
DragonJobConfig.JOB_JAR)));
} else {
- // Job jar may be null. For e.g, for pipes, the job jar is the hadoop
- // mapreduce jar itself which is already on the classpath.
LOG.info("Job jar is not present. "
+ "Not adding any jar to the list of resources.");
}
- // TODO gross hack
- for (String s : new String[] { DragonJobConfig.JOB_METAINFO,
- DragonJobConfig.APPLICATION_TOKENS_FILE }) {
- localResources.put(
- DragonJobConfig.JOB_SUBMIT_DIR + "/" + s,
- createApplicationResource(defaultFileContext, new Path(jobSubmitDir,
- s)));
- }
-
// Setup security tokens
ByteBuffer securityTokens = null;
if (UserGroupInformation.isSecurityEnabled()) {
@@ -179,90 +157,26 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
ts.writeTokenStorageToStream(dob);
securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
}
-
// Setup the command to run the AM
- List<String> vargs = new ArrayList<String>(8);
- vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
+ List<String> vargs = DragonApps.setupCommands(conf);
- // TODO: why do we use 'conf' some places and 'jobConf' others?
- long logSize = conf.getLong(DragonJobConfig.TASK_USERLOG_LIMIT, 0) * 1024;
- String logLevel = jobConf.get(DragonJobConfig.DRAGON_AM_LOG_LEVEL,
- DragonJobConfig.DEFAULT_DRAGON_AM_LOG_LEVEL);
- DragonApps.addLog4jSystemProperties(logLevel, logSize, vargs);
-
- vargs.add(conf.get(DragonJobConfig.DRAGON_AM_COMMAND_OPTS,
- DragonJobConfig.DEFAULT_DRAGON_AM_COMMAND_OPTS));
-
- vargs.add(DragonJobConfig.APPLICATION_MASTER_CLASS);
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + Path.SEPARATOR + ApplicationConstants.STDOUT);
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR
- + Path.SEPARATOR + ApplicationConstants.STDERR);
-
- Vector<String> vargsFinal = new Vector<String>(8);
- // Final commmand
- StringBuilder mergedCommand = new StringBuilder();
- for (CharSequence str : vargs) {
- mergedCommand.append(str).append(" ");
- }
- vargsFinal.add(mergedCommand.toString());
-
- LOG.debug("Command to launch container for ApplicationMaster is : "
- + mergedCommand);
+ // Setup environment
+ Map<String, String> environment = new HashMap<String, String>();
// Setup the CLASSPATH in environment
- // i.e. add { Hadoop jars, job jar, CWD } to classpath.
- Map<String, String> environment = new HashMap<String, String>();
DragonApps.setClasspath(environment, conf);
- // Parse distributed cache
- DragonApps.setupDistributedCache(jobConf, localResources);
-
- Map<ApplicationAccessType, String> acls = new HashMap<ApplicationAccessType, String>(
- 2);
- acls.put(ApplicationAccessType.VIEW_APP, jobConf.get(
- DragonJobConfig.JOB_ACL_VIEW_JOB,
- DragonJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
- acls.put(ApplicationAccessType.MODIFY_APP, jobConf.get(
- DragonJobConfig.JOB_ACL_MODIFY_JOB,
- DragonJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+ // Setup the ACLs
+ Map<ApplicationAccessType, String> acls = DragonApps.setupACLs(conf);
- // Setup ContainerLaunchContext for AM container
+ // Generate ContainerLaunchContext for AM container
ContainerLaunchContext amContainer = BuilderUtils
.newContainerLaunchContext(null, UserGroupInformation.getCurrentUser()
.getShortUserName(), capability, localResources, environment,
- vargsFinal, null, securityTokens, acls);
-
- // Set up the ApplicationSubmissionContext
- ApplicationSubmissionContext appContext = recordFactory
- .newRecordInstance(ApplicationSubmissionContext.class);
- appContext.setApplicationId(applicationId); // ApplicationId
- appContext.setUser( // User name
- UserGroupInformation.getCurrentUser().getShortUserName());
- appContext.setQueue( // Queue name
- jobConf.get(DragonJobConfig.QUEUE_NAME,
- YarnConfiguration.DEFAULT_QUEUE_NAME));
- appContext.setApplicationName( // Job name
- jobConf.get(DragonJobConfig.JOB_NAME,
- YarnConfiguration.DEFAULT_APPLICATION_NAME));
- appContext.setCancelTokensWhenComplete(conf.getBoolean(
- DragonJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true));
- appContext.setAMContainerSpec(amContainer); // AM Container
-
- return appContext;
- }
+ vargs, null, securityTokens, acls);
- private LocalResource createApplicationResource(FileContext fs, Path p)
- throws IOException {
- LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class);
- FileStatus rsrcStat = fs.getFileStatus(p);
- rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs
- .getDefaultFileSystem().resolvePath(rsrcStat.getPath())));
- rsrc.setSize(rsrcStat.getLen());
- rsrc.setTimestamp(rsrcStat.getModificationTime());
- rsrc.setType(LocalResourceType.FILE);
- rsrc.setVisibility(LocalResourceVisibility.APPLICATION);
- return rsrc;
+ return DragonApps.newApplicationSubmissionContext(applicationId, conf,
+ amContainer);
}
}
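
With context construction delegated to DragonApps, driving this class end to end is now just a handful of calls. A hedged sketch of a caller (the two-argument constructor is the one exercised by TestYarnRunner below; getNewJobId on DragonJobService is assumed, matching the method LocalJobRunner implements; staging of job.xml into the submit directory is elided):

    // Hypothetical driver of the simplified submission flow.
    boolean runJob(YarnConfiguration conf, ResourceMgrDelegate resMgrDelegate)
        throws IOException, InterruptedException {
      DragonJobService client = new YarnRunner(conf, resMgrDelegate);
      JobId jobId = client.getNewJobId();             // id handed out by the RM
      String stagingDir = client.getStagingAreaDir(); // job.xml / job.jar go here
      // (upload of DragonJobConfig.JOB_CONF_FILE into stagingDir omitted)
      return client.submitJob(jobId, stagingDir, new Credentials());
    }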
97 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Job.java
@@ -17,57 +17,74 @@
*/
package org.apache.hadoop.realtime.job;
+import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.realtime.records.JobId;
+import org.apache.hadoop.realtime.records.TaskId;
import org.apache.hadoop.security.Credentials;
/**
*/
public interface Job {
- /**
- * Get the unique ID for the job.
- *
- * @return the object with the job id
- */
- public JobId getJobId();
+ /**
+ * Get the unique ID for the job.
+ *
+ * @return the object with the job id
+ */
+ public JobId getJobId();
+
+ /**
+ * Get the user-specified job name. This is only used to identify the job to
+ * the user.
+ *
+ * @return the job's name, defaulting to "".
+ */
+ public String getJobName();
+
+ /**
+ * Get the user-specified queue name. This is only used to identify the job to
+ * the user.
+ *
+ * @return the queue name, defaulting to "".
+ */
+ public String getQueue();
+
+ /**
+ * Get credentials for the job.
+ *
+ * @return credentials for the job
+ */
+ public Credentials getCredentials();
- /**
- * Get the user-specified job name. This is only used to identify the job to
- * the user.
- *
- * @return the job's name, defaulting to "".
- */
- public String getJobName();
+ /**
+ * Return the configuration for the job.
+ *
+ * @return the shared configuration object
+ */
+ public Configuration getConfiguration();
- /**
- * Get the user-specified queue name. This is only used to identify the job to
- * the user.
- *
- * @return the job's name, defaulting to "".
- */
- public String getQueue();
+ /**
+ * Get the reported username for this job.
+ *
+ * @return the username
+ */
+ public String getUser();
- /**
- * Get credentials for the job.
- *
- * @return credentials for the job
- */
- public Credentials getCredentials();
+ /**
+ * Get all tasks of this job.
+ *
+ * @return the list of tasks
+ */
+ public List<Task> getTasks();
- /**
- * Return the configuration for the job.
- *
- * @return the shared configuration object
- */
- public Configuration getConfiguration();
+ /**
+ * Get the task with the given id.
+ *
+ * @param taskId the id of the task to look up
+ * @return the task, or null if no such task exists
+ */
+ public Task getTask(TaskId taskId);
- /**
- * Get the reported username for this job.
- *
- * @return the username
- */
- public String getUser();
-
-
}
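
The additions turn Job into a task container as well as a descriptor. A tiny consumer sketch of the new methods (job is assumed to be any implementation, such as LocalJob below):

    for (Task task : job.getTasks()) {  // getTasks() is new in this change
      task.getMission().run();          // per-task work unit; LocalJobRunner
    }                                   // dispatches these to an executor instead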
6 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Mission.java
@@ -0,0 +1,6 @@
+package org.apache.hadoop.realtime.job;
+
+public interface Mission {
+
+ public void run();
+}
34 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/Task.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.job;
+
+import java.util.Map;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+import org.apache.hadoop.realtime.records.TaskId;
+
+public interface Task {
+
+ TaskId getId();
+
+ Map<TaskAttemptId, TaskAttempt> getAttempts();
+
+ TaskAttempt getAttempt(TaskAttemptId attemptId);
+
+ Mission getMission();
+
+}
27 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/job/TaskAttempt.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.job;
+
+import java.util.List;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+
+public interface TaskAttempt {
+ TaskAttemptId getId();
+
+ List<String> getDiagnostics();
+}
83 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJob.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.local;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.realtime.job.Job;
+import org.apache.hadoop.realtime.job.Task;
+import org.apache.hadoop.realtime.records.JobId;
+import org.apache.hadoop.realtime.records.TaskId;
+import org.apache.hadoop.security.Credentials;
+
+public class LocalJob implements Job {
+
+ public LocalJob(Configuration conf) {
+
+ }
+
+ @Override
+ public JobId getJobId() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getJobName() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getQueue() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Credentials getCredentials() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public String getUser() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public List<Task> getTasks() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Task getTask(TaskId taskId) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
256 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJobRunner.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.realtime.local;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.realtime.DragonConfig;
+import org.apache.hadoop.realtime.DragonJobService;
+import org.apache.hadoop.realtime.job.Job;
+import org.apache.hadoop.realtime.job.Task;
+import org.apache.hadoop.realtime.records.JobId;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+import org.apache.hadoop.realtime.records.TaskId;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+public class LocalJobRunner implements DragonJobService {
+ private static final Log LOG = LogFactory.getLog(LocalJobRunner.class);
+
+ /** The maximum number of tasks to run in parallel in LocalJobRunner */
+ public static final String LOCAL_MAX_TASKS = "dragon.local.tasks.maximum";
+ private static final String jobDir = "localRunner/";
+
+ private Configuration conf;
+ private JobId jobId;
+ private ApplicationId appId;
+ private Job job;
+ private LocalJobRunnerMetrics localMetrics;
+
+ private FileSystem fs;
+ final Random rand = new Random();
+
+ public LocalJobRunner(Configuration conf) throws IOException {
+ this.conf = conf;
+ this.fs = FileSystem.getLocal(conf);
+ this.localMetrics = new LocalJobRunnerMetrics(conf); // used by TaskRunnable
+ }
+
+ @Override
+ public JobId getNewJobId() throws IOException, InterruptedException {
+ jobId = new JobId(appId);
+ return jobId;
+ }
+
+ @Override
+ public boolean submitJob(JobId jobId, String jobSubmitDir, Credentials ts)
+ throws IOException, InterruptedException {
+ job = new LocalJob(conf);
+
+ return false;
+ }
+
+ @Override
+ public String getSystemDir() throws IOException, InterruptedException {
+ Path sysDir = new Path(conf.get(DragonConfig.SYSTEM_DIR,
+ "/tmp/hadoop/dragon/system"));
+ return fs.makeQualified(sysDir).toString();
+ }
+
+ @Override
+ public String getStagingAreaDir() throws IOException, InterruptedException {
+ Path stagingRootDir = new Path(conf.get(DragonConfig.STAGING_AREA_ROOT,
+ "/tmp/hadoop/mapred/staging"));
+ UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+ String user;
+ if (ugi != null) {
+ user = ugi.getShortUserName() + rand.nextInt();
+ } else {
+ user = "dummy" + rand.nextInt();
+ }
+ return fs.makeQualified(new Path(stagingRootDir, user + "/.staging"))
+ .toString();
+ }
+
+ protected class JobRunnable implements Runnable {
+ private Job job;
+
+ public JobRunnable(Job job) {
+ this.job = job;
+ }
+
+ @Override
+ public void run() {
+ try {
+ List<TaskRunnable> taskRunnables = getTaskRunnables(job, jobId);
+ ExecutorService taskService = createTaskExecutor(taskRunnables.size());
+ // Start populating the executor with work units.
+ // They may begin running immediately (in other threads).
+ for (Runnable r : taskRunnables) {
+ taskService.submit(r);
+ }
+ try {
+ taskService.shutdown(); // Instructs queue to drain.
+ // Wait for tasks to finish; do not use a time-based timeout.
+ // (See http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6179024)
+ LOG.info("Waiting for tasks");
+ taskService.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
+ } catch (InterruptedException ie) {
+ // Cancel all threads.
+ taskService.shutdownNow();
+ throw ie;
+ }
+ LOG.info("Task executor complete.");
+ for (TaskRunnable r : taskRunnables) {
+ if (r.storedException != null) {
+ throw new Exception(r.storedException);
+ }
+ }
+ } catch (Throwable t) {
+ LOG.warn(jobId, t);
+ } finally {
+ try {
+ // TODO: clean workDir
+ } catch (Exception e) {
+ LOG.warn("Error cleaning up " + jobId + ": " + e);
+ }
+ }
+
+ }
+ }
+
+ /**
+ * Create Runnables to encapsulate tasks for use by the executor service.
+ *
+ * @param job
+ * the job whose tasks are to be wrapped
+ * @param jobId
+ * the job id
+ * @return a List of Runnables, one per task.
+ */
+ protected List<TaskRunnable> getTaskRunnables(Job job, JobId jobId) {
+ int numTasks = 0;
+ ArrayList<TaskRunnable> list = new ArrayList<TaskRunnable>();
+ for (Task task : job.getTasks()) {
+ list.add(new TaskRunnable(task, numTasks++, jobId));
+ }
+ return list;
+ }
+
+ /**
+ * Creates the executor service used to run tasks.
+ *
+ * @param numTasks
+ * the total number of tasks to be run
+ * @return an ExecutorService instance that handles the tasks
+ */
+ protected ExecutorService createTaskExecutor(int numTasks) {
+ // Determine the size of the thread pool to use
+ int maxThreads = conf.getInt(LOCAL_MAX_TASKS, 1);
+ if (maxThreads < 1) {
+ throw new IllegalArgumentException("Configured " + LOCAL_MAX_TASKS
+ + " must be >= 1");
+ }
+ maxThreads = Math.min(maxThreads, numTasks);
+ maxThreads = Math.max(maxThreads, 1); // In case of no tasks.
+
+ LOG.debug("Starting thread pool executor.");
+ LOG.debug("Max local threads: " + maxThreads);
+ LOG.debug("Tasks to process: " + numTasks);
+
+ // Create a new executor service to drain the work queue.
+ ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
+ "LocalJobRunner Task Executor #%d").build();
+ ExecutorService executor = Executors.newFixedThreadPool(maxThreads, tf);
+ return executor;
+ }
+
+ /**
+ * A Runnable instance that handles a single task to be run by an executor.
+ */
+ protected class TaskRunnable implements Runnable {
+ private final Task task;
+ private final int taskId;
+ private final JobId jobId;
+ private final Configuration localConf;
+
+ public volatile Throwable storedException;
+
+ public TaskRunnable(Task task, int taskId, JobId jobId) {
+ this.task = task;
+ this.taskId = taskId;
+ this.jobId = jobId;
+ this.localConf = new Configuration(conf);
+ }
+
+ public void run() {
+ try {
+ TaskAttemptId attemptId = new TaskAttemptId(new TaskId(jobId, taskId),
+ 0);
+ LOG.info("Starting task: " + taskId);
+ setupChildMapredLocalDirs(task, localConf);
+ localMetrics.launchTask(attemptId);
+ task.getMission().run();
+ localMetrics.completeTask(attemptId);
+ LOG.info("Finishing task: " + taskId);
+ } catch (Throwable e) {
+ this.storedException = e;
+ }
+ }
+
+ }
+
+ void setupChildMapredLocalDirs(Task t, Configuration conf) {
+ String[] localDirs = conf.getTrimmedStrings(DragonConfig.LOCAL_DIR);
+ String jobId = t.getId().getJobId().toString();
+ String taskId = t.getId().toString();
+ String user = job.getUser();
+ StringBuffer childMapredLocalDir = new StringBuffer(localDirs[0]
+ + Path.SEPARATOR + getLocalTaskDir(user, jobId, taskId));
+ for (int i = 1; i < localDirs.length; i++) {
+ childMapredLocalDir.append("," + localDirs[i] + Path.SEPARATOR
+ + getLocalTaskDir(user, jobId, taskId));
+ }
+ LOG.debug(DragonConfig.LOCAL_DIR + " for child : " + childMapredLocalDir);
+ conf.set(DragonConfig.LOCAL_DIR, childMapredLocalDir.toString());
+ }
+
+ static final String SUBDIR = jobDir;
+ static final String JOBCACHE = "jobcache";
+
+ static String getLocalTaskDir(String user, String jobid, String taskid) {
+ String taskDir = SUBDIR + Path.SEPARATOR + user + Path.SEPARATOR + JOBCACHE
+ + Path.SEPARATOR + jobid + Path.SEPARATOR + taskid;
+ return taskDir;
+ }
+
+}
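
JobRunnable above leans on a small but easy-to-get-wrong ExecutorService idiom: submit every task, call shutdown() so the pool stops accepting work, then block in awaitTermination() until the queue drains. A standalone, runnable illustration of just that idiom, with plain JDK types and placeholder task bodies:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class DrainPatternDemo {
      public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2); // cf. LOCAL_MAX_TASKS
        for (int i = 0; i < 5; i++) {
          final int id = i;
          pool.submit(new Runnable() {
            public void run() {
              System.out.println("task " + id + " on "
                  + Thread.currentThread().getName());
            }
          });
        }
        pool.shutdown(); // stop accepting work; already-queued tasks still run
        // Wait "forever" rather than using a timed wait, as JobRunnable does.
        pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        System.out.println("all tasks drained");
      }
    }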
72 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/local/LocalJobRunnerMetrics.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime.local;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.realtime.records.JobId;
+import org.apache.hadoop.realtime.records.TaskAttemptId;
+
+class LocalJobRunnerMetrics implements Updater {
+ private final MetricsRecord metricsRecord;
+
+ private int numTasksLaunched = 0;
+ private int numTasksCompleted = 0;
+ private int numWaitingTasks = 0;
+
+ public LocalJobRunnerMetrics(Configuration conf) {
+ MetricsContext context = MetricsUtil.getContext("dragon");
+ metricsRecord = MetricsUtil.createRecord(context, "jobtracker");
+ context.registerUpdater(this);
+ }
+
+ /**
+ * Since this object is a registered updater, this method will be called
+ * periodically, e.g. every 5 seconds.
+ */
+ public void doUpdates(MetricsContext unused) {
+ synchronized (this) {
+ metricsRecord.incrMetric("task_launched", numTasksLaunched);
+ metricsRecord.incrMetric("task_completed", numTasksCompleted);
+ metricsRecord.incrMetric("waiting_tasks", numWaitingTasks);
+
+ numTasksLaunched = 0;
+ numTasksCompleted = 0;
+ numWaitingTasks = 0;
+ }
+ metricsRecord.update();
+ }
+
+ public synchronized void launchTask(TaskAttemptId taskAttemptId) {
+ ++numTasksLaunched;
+ decWaitingTasks(taskAttemptId.getTaskId().getJobId(), 1);
+ }
+
+ public synchronized void completeTask(TaskAttemptId taskAttemptId) {
+ ++numTasksCompleted;
+ }
+
+ private synchronized void decWaitingTasks(JobId id, int task) {
+ numWaitingTasks -= task;
+ }
+
+}
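
Usage is bracket-style: count a launch before running the mission and a completion afterwards; the periodic doUpdates() flush is driven by the metrics context's own timer, not by the caller. A sketch of the intended call sequence (a slightly more defensive variant of TaskRunnable.run above; conf, task and attemptId are assumed to come from the runner, and since the class is package-private this lives in org.apache.hadoop.realtime.local):

    LocalJobRunnerMetrics metrics = new LocalJobRunnerMetrics(conf);
    metrics.launchTask(attemptId);      // task_launched++, waiting_tasks--
    try {
      task.getMission().run();          // do the actual work
    } finally {
      metrics.completeTask(attemptId);  // task_completed++
    }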
22 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/JobId.java
@@ -23,13 +23,27 @@
*/
public class JobId {
- private ApplicationId applicationId;
+ private ApplicationId appId;
+ private int id;
public JobId(ApplicationId applicationId) {
- this.applicationId = applicationId;
+ this.appId = applicationId;
}
- public ApplicationId getApplicationId() {
- return applicationId;
+ public ApplicationId getAppId() {
+ return appId;
}
+
+ public void setAppId(ApplicationId appId) {
+ this.appId = appId;
+ }
+
+ public int getId() {
+ return id;
+ }
+
+ public void setId(int id) {
+ this.id = id;
+ }
+
}
21 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/TaskAttemptId.java
@@ -20,5 +20,26 @@
public class TaskAttemptId {
+ private TaskId taskId;
+ private int id;
+
+ public TaskAttemptId(TaskId taskId, int attemptId) {
+ this.taskId = taskId;
+ this.id = attemptId;
+ }
+
+ public int getId() {
+ return id;
+ }
+ public void setId(int id) {
+ this.id = id;
+ }
+ public void setTaskId(TaskId taskId) {
+ this.taskId = taskId;
+ }
+
+ public TaskId getTaskId(){
+ return taskId;
+ }
+
}
17 hadoop-dragon-core/src/main/java/org/apache/hadoop/realtime/records/TaskId.java
@@ -21,4 +21,21 @@
*/
public class TaskId {
+ private JobId jobId;
+ private int id;
+
+ public int getId() {
+ return id;
+ }
+ public void setId(int id) {
+ this.id = id;
+ }
+ public void setJobId(JobId jobId) {
+ this.jobId = jobId;
+ }
+ public TaskId(JobId jobId, int id) {
+ this.jobId = jobId;
+ this.id = id;
+ }
+ public JobId getJobId(){
+ return jobId;
+ }
}
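
The three record classes nest as TaskAttemptId -> TaskId -> JobId -> ApplicationId. A short sketch of building the chain bottom-up, using the record factory the same way TestYarnRunner below does (and assuming the constructor fixes above so the fields are actually populated):

    RecordFactory factory = RecordFactoryProvider.getRecordFactory(null);
    ApplicationId appId = factory.newRecordInstance(ApplicationId.class);
    appId.setClusterTimestamp(System.currentTimeMillis());
    appId.setId(1);
    JobId jobId = new JobId(appId);                          // job of application 1
    TaskId taskId = new TaskId(jobId, 0);                    // task 0 of the job
    TaskAttemptId attemptId = new TaskAttemptId(taskId, 0);  // first attempt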
147 hadoop-dragon-core/src/test/java/org/apache/hadoop/realtime/TestYarnRunner.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.realtime;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.realtime.records.JobId;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.api.ClientRMProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test YarnRunner and make sure the client side plugin works
+ * fine
+ */
+public class TestYarnRunner extends TestCase {
+ private static final Log LOG = LogFactory.getLog(TestYarnRunner.class);
+ private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+ private YarnRunner yarnRunner;
+ private ResourceMgrDelegate resourceMgrDelegate;
+ private YarnConfiguration conf;
+ private ApplicationId appId;
+ private JobId jobId;
+ private File testWorkDir =
+ new File("target", TestYarnRunner.class.getName());
+ private ApplicationSubmissionContext submissionContext;
+ private static final String failString = "Rejected job";
+
+ @Before
+ public void setUp() throws Exception {
+ resourceMgrDelegate = mock(ResourceMgrDelegate.class);
+ conf = new YarnConfiguration();
+ yarnRunner = new YarnRunner(conf, resourceMgrDelegate);
+ yarnRunner = spy(yarnRunner);
+ submissionContext = mock(ApplicationSubmissionContext.class);
+ doAnswer(
+ new Answer<ApplicationSubmissionContext>() {
+ @Override
+ public ApplicationSubmissionContext answer(InvocationOnMock invocation)
+ throws Throwable {
+ return submissionContext;
+ }
+ }
+ ).when(yarnRunner).createApplicationSubmissionContext(
+ any(String.class), any(Credentials.class));
+
+ appId = recordFactory.newRecordInstance(ApplicationId.class);
+ appId.setClusterTimestamp(System.currentTimeMillis());
+ appId.setId(1);
+ jobId = new JobId(appId);
+ if (testWorkDir.exists()) {
+ FileContext.getLocalFSFileContext().delete(new Path(testWorkDir.toString()), true);
+ }
+ testWorkDir.mkdirs();
+ }
+
+ @Test
+ public void testResourceMgrDelegate() throws Exception {
+ /* we do not want a mock of ResourceMgrDelegate */
+ ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class);
+ ResourceMgrDelegate delegate = new ResourceMgrDelegate(conf, clientRMProtocol);
+ /* make sure kill calls finish application master */
+ when(clientRMProtocol.forceKillApplication(any(KillApplicationRequest.class)))
+ .thenReturn(null);
+ delegate.killApplication(appId);
+ verify(clientRMProtocol).forceKillApplication(any(KillApplicationRequest.class));
+
+
+ /* make sure getapplication report is called */
+ when(clientRMProtocol.getApplicationReport(any(GetApplicationReportRequest.class)))
+ .thenReturn(recordFactory.newRecordInstance(GetApplicationReportResponse.class));
+ delegate.getApplicationReport(appId);
+ verify(clientRMProtocol).getApplicationReport(any(GetApplicationReportRequest.class));
+
+ /* make sure metrics is called */
+ GetClusterMetricsResponse clusterMetricsResponse = recordFactory.newRecordInstance
+ (GetClusterMetricsResponse.class);
+ clusterMetricsResponse.setClusterMetrics(recordFactory.newRecordInstance(
+ YarnClusterMetrics.class));
+ when(clientRMProtocol.getClusterMetrics(any(GetClusterMetricsRequest.class)))
+ .thenReturn(clusterMetricsResponse);
+ delegate.getClusterMetrics();
+ verify(clientRMProtocol).getClusterMetrics(any(GetClusterMetricsRequest.class));
+
+
+ GetNewApplicationResponse newAppResponse = recordFactory.newRecordInstance(
+ GetNewApplicationResponse.class);
+ newAppResponse.setApplicationId(appId);
+ when(clientRMProtocol.getNewApplication(any(GetNewApplicationRequest.class))).
+ thenReturn(newAppResponse);
+ delegate.getNewJobID();
+ verify(clientRMProtocol).getNewApplication(any(GetNewApplicationRequest.class));
+
+
+ }
+}