Extract all yarn related methods to azkaban-common as shared util library #3145

Merged

@@ -15,8 +15,10 @@
*/
package azkaban.jobtype;

import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.YarnUtils;
import azkaban.utils.Props;
import com.google.common.base.Joiner;
import java.io.BufferedReader;
@@ -38,12 +40,8 @@
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.log4j.Logger;
import org.apache.hadoop.fs.Path;


/**
@@ -86,9 +84,6 @@ public class HadoopJobUtils {
public static final String DEPENDENCY_STORAGE_ROOT_PATH_PROP = "dependency.storage.path.prefix";
// Azkaban property for listing additional namenodes for delegation tokens
private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes";
//Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
private static final String YARN_CONF_FILENAME = "yarn-site.xml";

private HadoopJobUtils() {
}
@@ -114,10 +109,9 @@ public static void cancelHadoopTokens(final HadoopSecurityManager hadoopSecurity
}

/**
* The same as {@link #addAdditionalNamenodesToProps}, but assumes that the
* calling job is MapReduce-based and so uses the
* {@link #MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link Configuration} object
* to get the list of additional namenodes.
* The same as {@link #addAdditionalNamenodesToProps}, but assumes that the calling job is
* MapReduce-based and so uses the {@link #MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link
* Configuration} object to get the list of additional namenodes.
*
* @param props Props to add the new Namenode URIs to.
* @see #addAdditionalNamenodesToProps(Props, String)
@@ -133,14 +127,14 @@ public static void addAdditionalNamenodesToPropsFromMRJob(final Props props, fin
}

/**
* Takes the list of other Namenodes from which to fetch delegation tokens,
* the {@link #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it
back with the addition of the potentially JobType-specific Namenode URIs
* from additionalNamenodes. Modifies props in-place.
* Takes the list of other Namenodes from which to fetch delegation tokens, the {@link
* #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it back with the addition of the
potentially JobType-specific Namenode URIs from additionalNamenodes. Modifies props
* in-place.
*
* @param props Props to add the new Namenode URIs to.
* @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch
* delegation tokens.
* @param props Props to add the new Namenode URIs to.
* @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch delegation
* tokens.
*/
public static void addAdditionalNamenodesToProps(final Props props,
final String additionalNamenodes) {
@@ -341,20 +335,18 @@ public boolean accept(final File dir, final String name) {
}

/**
* This method is a decorator around the KillAllSpawnedHadoopJobs method.
* This method takes additional parameters to determine whether KillAllSpawnedHadoopJobs needs to
* be executed
* using doAs as a different user
* This method is a decorator around the KillAllSpawnedHadoopJobs method. This method takes
* additional parameters to determine whether KillAllSpawnedHadoopJobs needs to be executed using
* doAs as a different user
*
* @param logFilePath Azkaban log file path
* @param jobProps Azkaban job props
* @param jobProps Azkaban job props
* @param tokenFile Pass in the tokenFile if value is known. It is ok to skip if the token file
* is in the environmental variable
* @param log a usable logger
* is in the environmental variable
* @param log a usable logger
*/
public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
final Props jobProps,
public static void proxyUserKillAllSpawnedHadoopJobs(final Props jobProps,
final File tokenFile, final Logger log) {

final Properties properties = new Properties();
properties.putAll(jobProps.getFlattened());

@@ -366,39 +358,24 @@ public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath,
proxyUser.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
findAndKillYarnApps(jobProps, log);
return null;
}
});
} else {
HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps);
findAndKillYarnApps(jobProps, log);
}
} catch (final Throwable t) {
log.warn("something happened while trying to kill all spawned jobs", t);
}
}


/**
* Pass in a log file, this method will find all the hadoop jobs it has launched, and kills it
*
* Only works with Hadoop2
*
* @return a Set<String>. The set will contain the applicationIds that this job tried to kill.
*/
public static Set<String> killAllSpawnedHadoopJobs(final String logFilePath, final Logger log, final Props jobProps) {
final Set<String> allSpawnedJobs = findApplicationIdFromLog(logFilePath, log);
log.info("applicationIds to kill: " + allSpawnedJobs);

for (final String appId : allSpawnedJobs) {
try {
killJobOnCluster(appId, log, jobProps);
} catch (final Throwable t) {
log.warn("something happened while trying to kill this job: " + appId, t);
}
}

return allSpawnedJobs;
private static void findAndKillYarnApps(Props jobProps, Logger log) {
final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
log.info("Log file path is: " + logFilePath);
Set<String> allSpawnedJobAppIDs = findApplicationIdFromLog(logFilePath, log);
YarnClient yarnClient = YarnUtils.createYarnClient(jobProps);
YarnUtils.killAllAppsOnCluster(yarnClient, allSpawnedJobAppIDs, log);
}

/**
@@ -483,36 +460,6 @@ public static Set<String> findApplicationIdFromLog(final String logFilePath, fin
return applicationIds;
}

/**
* <pre>
* Uses YarnClient to kill the job on HDFS.
* Using JobClient only works partially:
* If yarn container has started but spark job haven't, it will kill
* If spark job has started, the cancel will hang until the spark job is complete
* If the spark job is complete, it will return immediately, with a job not found on job tracker
* </pre>
*/
public static void killJobOnCluster(final String applicationId, final Logger log, final Props jobProps)
throws YarnException,
IOException {

final YarnConfiguration yarnConf = new YarnConfiguration();
final YarnClient yarnClient = YarnClient.createYarnClient();
if (jobProps.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
yarnConf.addResource(new Path(jobProps.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
}
yarnClient.init(yarnConf);
yarnClient.start();

final String[] split = applicationId.split("_");
final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
Integer.parseInt(split[2]));

log.info("start klling application: " + aid);
yarnClient.killApplication(aid);
log.info("successfully killed application: " + aid);
}

/**
* <pre>
* constructs a javaOpts string based on the Props, and the key given, will return
@@ -535,10 +482,10 @@ public static String javaOptStringFromAzkabanProps(final Props props, final Stri
* Filter a collection of String commands to match a whitelist regex and not match a blacklist
* regex.
*
* @param commands Collection of commands to be filtered
* @param commands Collection of commands to be filtered
* @param whitelistRegex whitelist regex to work as inclusion criteria
* @param blacklistRegex blacklist regex to work as exclusion criteria
* @param log logger to report violation
* @param log logger to report violation
* @return filtered list of matching commands. Empty list if no commands match all the criteria.
*/
public static List<String> filterCommands(final Collection<String> commands,
@@ -582,7 +529,7 @@ public static String javaOptStringFromHadoopConfiguration(final Configuration co
* Construct a CSV of tags for the Hadoop application.
*
* @param props job properties
* @param keys list of keys to construct tags from.
* @param keys list of keys to construct tags from.
* @return a CSV of tags
*/
public static String constructHadoopTags(final Props props, final String[] keys) {
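An aside on the kill path above: `findApplicationIdFromLog` (its body is truncated in this view) scans the job's log file for YARN application IDs. Below is a minimal sketch of that kind of scan, assuming the standard `application_<clusterTimestamp>_<sequence>` ID format; the PR's exact regex is not shown in this diff, so this pattern is an assumption.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AppIdScanSketch {

  // Assumed YARN ID shape: application_<clusterTimestamp>_<sequenceNumber>
  private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

  /** Collects every distinct application ID mentioned in the log file. */
  public static Set<String> scan(final String logFilePath) throws IOException {
    final Set<String> ids = new HashSet<>();
    for (final String line : Files.readAllLines(Paths.get(logFilePath))) {
      final Matcher matcher = APP_ID.matcher(line);
      while (matcher.find()) {
        ids.add(matcher.group());
      }
    }
    return ids;
  }
}
```

Returning a set rather than a list matters here: the same application ID typically appears many times in a log, and each kill should be attempted only once.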
@@ -20,9 +20,7 @@
import java.io.File;

import azkaban.utils.Utils;
import joptsimple.internal.Strings;
import org.apache.log4j.Logger;
import azkaban.flow.CommonJobProperties;
import azkaban.security.commons.HadoopSecurityManager;
import azkaban.security.commons.HadoopSecurityManagerException;
import azkaban.utils.Props;
@@ -160,10 +158,6 @@ public void killAllSpawnedHadoopJobs(Props jobProps, final Logger logger) {
if (tokenFile == null) {
return; // do null check for tokenFile
}

final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
logger.info("Log file path is: " + logFilePath);

HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, logger);
Review comment (Collaborator Author): Not good: duplicated info in the 2 parameters

HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, logger);
}
}
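The reviewer's point, made concrete: the log file path is derivable from the job props via `CommonJobProperties.JOB_LOG_FILE`, so the old signature carried the same information twice. A condensed sketch of the call-site change follows; the `cleanup` wrapper class and method are hypothetical, while the two calls are taken from this diff.

```java
import java.io.File;
import org.apache.log4j.Logger;
import azkaban.jobtype.HadoopJobUtils;
import azkaban.utils.Props;

class KillCallSiteSketch {

  void cleanup(final Props jobProps, final File tokenFile, final Logger logger) {
    // Before (now removed): the caller resolved the log path and passed the
    // props it came from as well -- two parameters carrying the same info:
    //   final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
    //   HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, logger);

    // After: the util derives the path internally from the same props key.
    HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, logger);
  }
}
```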
2 changes: 2 additions & 0 deletions azkaban-common/build.gradle
@@ -29,6 +29,8 @@ dependencies {
compileOnly deps.hadoopAuth
compileOnly deps.hadoopCommon
compileOnly deps.hadoopHdfs
compile deps.hadoopMRClientCommon
compile deps.hadoopMRClientCore
compile deps.httpclient
compile deps.jetty
compile deps.jettyUtil
86 changes: 86 additions & 0 deletions azkaban-common/src/main/java/azkaban/utils/YarnUtils.java
@@ -0,0 +1,86 @@
/*
* Copyright 2022 LinkedIn Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package azkaban.utils;
Review comment (tsangz2013, Sep 6, 2022): Basically moved from the old HadoopJobUtils, but follows the idea that the yarnClient can be passed in.

import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;

public class YarnUtils {

//Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
private static final String YARN_CONF_FILENAME = "yarn-site.xml";

/**
* Uses YarnClient to kill the jobs one by one
*/
public static void killAllAppsOnCluster(YarnClient yarnClient, Set<String> applicationIDs,
Logger log) {
log.info(String.format("Killing applications: %s", applicationIDs));

for (final String appId : applicationIDs) {
try {
YarnUtils.killAppOnCluster(yarnClient, appId, log);
} catch (final Throwable t) {
log.warn("something happened while trying to kill this job: " + appId, t);
}
}
}

/**
* <pre>
* Uses YarnClient to kill the job on the Hadoop Yarn Cluster.
* Using JobClient only works partially:
* If the YARN container has started but the Spark job hasn't, it will kill the job
* If the Spark job has started, the cancel will hang until the Spark job is complete
* If the Spark job is complete, it will return immediately, with a job-not-found from the job tracker
* </pre>
*/
public static void killAppOnCluster(final YarnClient yarnClient, final String applicationId,
final Logger log) throws YarnException, IOException {

final String[] split = applicationId.split("_");
final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]),
Integer.parseInt(split[2]));
log.info("start killing application: " + aid);
yarnClient.killApplication(aid);
log.info("successfully killed application: " + aid);
}

/**
* Create, initialize and start a YarnClient connecting to the Yarn Cluster (resource manager),
* using the resources passed in with props.
*
* @param props the properties used to create the YarnClient, including the path to the "yarn-site.xml" to be used
*/
public static YarnClient createYarnClient(Props props) {
final YarnConfiguration yarnConf = new YarnConfiguration();
if (props.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) {
yarnConf.addResource(
new Path(props.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME));
}
final YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(yarnConf);
yarnClient.start();
return yarnClient;
}
}
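To round out the new file, here is a hedged usage sketch of the `YarnUtils` API as added above: build the client from props, kill a set of applications, and stop the client. The `env.YARN_CONF_DIR` value and the application ID are illustrative only, and stopping the client in `finally` is an assumption about intended usage, since `createYarnClient` starts the client but nothing in this PR stops it.

```java
import java.util.Collections;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.log4j.Logger;
import azkaban.utils.Props;
import azkaban.utils.YarnUtils;

public class YarnUtilsUsageSketch {

  private static final Logger LOG = Logger.getLogger(YarnUtilsUsageSketch.class);

  public static void main(final String[] args) {
    final Props props = new Props();
    // Optional: point the client at a specific cluster's yarn-site.xml.
    props.put("env.YARN_CONF_DIR", "/etc/hadoop/conf"); // illustrative path

    final YarnClient client = YarnUtils.createYarnClient(props);
    try {
      // IDs use the application_<clusterTimestamp>_<sequence> form that
      // killAppOnCluster parses with split("_").
      YarnUtils.killAllAppsOnCluster(client,
          Collections.singleton("application_1662000000000_0042"), LOG);
    } finally {
      client.stop(); // started by createYarnClient; cleanup left to the caller
    }
  }
}
```

Because the client is a parameter rather than being created inside the kill methods, a test can hand `killAllAppsOnCluster` a mock `YarnClient` — the testability win the author's comment above alludes to.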