Skip to content

Commit

Permalink
Extract all yarn related methods to azkaban-common as shared util lib…
Browse files Browse the repository at this point in the history
…rary
  • Loading branch information
tsangz2013 committed Sep 7, 2022
1 parent f7ad185 commit a35ff0e
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package azkaban.jobtype;

import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.YarnUtils;
import azkaban.utils.Props;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
Expand All @@ -38,12 +40,8 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Logger;
import org.apache.hadoop.fs.Path;


/**
Expand Down Expand Up @@ -86,9 +84,6 @@ public class HadoopJobUtils {
public static final String DEPENDENCY_STORAGE_ROOT_PATH_PROP = "dependency.storage.path.prefix";
// Azkaban property for listing additional namenodes for delegation tokens
private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";
//Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
private static final String YARN_CONF_FILENAME = "yarn-site.xml";

private HadoopJobUtils() {
}
Expand Down Expand Up @@ -345,16 +340,17 @@ public boolean accept(final File dir, final String name) {
* This method takes additional parameters to determine whether KillAllSpawnedHadoopJobs needs to
* be executed
* using doAs as a different user
*
* @param logFilePath Azkaban log file path
* @param jobProps Azkaban job props
* @param tokenFile Pass in the tokenFile if value is known. It is ok to skip if the token file
* is in the environmental variable
* @param log a usable logger
*/
public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
final Props jobProps,
public static void proxyUserKillAllSpawnedHadoopJobs(final Props jobProps,
final File tokenFile, final Logger log) {

final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
log.info("Log file path is: " + logFilePath);

final Properties properties = new Properties();
properties.putAll(jobProps.getFlattened());

Expand All @@ -366,41 +362,22 @@ public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
Set<String> allSpawnedJobAppIDs = findApplicationIdFromLog(logFilePath, log);
YarnClient yarnClient = YarnUtils.createYarnClient(jobProps);
YarnUtils.killAllAppsOnCluster(yarnClient, allSpawnedJobAppIDs, log);
return null;
}
});
} else {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
Set<String> allSpawnedJobAppIDs = findApplicationIdFromLog(logFilePath, log);
YarnClient yarnClient = YarnUtils.createYarnClient(jobProps);
YarnUtils.killAllAppsOnCluster(yarnClient, allSpawnedJobAppIDs, log);
}
} catch (final Throwable t) {
log.warn("something happened while trying to kill all spawned jobs", t);
}
}


/**
* Pass in a log file, this method will find all the hadoop jobs it has launched, and kills it
*
* Only works with Hadoop2
*
* @return a Set<String>. The set will contain the applicationIds that this job tried to kill.
*/
public static Set<String> killAllSpawnedHadoopJobs(final String logFilePath, final Logger log, final Props jobProps) {
final Set<String> allSpawnedJobs = findApplicationIdFromLog(logFilePath, log);
log.info("applicationIds to kill: " + allSpawnedJobs);

for (final String appId : allSpawnedJobs) {
try {
killJobOnCluster(appId, log, jobProps);
} catch (final Throwable t) {
log.warn("something happened while trying to kill this job: " + appId, t);
}
}

return allSpawnedJobs;
}

