OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
asasvari committed Sep 11, 2017
1 parent efc7a82 commit 5e1c9d3
Showing 5 changed files with 77 additions and 66 deletions.
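In short, the change works like this: JavaActionExecutor.addAppNameContext() now builds the launcher application name once, stores it in the action context under oozie.action.name, and getAppName() reads it back both when naming the YARN launcher application and when MapReduceActionExecutor names the child MR job, swapping the "oozie:launcher" prefix for "oozie:action". A minimal sketch of the naming scheme, using hypothetical workflow values in place of the ones the real code pulls from the action context:

public class JobNameSketch {
    public static void main(String[] args) {
        // Same pattern as addAppNameContext(): action type, workflow app name,
        // action name, workflow id. All values below are hypothetical.
        String launcherName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
                "map-reduce", "my-wf", "mr-node", "0000001-170911-oozie-oozi-W");
        // setJobName() derives the child job's name from the stored launcher name.
        String childJobName = launcherName.replace("oozie:launcher", "oozie:action");
        System.out.println(launcherName);  // oozie:launcher:T=map-reduce:W=my-wf:A=mr-node:ID=...
        System.out.println(childJobName);  // oozie:action:T=map-reduce:W=my-wf:A=mr-node:ID=...
    }
}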
@@ -147,6 +147,7 @@ public class JavaActionExecutor extends ActionExecutor {
private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
private static final String HADOOP_JOB_NAME = "mapred.job.name";
private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
private static final String OOZIE_ACTION_NAME = "oozie.action.name";

private static int maxActionOutputLen;
private static int maxExternalStatsSize;
@@ -944,6 +945,7 @@ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowA

// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
addAppNameContext(action, context);
setupActionConf(actionConf, context, actionXml, appPathRoot);
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
@@ -1072,6 +1074,19 @@ public void submitLauncher(FileSystem actionFs, final Context context, WorkflowA
}
}

protected void addAppNameContext(WorkflowAction action, Context context) {
String oozieActionName = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
getType(),
context.getWorkflow().getAppName(),
action.getName(),
context.getWorkflow().getId());
context.setVar(OOZIE_ACTION_NAME, oozieActionName);
}

protected String getAppName(Context context) {
return context.getVar(OOZIE_ACTION_NAME);
}

private Credentials ensureCredentials(final Credentials credentials) {
if (credentials == null) {
LOG.debug("No credentials present, creating a new one.");
@@ -1129,13 +1144,10 @@ private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId ap

ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);

String jobName = XLog.format(
"oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
context.getWorkflow().getAppName(), actionName,
context.getWorkflow().getId());
String appName = getAppName(context);

appContext.setApplicationId(appId);
appContext.setApplicationName(jobName);
appContext.setApplicationName(appName);
appContext.setApplicationType("Oozie Launcher");
Priority pri = Records.newRecord(Priority.class);
int priority = 0; // TODO: OYA: Add a constant or a config
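Before this change, createAppSubmissionContext() formatted the launcher name inline with XLog.format and {0}-style placeholders; addAppNameContext() now builds the same string with String.format. Assuming XLog.format follows the JDK MessageFormat conventions its {0} placeholders suggest, the two formats agree, as this hedged check with hypothetical values illustrates:

import java.text.MessageFormat;

public class FormatEquivalenceCheck {
    public static void main(String[] args) {
        String viaMessageFormat = MessageFormat.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}",
                "map-reduce", "my-wf", "mr-node", "wf-id");
        String viaStringFormat = String.format("oozie:launcher:T=%s:W=%s:A=%s:ID=%s",
                "map-reduce", "my-wf", "mr-node", "wf-id");
        System.out.println(viaMessageFormat.equals(viaStringFormat));  // true
    }
}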
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -50,6 +51,7 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
public static final String OOZIE_MAPREDUCE_UBER_JAR_ENABLE = "oozie.action.mapreduce.uber.jar.enable";
private static final String STREAMING_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.StreamingMain";
public static final String JOB_END_NOTIFICATION_URL = "job.end.notification.url";
private static final String MAPREDUCE_JOB_NAME = "mapreduce.job.name";
private XLog log = XLog.getLog(getClass());

public MapReduceActionExecutor() {
@@ -161,6 +163,7 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
regularMR = true;
}
}
setJobName(actionConf, context);
actionConf = super.setupActionConf(actionConf, context, actionXml, appPath);

// For "regular" (not streaming or pipes) MR jobs
@@ -205,6 +208,13 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
return actionConf;
}

private void setJobName(Configuration actionConf, Context context) {
String jobName = getAppName(context);
if (jobName != null) {
actionConf.set(MAPREDUCE_JOB_NAME, jobName.replace("oozie:launcher", "oozie:action"));
}
}

@Override
public void end(Context context, WorkflowAction action) throws ActionExecutorException {
super.end(context, action);
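Note the null guard in setJobName() above: getAppName() returns null when oozie.action.name was never stored in the context (for instance, an action submitted before this change), and the guard leaves mapreduce.job.name untouched instead of risking a NullPointerException on replace(). A small sketch of the guard's effect, with hypothetical values:

import org.apache.hadoop.conf.Configuration;

public class NullGuardSketch {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        String jobName = null;  // what getAppName() yields when the context variable is absent
        if (jobName != null) {
            conf.set("mapreduce.job.name", jobName.replace("oozie:launcher", "oozie:action"));
        }
        // No override happened, so the child job would keep its framework-generated name.
        System.out.println(conf.get("mapreduce.job.name"));
    }
}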
@@ -33,7 +33,6 @@
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
release-log.txt (1 addition, 0 deletions)
@@ -1,5 +1,6 @@
-- Oozie 5.0.0 release (trunk - unreleased)

OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
OOZIE-3035 HDFS HA and log aggregation: getting HDFS delegation token from YARN renewer within JavaActionExecutor (andras.piros via pbacsko)
OOZIE-3026 fix openjpa enhancer stage during build for logging (dbdist13, andras.piros via pbacsko)
@@ -53,6 +53,7 @@
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.streaming.StreamJob;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -468,7 +469,7 @@ private void _testSubmitWithCredentials(String name, String actionXml) throws Ex
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
Configuration conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
String user = conf.get("user.name");
JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
JobClient jobClient = getHadoopAccessorService().createJobClient(user, conf);
org.apache.hadoop.mapreduce.JobID jobID = TypeConverter.fromYarn(
ConverterUtils.toApplicationId(externalChildIDs));
final RunningJob mrJob = jobClient.getJob(JobID.downgrade(jobID));
@@ -515,10 +516,7 @@ public void testMapReduce() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -535,10 +533,7 @@ public void testMapReduceActionError() throws Exception {
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
ow.write("dummy\n");
ow.write("dummy\n");
ow.close();
writeDummyInput(fs, outputDir);

String actionXml = "<map-reduce>" +
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -562,10 +557,7 @@ public void testMapReduceWithConfigClass() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

Path jobXml = new Path(getFsTestCaseDir(), "action.xml");
XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
@@ -590,10 +582,7 @@ public void testMapReduceWithConfigClassNotFound() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -618,10 +607,7 @@ public void testMapReduceWithConfigClassThrowException() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

XConfiguration conf = getMapReduceConfig(inputDir.toString(), outputDir.toString());
conf.setBoolean("oozie.test.throw.exception", true); // causes OozieActionConfiguratorForTest to throw an exception
@@ -647,10 +633,7 @@ public void testMapReduceActionKill() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -675,10 +658,7 @@ public void testMapReduceWithCredentials() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -739,10 +719,7 @@ public void _testMapReduceWithUberJar() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -799,6 +776,37 @@ public void testMapReduceWithUberJarDisabled() throws Exception {
}
}

public void testJobNameSetForMapReduceChild() throws Exception {
Services serv = Services.get();
serv.getConf().setBoolean("oozie.action.mapreduce.uber.jar.enable", true);

final FileSystem fs = getFileSystem();
final Path inputDir = new Path(getFsTestCaseDir(), "input");
final Path outputDir = new Path(getFsTestCaseDir(), "output");
writeDummyInput(fs, inputDir);

final String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
+ getMapReduceUberJarConfig(inputDir.toString(), outputDir.toString()).toXmlString(false) + "</map-reduce>";

final String extId = _testSubmit(MAP_REDUCE, actionXml);
final ApplicationId appId = ConverterUtils.toApplicationId(extId);
final Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
final String name = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getName();
assertTrue(name.contains("oozie:action"));
}

private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
}

private HadoopAccessorService getHadoopAccessorService() {
return Services.get().get(HadoopAccessorService.class);
}
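The new writeDummyInput() helper collapses more than a dozen copies of the same four-line writer block into one place. A stylistic alternative (not what this commit uses) is try-with-resources, which would also close the stream when a write fails; it assumes the same Writer/OutputStreamWriter imports already present in this test class:

private void writeDummyInput(FileSystem fs, Path inputDir) throws IOException {
    // try-with-resources closes the writer even if write() throws
    try (Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")))) {
        w.write("dummy\n");
        w.write("dummy\n");
    }
}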

public void testMapReduceWithUberJarEnabled() throws Exception {
Services serv = Services.get();
boolean originalUberJarDisabled = serv.getConf().getBoolean("oozie.action.mapreduce.uber.jar.enable", false);
@@ -827,10 +835,7 @@ private void runStreamingWordCountJob(Path inputDir, Path outputDir, XConfigurat
OutputStream os = fs.create(new Path(getAppPath(), streamingJar));
IOUtils.copyStream(is, os);

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <streaming>" + " <mapper>cat</mapper>"
@@ -917,10 +922,7 @@ public void testPipes() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

String actionXml = "<map-reduce>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>" + " <pipes>" + " <program>" + programPath
Expand All @@ -944,10 +946,7 @@ public void testSetExecutionStats_when_user_has_specified_stats_write_TRUE() thr
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

// set user stats write property as true explicitly in the
// configuration.
@@ -1006,10 +1005,7 @@ public void testSetExecutionStats_when_user_has_specified_stats_write_FALSE() th
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

// set user stats write property as false explicitly in the
// configuration.
@@ -1062,10 +1058,7 @@ public void testEndWithoutConfiguration() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

// set user stats write property as false explicitly in the
// configuration.
@@ -1132,11 +1125,7 @@ public void testSetMapredJobName() throws Exception {
Path inputDir = new Path(getFsTestCaseDir(), "input");
Path outputDir = new Path(getFsTestCaseDir(), "output");

Writer w = new OutputStreamWriter(fs.create(new Path(inputDir,
"data.txt")));
w.write("dummy\n");
w.write("dummy\n");
w.close();
writeDummyInput(fs, inputDir);

XConfiguration mrConfig = getMapReduceConfig(inputDir.toString(),
outputDir.toString());
@@ -1172,8 +1161,8 @@ public void testSetMapredJobName() throws Exception {
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());

Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
final YarnClient yarnClient = Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), conf);
Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
final YarnClient yarnClient = getHadoopAccessorService().createYarnClient(getTestUser(), conf);
ApplicationReport report = yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalChildIDs));
// Assert Mapred job name has been set
assertEquals(mapredJobName, report.getName());