Skip to content

Commit

Permalink
Inject azkaban configurations into hadoop application tags (#1904)
Browse files Browse the repository at this point in the history
Hadoop/YARN has support for application tags, which MR/Spark can include in its job submission. This PR will add some azkaban metadata into MR config so MR can include them in the YARN application tags when it submits to YARN.
  • Loading branch information
hungj authored and kunkun-tang committed Aug 9, 2018
1 parent d2db592 commit 2be255d
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,11 @@ public HadoopHiveJob(String jobid, Props sysProps, Props jobProps, Logger log)

@Override
public void run() throws Exception {
String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ HadoopJobUtils.MAPREDUCE_JOB_TAGS,
HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,11 @@ private void mergeClassPaths(List<String> classPath,

@Override
public void run() throws Exception {
String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ HadoopJobUtils.MAPREDUCE_JOB_TAGS,
HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package azkaban.jobtype;

import com.google.common.base.Joiner;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
Expand Down Expand Up @@ -91,6 +93,9 @@ public class HadoopJobUtils {
// MapReduce config for specifying additional namenodes for delegation tokens
public static final String MAPREDUCE_JOB_OTHER_NAMENODES = "mapreduce.job.hdfs-servers";

// MapReduce config for mapreduce job tags
public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";

// Azkaban property for listing additional namenodes for delegation tokens
private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";

Expand Down Expand Up @@ -593,4 +598,21 @@ public static String javaOptStringFromHadoopConfiguration(Configuration conf, St
}
return String.format("-D%s=%s", key, value);
}

/**
* Construct a CSV of tags for the Hadoop application.
*
* @param List of keys to construct tags from.
* @return a CSV of tags
*/
public static String constructHadoopTags(Props props, String[] keys) {
String[] keysAndValues = new String[keys.length];
for (int i = 0; i < keys.length; i++) {
if (props.containsKey(keys[i])) {
keysAndValues[i] = keys[i] + ":" + props.get(keys[i]);
}
}
Joiner joiner = Joiner.on(',').skipNulls();
return joiner.join(keysAndValues);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public HadoopPigJob(String jobid, Props sysProps, Props jobProps, Logger log)

@Override
public void run() throws Exception {
String[] tagKeys = new String[] { CommonJobProperties.EXEC_ID,
CommonJobProperties.FLOW_ID, CommonJobProperties.PROJECT_NAME };
getJobProps().put(HadoopConfigurationInjector.INJECT_PREFIX
+ HadoopJobUtils.MAPREDUCE_JOB_TAGS,
HadoopJobUtils.constructHadoopTags(getJobProps(), tagKeys));
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(),
getWorkingDirectory());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package azkaban.jobtype;

import static org.assertj.core.api.Assertions.assertThat;

import azkaban.utils.Props;

import org.junit.Before;
import org.junit.Test;

/**
* Test class for constructHadoopTags method in HadoopJobUtils
*/
public class TestHadoopJobUtilsConstructHadoopTags {

private Props props;

@Before
public void beforeMethod() {
props = new Props();
}

@Test
public void testNoTags() {
String[] tags = new String[0];
assertThat(HadoopJobUtils.constructHadoopTags(props, tags)).isEqualTo("");
}

@Test
public void testWithTags() {
String tag0 = "tag0";
String tag1 = "tag1";
props.put(tag0, "val0");
props.put(tag1, "val1");
String[] tags = new String[] { tag0, tag1 };
assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
.isEqualTo("tag0:val0,tag1:val1");
}

@Test
public void testWithNonExistentTags() {
String tag0 = "tag0";
String tag1 = "tag1";
String tag2 = "tag2";
props.put(tag0, "val0");
props.put(tag2, "val2");
String[] tags = new String[] { tag0, tag1, tag2 };
assertThat(HadoopJobUtils.constructHadoopTags(props, tags))
.isEqualTo("tag0:val0,tag2:val2");
}
}

0 comments on commit 2be255d

Please sign in to comment.