From ad3cd23aee87e6afa102d354a603e9d65188cb3f Mon Sep 17 00:00:00 2001 From: Daniel-009497 <18710890153@163.com> Date: Mon, 19 Dec 2022 18:27:08 +0800 Subject: [PATCH] YARN-11401 Separate ApplicationMaster cleanup events and launcher events into different resource pools --- .../hadoop/yarn/conf/YarnConfiguration.java | 7 ++ .../amlauncher/ApplicationMasterLauncher.java | 75 ++++++++++++++++--- 2 files changed, 70 insertions(+), 12 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 316a6421889bd..eb1c45cc9bb72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -236,8 +236,15 @@ private static void addDeprecatedKeys() { /** Number of threads used to launch/cleanup AM.*/ public static final String RM_AMLAUNCHER_THREAD_COUNT = RM_PREFIX + "amlauncher.thread-count"; + public static final int DEFAULT_RM_AMLAUNCHER_THREAD_COUNT = 50; + /** Number of threads used to cleanup AM.*/ + public static final String RM_AMCLEANUP_THREAD_COUNT = + RM_PREFIX + "amcleanup.thread-count"; + + public static final int DEFAULT_RM_AMCLEANUP_THREAD_COUNT = 50; + /** Retry times to connect with NM.*/ public static final String RM_NODEMANAGER_CONNECT_RETRIES = RM_PREFIX + "nodemanager-connect-retries"; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java index 
9f4de2868a1fd..7364cb9e1e5bd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/ApplicationMasterLauncher.java @@ -40,30 +40,45 @@ public class ApplicationMasterLauncher extends AbstractService implements private static final Logger LOG = LoggerFactory.getLogger( ApplicationMasterLauncher.class); private ThreadPoolExecutor launcherPool; + private ThreadPoolExecutor cleanupPool; private LauncherThread launcherHandlingThread; + private CleanupThread cleanupHandlingThread; + - private final BlockingQueue masterEvents + private final BlockingQueue launcherEvents + = new LinkedBlockingQueue(); + private final BlockingQueue cleanupEvents = new LinkedBlockingQueue(); - protected final RMContext context; public ApplicationMasterLauncher(RMContext context) { super(ApplicationMasterLauncher.class.getName()); this.context = context; this.launcherHandlingThread = new LauncherThread(); + this.cleanupHandlingThread = new CleanupThread(); } @Override protected void serviceInit(Configuration conf) throws Exception { - int threadCount = conf.getInt( + int launcherThreadCount = conf.getInt( YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT, YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT); - ThreadFactory tf = new ThreadFactoryBuilder() + ThreadFactory tfLauncher = new ThreadFactoryBuilder() .setNameFormat("ApplicationMasterLauncher #%d") .build(); - launcherPool = new ThreadPoolExecutor(threadCount, threadCount, 1, + launcherPool = new ThreadPoolExecutor(launcherThreadCount, launcherThreadCount, 1, TimeUnit.HOURS, new LinkedBlockingQueue()); - launcherPool.setThreadFactory(tf); + launcherPool.setThreadFactory(tfLauncher); + + int cleanupThreadCount = conf.getInt( + 
YarnConfiguration.RM_AMCLEANUP_THREAD_COUNT, + YarnConfiguration.DEFAULT_RM_AMCLEANUP_THREAD_COUNT); + ThreadFactory tfCleanup = new ThreadFactoryBuilder() + .setNameFormat("ApplicationMasterLauncher #%d") + .build(); + cleanupPool = new ThreadPoolExecutor(cleanupThreadCount, cleanupThreadCount, 1, + TimeUnit.HOURS, new LinkedBlockingQueue()); + cleanupPool.setThreadFactory(tfCleanup); Configuration newConf = new YarnConfiguration(conf); newConf.setInt(CommonConfigurationKeysPublic. @@ -77,6 +92,7 @@ protected void serviceInit(Configuration conf) throws Exception { @Override protected void serviceStart() throws Exception { launcherHandlingThread.start(); + cleanupHandlingThread.start(); super.serviceStart(); } @@ -90,19 +106,31 @@ protected Runnable createRunnableLauncher(RMAppAttempt application, private void launch(RMAppAttempt application) { Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.LAUNCH); - masterEvents.add(launcher); + launcherEvents.add(launcher); } @Override protected void serviceStop() throws Exception { launcherHandlingThread.interrupt(); + cleanupHandlingThread.interrupt(); + try { launcherHandlingThread.join(); } catch (InterruptedException ie) { - LOG.info(launcherHandlingThread.getName() + " interrupted during join ", - ie); } + LOG.info(launcherHandlingThread.getName() + + " interrupted during join ", ie); + } + + try { + cleanupHandlingThread.join(); + } catch (InterruptedException ie) { + LOG.info(cleanupHandlingThread.getName() + + " interrupted during join ", ie); + } + launcherPool.shutdown(); + cleanupPool.shutdown(); } private class LauncherThread extends Thread { @@ -115,8 +143,9 @@ public LauncherThread() { public void run() { while (!this.isInterrupted()) { Runnable toLaunch; + try { - toLaunch = masterEvents.take(); + toLaunch = launcherEvents.take(); launcherPool.execute(toLaunch); } catch (InterruptedException e) { LOG.warn(this.getClass().getName() + " interrupted. 
Returning."); @@ -124,11 +153,33 @@ public void run() { } } } - } + } + + private class CleanupThread extends Thread { + + public CleanupThread() { + super("ApplicationMaster Cleanup"); + } + + @Override + public void run() { + while (!this.isInterrupted()) { + Runnable toCleanup; + + try { + toCleanup = cleanupEvents.take(); + cleanupPool.execute(toCleanup); + } catch (InterruptedException e) { + LOG.warn(this.getClass().getName() + " interrupted. Returning."); + return; + } + } + } + } private void cleanup(RMAppAttempt application) { Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP); - masterEvents.add(launcher); + cleanupEvents.add(launcher); } @Override