diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java index 124617d275..d3bcebd170 100644 --- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java +++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopJobUtils.java @@ -15,8 +15,10 @@ */ package azkaban.jobtype; +import azkaban.flow.CommonJobProperties; import azkaban.security.commons.HadoopSecurityManager; import azkaban.security.commons.HadoopSecurityManagerException; +import azkaban.utils.YarnUtils; import azkaban.utils.Props; import com.google.common.base.Joiner; import java.io.BufferedReader; @@ -38,12 +40,8 @@ import java.util.regex.Pattern; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.client.api.YarnClient; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.log4j.Logger; -import org.apache.hadoop.fs.Path; /** @@ -86,9 +84,6 @@ public class HadoopJobUtils { public static final String DEPENDENCY_STORAGE_ROOT_PATH_PROP = "dependency.storage.path.prefix"; // Azkaban property for listing additional namenodes for delegation tokens private static final String OTHER_NAMENODES_PROPERTY = "other_namenodes"; - //Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router - private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR"; - private static final String YARN_CONF_FILENAME = "yarn-site.xml"; private HadoopJobUtils() { } @@ -114,10 +109,9 @@ public static void cancelHadoopTokens(final HadoopSecurityManager hadoopSecurity } /** - * The same as {@link #addAdditionalNamenodesToProps}, but assumes that the - * calling job is MapReduce-based and so uses the - * {@link 
#MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link Configuration} object - * to get the list of additional namenodes. + * The same as {@link #addAdditionalNamenodesToProps}, but assumes that the calling job is + * MapReduce-based and so uses the {@link #MAPREDUCE_JOB_OTHER_NAMENODES} from a {@link + * Configuration} object to get the list of additional namenodes. * * @param props Props to add the new Namenode URIs to. * @see #addAdditionalNamenodesToProps(Props, String) @@ -133,14 +127,14 @@ public static void addAdditionalNamenodesToPropsFromMRJob(final Props props, fin } /** - * Takes the list of other Namenodes from which to fetch delegation tokens, - * the {@link #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it - * back with the addition of the the potentially JobType-specific Namenode URIs - * from additionalNamenodes. Modifies props in-place. + * Takes the list of other Namenodes from which to fetch delegation tokens, the {@link + * #OTHER_NAMENODES_PROPERTY} property, from Props and inserts it back with the addition of the + * the potentially JobType-specific Namenode URIs from additionalNamenodes. Modifies props + * in-place. * - * @param props Props to add the new Namenode URIs to. - * @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch - * delegation tokens. + * @param props Props to add the new Namenode URIs to. + * @param additionalNamenodes Comma-separated list of Namenode URIs from which to fetch delegation + * tokens. */ public static void addAdditionalNamenodesToProps(final Props props, final String additionalNamenodes) { @@ -341,20 +335,18 @@ public boolean accept(final File dir, final String name) { } /** - * This method is a decorator around the KillAllSpawnedHadoopJobs method. - * This method takes additional parameters to determine whether KillAllSpawnedHadoopJobs needs to - * be executed - * using doAs as a different user + * This method is a decorator around the KillAllSpawnedHadoopJobs method. 
This method takes + * additional parameters to determine whether KillAllSpawnedHadoopJobs needs to be executed using + * doAs as a different user * - * @param logFilePath Azkaban log file path - * @param jobProps Azkaban job props + * @param jobProps Azkaban job props * @param tokenFile Pass in the tokenFile if value is known. It is ok to skip if the token file - * is in the environmental variable - * @param log a usable logger + * is in the environmental variable + * @param log a usable logger */ - public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath, - final Props jobProps, + public static void proxyUserKillAllSpawnedHadoopJobs(final Props jobProps, final File tokenFile, final Logger log) { + final Properties properties = new Properties(); properties.putAll(jobProps.getFlattened()); @@ -366,39 +358,24 @@ public static void proxyUserKillAllSpawnedHadoopJobs(final String logFilePath, proxyUser.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws Exception { - HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps); + findAndKillYarnApps(jobProps, log); return null; } }); } else { - HadoopJobUtils.killAllSpawnedHadoopJobs(logFilePath, log, jobProps); + findAndKillYarnApps(jobProps, log); } } catch (final Throwable t) { log.warn("something happened while trying to kill all spawned jobs", t); } } - - /** - * Pass in a log file, this method will find all the hadoop jobs it has launched, and kills it - * - * Only works with Hadoop2 - * - * @return a Set. The set will contain the applicationIds that this job tried to kill. 
- */ - public static Set killAllSpawnedHadoopJobs(final String logFilePath, final Logger log, final Props jobProps) { - final Set allSpawnedJobs = findApplicationIdFromLog(logFilePath, log); - log.info("applicationIds to kill: " + allSpawnedJobs); - - for (final String appId : allSpawnedJobs) { - try { - killJobOnCluster(appId, log, jobProps); - } catch (final Throwable t) { - log.warn("something happened while trying to kill this job: " + appId, t); - } - } - - return allSpawnedJobs; + private static void findAndKillYarnApps(Props jobProps, Logger log) { + final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE); + log.info("Log file path is: " + logFilePath); + Set allSpawnedJobAppIDs = findApplicationIdFromLog(logFilePath, log); + YarnClient yarnClient = YarnUtils.createYarnClient(jobProps); + YarnUtils.killAllAppsOnCluster(yarnClient, allSpawnedJobAppIDs, log); } /** @@ -483,36 +460,6 @@ public static Set findApplicationIdFromLog(final String logFilePath, fin return applicationIds; } - /** - *
-   * Uses YarnClient to kill the job on HDFS.
-   * Using JobClient only works partially:
-   *   If yarn container has started but spark job haven't, it will kill
-   *   If spark job has started, the cancel will hang until the spark job is complete
-   *   If the spark job is complete, it will return immediately, with a job not found on job tracker
-   * </pre>
- */ - public static void killJobOnCluster(final String applicationId, final Logger log, final Props jobProps) - throws YarnException, - IOException { - - final YarnConfiguration yarnConf = new YarnConfiguration(); - final YarnClient yarnClient = YarnClient.createYarnClient(); - if (jobProps.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) { - yarnConf.addResource(new Path(jobProps.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME)); - } - yarnClient.init(yarnConf); - yarnClient.start(); - - final String[] split = applicationId.split("_"); - final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]), - Integer.parseInt(split[2])); - - log.info("start klling application: " + aid); - yarnClient.killApplication(aid); - log.info("successfully killed application: " + aid); - } - /** *
    * constructions a javaOpts string based on the Props, and the key given, will return
@@ -535,10 +482,10 @@ public static String javaOptStringFromAzkabanProps(final Props props, final Stri
    * Filter a collection of String commands to match a whitelist regex and not match a blacklist
    * regex.
    *
-   * @param commands Collection of commands to be filtered
+   * @param commands       Collection of commands to be filtered
    * @param whitelistRegex whitelist regex to work as inclusion criteria
    * @param blacklistRegex blacklist regex to work as exclusion criteria
-   * @param log logger to report violation
+   * @param log            logger to report violation
    * @return filtered list of matching. Empty list if no command match all the criteria.
    */
   public static List<String> filterCommands(final Collection<String> commands,
@@ -582,7 +529,7 @@ public static String javaOptStringFromHadoopConfiguration(final Configuration co
    * Construct a CSV of tags for the Hadoop application.
    *
    * @param props job properties
-   * @param keys list of keys to construct tags from.
+   * @param keys  list of keys to construct tags from.
    * @return a CSV of tags
    */
   public static String constructHadoopTags(final Props props, final String[] keys) {
diff --git a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopProxy.java b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopProxy.java
index dda76e46ac..22dba5c793 100644
--- a/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopProxy.java
+++ b/az-hadoop-jobtype-plugin/src/main/java/azkaban/jobtype/HadoopProxy.java
@@ -20,9 +20,7 @@
 import java.io.File;
 
 import azkaban.utils.Utils;
-import joptsimple.internal.Strings;
 import org.apache.log4j.Logger;
-import azkaban.flow.CommonJobProperties;
 import azkaban.security.commons.HadoopSecurityManager;
 import azkaban.security.commons.HadoopSecurityManagerException;
 import azkaban.utils.Props;
@@ -160,10 +158,6 @@ public void killAllSpawnedHadoopJobs(Props jobProps, final Logger logger) {
     if (tokenFile == null) {
       return; // do null check for tokenFile
     }
-
-    final String logFilePath = jobProps.getString(CommonJobProperties.JOB_LOG_FILE);
-    logger.info("Log file path is: " + logFilePath);
-
-    HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(logFilePath, jobProps, tokenFile, logger);
+    HadoopJobUtils.proxyUserKillAllSpawnedHadoopJobs(jobProps, tokenFile, logger);
   }
 }
diff --git a/azkaban-common/build.gradle b/azkaban-common/build.gradle
index f7fdccba74..c90a357fa3 100644
--- a/azkaban-common/build.gradle
+++ b/azkaban-common/build.gradle
@@ -29,6 +29,8 @@ dependencies {
     compileOnly deps.hadoopAuth
     compileOnly deps.hadoopCommon
     compileOnly deps.hadoopHdfs
+    compile deps.hadoopMRClientCommon
+    compile deps.hadoopMRClientCore
     compile deps.httpclient
     compile deps.jetty
     compile deps.jettyUtil
diff --git a/azkaban-common/src/main/java/azkaban/utils/YarnUtils.java b/azkaban-common/src/main/java/azkaban/utils/YarnUtils.java
new file mode 100644
index 0000000000..b8015f2a11
--- /dev/null
+++ b/azkaban-common/src/main/java/azkaban/utils/YarnUtils.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2022 LinkedIn Corp.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package azkaban.utils;
+
+import java.io.IOException;
+import java.util.Set;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+public class YarnUtils {
+
+  //Yarn resource configuration directory for the cluster where the job is scheduled by the cluster router
+  private static final String YARN_CONF_DIRECTORY_PROPERTY = "env.YARN_CONF_DIR";
+  private static final String YARN_CONF_FILENAME = "yarn-site.xml";
+
+  /**
+   * Uses YarnClient to kill the jobs one by one
+   */
+  public static void killAllAppsOnCluster(YarnClient yarnClient, Set<String> applicationIDs,
+      Logger log) {
+    log.info(String.format("Killing applications: %s", applicationIDs));
+
+    for (final String appId : applicationIDs) {
+      try {
+        YarnUtils.killAppOnCluster(yarnClient, appId, log);
+      } catch (final Throwable t) {
+        log.warn("something happened while trying to kill this job: " + appId, t);
+      }
+    }
+  }
+
+  /**
+   * <pre>
+   * Uses YarnClient to kill the job on the Hadoop Yarn Cluster.
+   * Using JobClient only works partially:
+   *   If yarn container has started but spark job haven't, it will kill
+   *   If spark job has started, the cancel will hang until the spark job is complete
+   *   If the spark job is complete, it will return immediately, with a job not found on job tracker
+   * </pre>
+ */ + public static void killAppOnCluster(final YarnClient yarnClient, final String applicationId, + final Logger log) throws YarnException, IOException { + + final String[] split = applicationId.split("_"); + final ApplicationId aid = ApplicationId.newInstance(Long.parseLong(split[1]), + Integer.parseInt(split[2])); + log.info("start killing application: " + aid); + yarnClient.killApplication(aid); + log.info("successfully killed application: " + aid); + } + + /** + * Create, initialize and start a YarnClient connecting to the Yarn Cluster (resource manager), + * using the resources passed in with props. + * + * @param props the properties to create a YarnClient, the path to the "yarn-site.xml" to be used + */ + public static YarnClient createYarnClient(Props props) { + final YarnConfiguration yarnConf = new YarnConfiguration(); + if (props.containsKey(YARN_CONF_DIRECTORY_PROPERTY)) { + yarnConf.addResource( + new Path(props.get(YARN_CONF_DIRECTORY_PROPERTY) + "/" + YARN_CONF_FILENAME)); + } + final YarnClient yarnClient = YarnClient.createYarnClient(); + yarnClient.init(yarnConf); + yarnClient.start(); + return yarnClient; + } +} diff --git a/azkaban-common/src/test/java/azkaban/utils/YarnUtilsTest.java b/azkaban-common/src/test/java/azkaban/utils/YarnUtilsTest.java new file mode 100644 index 0000000000..91124c4f22 --- /dev/null +++ b/azkaban-common/src/test/java/azkaban/utils/YarnUtilsTest.java @@ -0,0 +1,78 @@ +/* + * Copyright 2022 LinkedIn Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package azkaban.utils; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableSet; +import java.io.IOException; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.log4j.Logger; +import org.junit.Test; + +public class YarnUtilsTest { + + final private Logger log = Logger.getLogger(YarnUtilsTest.class); + + @Test + public void testKillAppOnCluster() throws IOException, YarnException { + YarnClient mockClient = mock(YarnClient.class); + doNothing().when(mockClient).killApplication(any()); + YarnUtils.killAppOnCluster(mockClient, "application_123_456", log); + } + + @Test(expected = IndexOutOfBoundsException.class) + public void testKillAppOnClusterInvalidAppID() throws IOException, YarnException { + YarnClient mockClient = mock(YarnClient.class); + doNothing().when(mockClient).killApplication(any()); + YarnUtils.killAppOnCluster(mockClient, "application+++123===456", log); + } + + @Test(expected = YarnException.class) + public void testKillAppOnClusterYarnFail() throws IOException, YarnException { + YarnClient mockClient = mock(YarnClient.class); + doThrow(new YarnException("ops")).when(mockClient).killApplication(any()); + YarnUtils.killAppOnCluster(mockClient, "application_123_456", log); + } + + @Test + public void testKillAllAppsOnCluster() throws IOException, YarnException { + YarnClient mockClient = mock(YarnClient.class); + doNothing().when(mockClient).killApplication(any()); + YarnUtils.killAllAppsOnCluster(mockClient, + 
ImmutableSet.of("application_123_456", "application_456_123"), log); + } + + @Test + public void testKillAllAppsOnClusterFail() throws IOException, YarnException { + YarnClient mockClient = mock(YarnClient.class); + doNothing().when(mockClient).killApplication(any()); + doThrow(new YarnException("ops")).when(mockClient).killApplication( + eq(ApplicationId.newInstance(4560, 1230))); + + // log the error but not throw + YarnUtils.killAllAppsOnCluster(mockClient, + ImmutableSet.of("application_123_456", "application_4560_1230"), log); + } +}