From 5e1c9d362afe1b2c6423a386aeac7f04d3337f65 Mon Sep 17 00:00:00 2001
From: Attila Sasvari
Date: Mon, 11 Sep 2017 12:00:58 +0200
Subject: [PATCH] OOZIE-2916 Set a job name for the MR Action's child job (asasvari)

---
 .../action/hadoop/JavaActionExecutor.java          |  22 +++-
 .../hadoop/MapReduceActionExecutor.java            |  10 ++
 .../action/hadoop/TestJavaActionExecutor.java      |   1 -
 release-log.txt                                    |   1 +
 .../hadoop/TestMapReduceActionExecutor.java        | 109 ++++++++----------
 5 files changed, 77 insertions(+), 66 deletions(-)

diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index bca79aa052..49fd4b8560 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -147,6 +147,7 @@ public class JavaActionExecutor extends ActionExecutor {
     private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
     private static final String HADOOP_JOB_NAME = "mapred.job.name";
     private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
+    private static final String OOZIE_ACTION_NAME = "oozie.action.name";
 
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
@@ -944,6 +945,7 @@ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowA
 
             // action job configuration
             Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
+            addAppNameContext(action, context);
             setupActionConf(actionConf, context, actionXml, appPathRoot);
             LOG.debug("Setting LibFilesArchives ");
             setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
@@ -1072,6 +1074,19 @@ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowA
         }
     }
 
+    protected void addAppNameContext(WorkflowAction action, Context context) {
+        String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
+                getType(),
+                context.getWorkflow().getAppName(),
+                action.getName(),
+                context.getWorkflow().getId());
+        context.setVar(OOZIE_ACTION_NAME, oozieActionName);
+    }
+
+    protected String getAppName(Context context) {
+        return context.getVar(OOZIE_ACTION_NAME);
+    }
+
     private Credentials ensureCredentials(final Credentials credentials) {
         if (credentials == null) {
             LOG.debug("No credentials present, creating a new one.");
@@ -1129,13 +1144,10 @@ private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId ap
 
         ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
 
-        String jobName = XLog.format(
-                "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
-                context.getWorkflow().getAppName(), actionName,
-                context.getWorkflow().getId());
+        String appName = getAppName(context);
 
         appContext.setApplicationId(appId);
-        appContext.setApplicationName(jobName);
+        appContext.setApplicationName(appName);
         appContext.setApplicationType("Oozie Launcher");
         Priority pri = Records.newRecord(Priority.class);
         int priority = 0; // TODO: OYA: Add a constant or a config
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 338e508571..22d5526532 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
     private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
     public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
+    private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
     private XLog log = XLog.getLog(getClass());
 
     public MapReduceActionExecutor() {
@@ -161,6 +163,7 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
                 regularMR = true;
             }
         }
+        setJobName(actionConf, context);
         actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);
 
         // For "regular" (not streaming or pipes) MR jobs
@@ -205,6 +208,13 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
         return actionConf;
     }
 
+    private void setJobName(Configuration actionConf, Context context) {
+        String jobName = getAppName(context);
+        if (jobName != null) {
+            actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
+        }
+    }
+
     @Override
     public void end(Context context, WorkflowAction action) throws ActionExecutorException {
         super.end(context, action);
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index ce674adf9c..d1d78fd527 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -33,7 +33,6 @@
 import java.util.Collections;
 import java.util.Date;
 import java.util.EnumSet;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
diff --git a/release-log.txt b/release-log.txt
index 82a10aa77a..e1ef0df727 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
 OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
 OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
 OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko)
diff --git a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
index 2c92f41f5e..f460b6bd11 100644
--- a/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
+++ b/sharelib/streaming/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionExecutor.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.streaming.StreamJob;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -468,7 +469,7 @@ private void _testSubmitWithCredentials(String name, String actionXml) throws Ex
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
         Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
         String user = conf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
         org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
                 ConverterUtils.toApplicationId(externalChildIDs));
         final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
@@ -515,10 +516,7 @@ public void testMapReduce() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -535,10 +533,7 @@ public void testMapReduceActionError() throws Exception {
         Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
         w.write("dummy\n");
         w.write("dummy\n");
-        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
-        ow.write("dummy\n");
-        ow.write("dummy\n");
-        ow.close();
+        writeDummyInput(fs, outputDir);
 
         String actionXml = "<map-reduce>" +
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -562,10 +557,7 @@ public void testMapReduceWithConfigClass() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
@@ -590,10 +582,7 @@ public void testMapReduceWithConfigClassNotFound() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -618,10 +607,7 @@ public void testMapReduceWithConfigClassThrowException() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
         conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception
@@ -647,10 +633,7 @@ public void testMapReduceActionKill() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -675,10 +658,7 @@ public void testMapReduceWithCredentials() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -739,10 +719,7 @@ public void _testMapReduceWithUberJar() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>"
@@ -799,6 +776,37 @@ public void testMapReduceWithUberJarDisabled() throws Exception {
         }
     }
 
+    public void testJobNameSetForMapReduceChild() throws Exception {
+        Services serv = Services.get();
+        serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);
+
+        final FileSystem fs = getFileSystem();
+        final Path inputDir = new Path(getFsTestCaseDir(), "input");
+        final Path outputDir = new Path(getFsTestCaseDir(), "output");
+        writeDummyInput(fs, inputDir);
+
+        final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+                + getNameNodeUri() + "</name-node>"
+                + getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";
+
+        final String extId = _testSubmit(MAP_REDUCE, actionXml);
+        final ApplicationId appId = ConverterUtils.toApplicationId(extId);
+        final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+        final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
+        assertTrue(name.contains("oozie:action"));
+    }
+
+    private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
+        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
+        w.write("dummy\n");
+        w.write("dummy\n");
+        w.close();
+    }
+
+    private HadoopAccessorService getHadoopAccessorService() {
+        return Services.get().get(HadoopAccessorService.class);
+    }
+
     public void testMapReduceWithUberJarEnabled() throws Exception {
         Services serv = Services.get();
         boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -827,10 +835,7 @@ private void runStreamingWordCountJob(Path inputDir, Path outputDir, XConfigurat
         OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
         IOUtils.copyStream(is, os);
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>" + "      <streaming>" + "        <mapper>cat</mapper>"
@@ -917,10 +922,7 @@ public void testPipes() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
                 + getNameNodeUri() + "</name-node>" + "      <pipes>" + "        <program>" + programPath
@@ -944,10 +946,7 @@ public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() thr
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as true explicitly in the
         // configuration.
@@ -1006,10 +1005,7 @@ public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() th
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as false explicitly in the
         // configuration.
@@ -1062,10 +1058,7 @@ public void testEndWithoutConfiguration() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         // set user stats write property as false explicitly in the
         // configuration.
@@ -1132,11 +1125,7 @@ public void testSetMapredJobName() throws Exception {
         Path inputDir = new Path(getFsTestCaseDir(), "input");
         Path outputDir = new Path(getFsTestCaseDir(), "output");
 
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
-                "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        w.close();
+        writeDummyInput(fs, inputDir);
 
         XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
                 outputDir.toString());
@@ -1172,8 +1161,8 @@ public void testSetMapredJobName() throws Exception {
         ae.end(context, context.getAction());
         assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
 
-        Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
-        final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf);
+        Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+        final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
         ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
         // Assert Mapred job name has been set
         assertEquals(mapredJobName, report.getName());
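
Note for reviewers: the naming scheme above works in two steps. JavaActionExecutor#addAppNameContext() stores an "oozie:launcher:..." name in the workflow context, and createAppSubmissionContext() uses it as the YARN application name of the launcher; MapReduceActionExecutor#setJobName() then reuses the stored name for the child MR job, swapping the "oozie:launcher" prefix for "oozie:action". The standalone sketch below only illustrates that transformation and is not part of the patch; the action type, workflow name, action name, and workflow id are made-up placeholders.

    // Illustrative sketch of the naming convention introduced by this patch.
    // All input values are hypothetical placeholders.
    public class OozieJobNameSketch {
        public static void main(String[] args) {
            // Launcher application name, as built in JavaActionExecutor#addAppNameContext().
            String launcherName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
                    "map-reduce",                          // action type (getType())
                    "my-wf",                               // workflow app name
                    "mr-node",                             // action name
                    "0000001-170911120058-oozie-oozi-W");  // workflow id

            // Child MR job name, as derived in MapReduceActionExecutor#setJobName().
            String childJobName = launcherName.replace("oozie:launcher", "oozie:action");

            System.out.println(launcherName);
            // oozie:launcher:T=map-reduce:W=my-wf:A=mr-node:ID=0000001-170911120058-oozie-oozi-W
            System.out.println(childJobName);
            // oozie:action:T=map-reduce:W=my-wf:A=mr-node:ID=0000001-170911120058-oozie-oozi-W
        }
    }

As a result, the launcher and its child job are easy to correlate in the ResourceManager UI: both share the same T/W/A/ID suffix and differ only in the prefix.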