/**
* <pre>
* Takes in a log file, will grep every line to look for the application_id pattern.
Expand Down Expand Up @@ -483,36 +460,6 @@ public static Set<String> findApplicationIdFromLog(final String logFilePath, fin
return applicationIds;
}

/**
* <pre>
* Uses YarnClient to kill the job on HDFS.
* Using JobClient only works partially:
* If yarn container has started but spark job haven't, it will kill
* If spark job has started, the cancel will hang until the spark job is complete
* If the spark job is complete, it will return immediately, with a job not found on job tracker
* </pre>
*/
public static void killJobOnCluster(final String applicationId, final Logger log, final Props jobProps)
throws YarnException,
IOException {

final YarnConfiguration yarnConf = new YarnConfiguration();
final YarnClient yarnClient = YarnClient.createYarnClient();
if (jobProps.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
yarnConf.addResource(new Path(jobProps.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
}
yarnClient.init(yarnConf);
yarnClient.start();

final String[] split = applicationId.split("_");
final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
Integer.parseInt(split[2]));

log.info("start klling application: " + aid);
yarnClient.killApplication(aid);
log.info("successfully killed application: " + aid);
}

/**
* <pre>
* constructions a javaOpts string based on the Props, and the key given, will return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@
import java.io.File;

import azkaban.utils.Utils;
import joptsimple.internal.Strings;
import org.apache.log4j.Logger;
import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
Expand Down Expand Up @@ -160,10 +158,6 @@ public void killAllSpawnedHadoopJobs(Props jobProps, final Logger logger) {
if (tokenFile == null) {
return; // do null check for tokenFile
}

final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
logger.info("Log file path is: " + logFilePath);

HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, logger);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, logger);
}
}
2 changes: 2 additions & 0 deletions azkaban-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ dependencies {
compileOnly deps.hadoopAuth
compileOnly deps.hadoopCommon
compileOnly deps.hadoopHdfs
compile deps.hadoopMRClientCommon
compile deps.hadoopMRClientCore
compile deps.httpclient
compile deps.jetty
compile deps.jettyUtil
Expand Down
71 changes: 71 additions & 0 deletions azkaban-common/src/main/java/azkaban/utils/YarnUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package azkaban.utils;

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class YarnUtils {

  //Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
  private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
  private static final String YARN_CONF_FILENAME = "yarn-site.xml";

  // Static utility class: prevent instantiation (matches the HadoopJobUtils convention).
  private YarnUtils() {
  }

  /**
   * Uses the given YarnClient to kill the applications one by one.
   *
   * <p>A failure to kill one application is logged and swallowed so the remaining
   * applications are still attempted (best-effort semantics).
   *
   * @param yarnClient     an initialized and started YarnClient
   * @param applicationIDs the yarn application IDs to kill, e.g. "application_123_456"
   * @param log            a usable logger
   */
  public static void killAllAppsOnCluster(YarnClient yarnClient, Set<String> applicationIDs,
      Logger log) {
    log.info(String.format("Killing applications: %s", applicationIDs));

    for (final String appId : applicationIDs) {
      try {
        YarnUtils.killAppOnCluster(yarnClient, appId, log);
      } catch (final Throwable t) {
        // best-effort: keep killing the rest even if this one fails
        log.warn("something happened while trying to kill this job: " + appId, t);
      }
    }
  }

  /**
   * <pre>
   * Uses YarnClient to kill the job on the Hadoop Yarn Cluster.
   * Using JobClient only works partially:
   *   If yarn container has started but spark job haven't, it will kill
   *   If spark job has started, the cancel will hang until the spark job is complete
   *   If the spark job is complete, it will return immediately, with a job not found on job tracker
   * </pre>
   *
   * @param yarnClient    an initialized and started YarnClient
   * @param applicationId the yarn application ID in the form "application_&lt;clusterTs&gt;_&lt;id&gt;"
   * @param log           a usable logger
   * @throws YarnException if the cluster rejects the kill request
   * @throws IOException   on communication failure with the resource manager
   * @throws IndexOutOfBoundsException if applicationId does not contain two "_" separators
   * @throws NumberFormatException     if the ID components are not numeric
   */
  public static void killAppOnCluster(final YarnClient yarnClient, final String applicationId,
      final Logger log) throws YarnException, IOException {

    // Expected format: application_<clusterTimestamp>_<sequenceId>; malformed IDs
    // intentionally fail fast with IndexOutOfBounds/NumberFormat exceptions.
    final String[] split = applicationId.split("_");
    final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
        Integer.parseInt(split[2]));
    log.info("start killing application: " + aid);
    yarnClient.killApplication(aid);
    log.info("successfully killed application: " + aid);
  }

  /**
   * Create, initialize and start a YarnClient connecting to the Yarn Cluster (resource manager),
   * using the resources passed in with props.
   *
   * <p>If {@code env.YARN_CONF_DIR} is present in the props, the cluster's
   * {@code yarn-site.xml} from that directory is layered onto the default configuration;
   * otherwise the default YarnConfiguration (classpath resources) is used.
   *
   * @param props the properties to create a YarnClient, the path to the "yarn-site.xml" to be used
   * @return a started YarnClient; the caller is responsible for stopping it
   */
  public static YarnClient createYarnClient(Props props) {
    final YarnConfiguration yarnConf = new YarnConfiguration();
    if (props.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
      yarnConf.addResource(
          new Path(props.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
    }
    final YarnClient yarnClient = YarnClient.createYarnClient();
    yarnClient.init(yarnConf);
    yarnClient.start();
    return yarnClient;
  }
}
63 changes: 63 additions & 0 deletions azkaban-common/src/test/java/azkaban/utils/YarnUtilsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package azkaban.utils;


import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Logger;
import org.junit.Test;

public class YarnUtilsTest {

  final private Logger log = Logger.getLogger(YarnUtilsTest.class);

  /** Killing a single well-formed application ID succeeds against a benign mock client. */
  @Test
  public void testKillAppOnCluster() throws IOException, YarnException {
    final YarnClient client = mock(YarnClient.class);
    doNothing().when(client).killApplication(any());

    YarnUtils.killAppOnCluster(client, "application_123_456", log);
  }

  /** A malformed application ID (no "_" separators) surfaces as IndexOutOfBoundsException. */
  @Test(expected = IndexOutOfBoundsException.class)
  public void testKillAppOnClusterInvalidAppID() throws IOException, YarnException {
    final YarnClient client = mock(YarnClient.class);
    doNothing().when(client).killApplication(any());

    YarnUtils.killAppOnCluster(client, "application+++123===456", log);
  }

  /** A YarnException raised by the client propagates out of killAppOnCluster. */
  @Test(expected = YarnException.class)
  public void testKillAppOnClusterYarnFail() throws IOException, YarnException {
    final YarnClient client = mock(YarnClient.class);
    doThrow(new YarnException("ops")).when(client).killApplication(any());

    YarnUtils.killAppOnCluster(client, "application_123_456", log);
  }

  /** Bulk kill walks every ID in the set without error when the client cooperates. */
  @Test
  public void testKillAllAppsOnCluster() throws IOException, YarnException {
    final YarnClient client = mock(YarnClient.class);
    doNothing().when(client).killApplication(any());

    YarnUtils.killAllAppsOnCluster(client,
        ImmutableSet.of("application_123_456", "application_456_123"), log);
  }

  /** Bulk kill is best-effort: a failure on one app is logged, not rethrown. */
  @Test
  public void testKillAllAppsOnClusterFail() throws IOException, YarnException {
    final YarnClient client = mock(YarnClient.class);
    // Succeed by default, but fail for exactly one specific application...
    doNothing().when(client).killApplication(any());
    doThrow(new YarnException("ops")).when(client).killApplication(
        eq(ApplicationId.newInstance(4560, 1230)));

    // ...and verify the error is swallowed rather than propagated.
    YarnUtils.killAllAppsOnCluster(client,
        ImmutableSet.of("application_123_456", "application_4560_1230"), log);
  }
}

0 comments on commit a35ff0e

Please sign in to comment.