Redo: Extract all yarn related methods to azkaban-common as shared util library (#3145)

* Extract all yarn related methods to azkaban-common as shared util library

* extract duplicate code to a static method

* amend header for test file
tsangz2013 committed Sep 28, 2022
1 parent e069212 commit f34a223
Showing 5 changed files with 198 additions and 91 deletions.
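For orientation, the kill path after this change works roughly as sketched below. The HadoopJobUtils and YarnUtils behavior described here is taken from the diff that follows; the surrounding SomeHadoopJob caller class is purely illustrative.

// Hypothetical caller; only the HadoopJobUtils/YarnUtils behavior described in the comments comes from this commit.
import azkaban.jobtype.HadoopJobUtils;
import azkaban.utils.Props;
import java.io.File;
import org.apache.log4j.Logger;

public class SomeHadoopJob {

  private static final Logger LOG = Logger.getLogger(SomeHadoopJob.class);

  // The caller no longer passes the log file path explicitly. The reworked
  // HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs reads it from jobProps
  // (CommonJobProperties.JOB_LOG_FILE), extracts the spawned application IDs from
  // the log, and delegates the kill to YarnUtils.createYarnClient(jobProps) plus
  // YarnUtils.killAllAppsOnCluster(...).
  public void onKill(final Props jobProps, final File tokenFile) {
    HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, LOG);
  }
}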
@@ -15,8 +15,10 @@
*/
package azkaban.jobtype;

import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.YarnUtils;
import azkaban.utils.Props;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
@@ -38,12 +40,8 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Logger;
import org.apache.hadoop.fs.Path;


/**
@@ -86,9 +84,6 @@ public class HadoopJobUtils {
public static final String DEPENDENCY_STORAGE_ROOT_PATH_PROP = "dependency.storage.path.prefix";
// Azkaban property for listing additional namenodes for delegation tokens
private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";
//Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
private static final String YARN_CONF_FILENAME = "yarn-site.xml";

private HadoopJobUtils() {
}
@@ -114,10 +109,9 @@ public static void cancelHadoopTokens(final HadoopSecurityManager hadoopSecurity
}

/**
* The same as {@link #addAdditionalNamenodesToProps}, but assumes that the
* calling job is MapReduce-based and so uses the
* {@link #MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link Configuration} object
* to get the list of additional namenodes.
* The same as {@link #addAdditionalNamenodesToProps}, but assumes that the calling job is
* MapReduce-based and so uses the {@link #MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link
* Configuration} object to get the list of additional namenodes.
*
* @param props Props to add the new Namenode URIs to.
* @see #addAdditionalNamenodesToProps(Props, String)
@@ -133,14 +127,14 @@ public static void addAdditionalNamenodesToPropsFromMRJob(final Props props, fin
}

/**
* Takes the list of other Namenodes from which to fetch delegation tokens,
* the {@link #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it
* back with the addition of the potentially JobType-specific Namenode URIs
* from additionalNamenodes. Modifies props in-place.
* Takes the list of other Namenodes from which to fetch delegation tokens, the {@link
* #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it back with the addition of
* the potentially JobType-specific Namenode URIs from additionalNamenodes. Modifies props
* in-place.
*
* @param props Props to add the new Namenode URIs to.
* @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch
* delegation tokens.
* @param props Props to add the new Namenode URIs to.
* @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch delegation
* tokens.
*/
public static void addAdditionalNamenodesToProps(final Props props,
final String additionalNamenodes) {
@@ -341,20 +335,18 @@ public boolean accept(final File dir, final String name) {
}

/**
* This method is a decorator around the KillAllSpawnedHadoopJobs method.
* This method takes additional parameters to determine whether KillAllSpawnedHadoopJobs needs to
* be executed
* using doAs as a different user
* This method is a decorator around the KillAllSpawnedHadoopJobs method. This method takes
* additional parameters to determine whether KillAllSpawnedHadoopJobs needs to be executed using
* doAs as a different user
*
* @param logFilePath Azkaban log file path
* @param jobProps Azkaban job props
* @param jobProps Azkaban job props
* @param tokenFile Pass in the tokenFile if value is known. It is ok to skip if the token file
* is in the environmental variable
* @param log a usable logger
* is in the environmental variable
* @param log a usable logger
*/
public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
final Props jobProps,
public static void proxyUserKillAllSpawnedHadoopJobs(final Props jobProps,
final File tokenFile, final Logger log) {

final Properties properties = new Properties();
properties.putAll(jobProps.getFlattened());

@@ -366,39 +358,24 @@ public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
findAndKillYarnApps(jobProps, log);
return null;
}
});
} else {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
findAndKillYarnApps(jobProps, log);
}
} catch (final Throwable t) {
log.warn("something happened while trying to kill all spawned jobs", t);
}
}


/**
* Pass in a log file, this method will find all the hadoop jobs it has launched, and kills it
*
* Only works with Hadoop2
*
* @return a Set<String>. The set will contain the applicationIds that this job tried to kill.
*/
public static Set<String> killAllSpawnedHadoopJobs(final String logFilePath, final Logger log, final Props jobProps) {
final Set<String> allSpawnedJobs = findApplicationIdFromLog(logFilePath, log);
log.info("applicationIds to kill: " + allSpawnedJobs);

for (final String appId : allSpawnedJobs) {
try {
killJobOnCluster(appId, log, jobProps);
} catch (final Throwable t) {
log.warn("something happened while trying to kill this job: " + appId, t);
}
}

return allSpawnedJobs;
private static void findAndKillYarnApps(Props jobProps, Logger log) {
final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
log.info("Log file path is: " + logFilePath);
Set<String> allSpawnedJobAppIDs = findApplicationIdFromLog(logFilePath, log);
YarnClient yarnClient = YarnUtils.createYarnClient(jobProps);
YarnUtils.killAllAppsOnCluster(yarnClient, allSpawnedJobAppIDs, log);
}

/**
@@ -483,36 +460,6 @@ public static Set<String> findApplicationIdFromLog(final String logFilePath, fin
return applicationIds;
}

/**
* <pre>
* Uses YarnClient to kill the job on HDFS.
* Using JobClient only works partially:
* If yarn container has started but spark job haven't, it will kill
* If spark job has started, the cancel will hang until the spark job is complete
* If the spark job is complete, it will return immediately, with a job not found on job tracker
* </pre>
*/
public static void killJobOnCluster(final String applicationId, final Logger log, final Props jobProps)
throws YarnException,
IOException {

final YarnConfiguration yarnConf = new YarnConfiguration();
final YarnClient yarnClient = YarnClient.createYarnClient();
if (jobProps.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
yarnConf.addResource(new Path(jobProps.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
}
yarnClient.init(yarnConf);
yarnClient.start();

final String[] split = applicationId.split("_");
final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
Integer.parseInt(split[2]));

log.info("start klling application: " + aid);
yarnClient.killApplication(aid);
log.info("successfully killed application: " + aid);
}

/**
* <pre>
* constructs a javaOpts string based on the Props, and the key given, will return
@@ -535,10 +482,10 @@ public static String javaOptStringFromAzkabanProps(final Props props, final Stri
* Filter a collection of String commands to match a whitelist regex and not match a blacklist
* regex.
*
* @param commands Collection of commands to be filtered
* @param commands Collection of commands to be filtered
* @param whitelistRegex whitelist regex to work as inclusion criteria
* @param blacklistRegex blacklist regex to work as exclusion criteria
* @param log logger to report violation
* @param log logger to report violation
* @return filtered list of matching. Empty list if no command match all the criteria.
*/
public static List<String> filterCommands(final Collection<String> commands,
@@ -582,7 +529,7 @@ public static String javaOptStringFromHadoopConfiguration(final Configuration co
* Construct a CSV of tags for the Hadoop application.
*
* @param props job properties
* @param keys list of keys to construct tags from.
* @param keys list of keys to construct tags from.
* @return a CSV of tags
*/
public static String constructHadoopTags(final Props props, final String[] keys) {
@@ -20,9 +20,7 @@
import java.io.File;

import azkaban.utils.Utils;
import joptsimple.internal.Strings;
import org.apache.log4j.Logger;
import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
@@ -160,10 +158,6 @@ public void killAllSpawnedHadoopJobs(Props jobProps, final Logger logger) {
if (tokenFile == null) {
return; // do null check for tokenFile
}

final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
logger.info("Log file path is: " + logFilePath);

HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, logger);
HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, logger);
}
}
2 changes: 2 additions & 0 deletions azkaban-common/build.gradle
@@ -29,6 +29,8 @@ dependencies {
compileOnly deps.hadoopAuth
compileOnly deps.hadoopCommon
compileOnly deps.hadoopHdfs
compile deps.hadoopMRClientCommon
compile deps.hadoopMRClientCore
compile deps.httpclient
compile deps.jetty
compile deps.jettyUtil
86 changes: 86 additions & 0 deletions azkaban-common/src/main/java/azkaban/utils/YarnUtils.java
@@ -0,0 +1,86 @@
/*
* Copyright 2022 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.utils;

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class YarnUtils {

//Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
private static final String YARN_CONF_FILENAME = "yarn-site.xml";

/**
* Uses YarnClient to kill the jobs one by one
*/
public static void killAllAppsOnCluster(YarnClient yarnClient, Set<String> applicationIDs,
Logger log) {
log.info(String.format("Killing applications: %s", applicationIDs));

for (final String appId : applicationIDs) {
try {
YarnUtils.killAppOnCluster(yarnClient, appId, log);
} catch (final Throwable t) {
log.warn("something happened while trying to kill this job: " + appId, t);
}
}
}

/**
* <pre>
* Uses YarnClient to kill the job on the Hadoop Yarn Cluster.
* Using JobClient only works partially:
* If the yarn container has started but the spark job hasn't, it will kill it
* If spark job has started, the cancel will hang until the spark job is complete
* If the spark job is complete, it will return immediately, with a job not found on job tracker
* </pre>
*/
public static void killAppOnCluster(final YarnClient yarnClient, final String applicationId,
final Logger log) throws YarnException, IOException {

final String[] split = applicationId.split("_");
final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
Integer.parseInt(split[2]));
log.info("start killing application: " + aid);
yarnClient.killApplication(aid);
log.info("successfully killed application: " + aid);
}

/**
* Create, initialize and start a YarnClient connecting to the Yarn Cluster (resource manager),
* using the resources passed in with props.
*
* @param props the properties to create a YarnClient, the path to the "yarn-site.xml" to be used
*/
public static YarnClient createYarnClient(Props props) {
final YarnConfiguration yarnConf = new YarnConfiguration();
if (props.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
yarnConf.addResource(
new Path(props.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
}
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
return yarnClient;
}
}
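A minimal usage sketch of the new utility, assuming the caller already holds a Props object (optionally carrying env.YARN_CONF_DIR) and a set of YARN application IDs; the example class name and the hard-coded application ID are hypothetical.

import azkaban.utils.Props;
import azkaban.utils.YarnUtils;
import java.util.Collections;
import java.util.Set;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.log4j.Logger;

public class YarnUtilsUsageExample {

  private static final Logger LOG = Logger.getLogger(YarnUtilsUsageExample.class);

  public static void killLeakedApps(final Props jobProps) {
    // In Azkaban the IDs come from HadoopJobUtils.findApplicationIdFromLog; this one is made up.
    final Set<String> appIds = Collections.singleton("application_1660000000000_0001");

    // Builds a YarnConfiguration (adding <env.YARN_CONF_DIR>/yarn-site.xml when configured),
    // then inits and starts the client against the cluster's ResourceManager.
    final YarnClient yarnClient = YarnUtils.createYarnClient(jobProps);
    try {
      // Best-effort: a failure to kill one application is logged and does not stop the others.
      YarnUtils.killAllAppsOnCluster(yarnClient, appIds, LOG);
    } finally {
      yarnClient.stop(); // releases the connection to the ResourceManager
    }
  }
}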
