Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,15 @@ private static void addDeprecatedKeys() {
/** Number of threads used to launch/cleanup AM.*/
public static final String RM_AMLAUNCHER_THREAD_COUNT =
RM_PREFIX + "amlauncher.thread-count";

public static final int DEFAULT_RM_AMLAUNCHER_THREAD_COUNT = 50;

/** Number of threads used to cleanup AM.*/
public static final String RM_AMCLEANUP_THREAD_COUNT =
RM_PREFIX + "amcleanup.thread-count";

public static final int DEFAULT_RM_AMCLEANUP_THREAD_COUNT = 50;

/** Retry times to connect with NM.*/
public static final String RM_NODEMANAGER_CONNECT_RETRIES =
RM_PREFIX + "nodemanager-connect-retries";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,30 +40,45 @@ public class ApplicationMasterLauncher extends AbstractService implements
private static final Logger LOG = LoggerFactory.getLogger(
ApplicationMasterLauncher.class);
private ThreadPoolExecutor launcherPool;
private ThreadPoolExecutor cleanupPool;
private LauncherThread launcherHandlingThread;
private CleanupThread cleanupHandlingThread;


private final BlockingQueue<Runnable> masterEvents
private final BlockingQueue<Runnable> launcherEvents
= new LinkedBlockingQueue<Runnable>();
private final BlockingQueue<Runnable> cleanupEvents
= new LinkedBlockingQueue<Runnable>();

protected final RMContext context;

public ApplicationMasterLauncher(RMContext context) {
super(ApplicationMasterLauncher.class.getName());
this.context = context;
this.launcherHandlingThread = new LauncherThread();
this.cleanupHandlingThread = new CleanupThread();
}

@Override
protected void serviceInit(Configuration conf) throws Exception {
int threadCount = conf.getInt(
int launcherThreadCount = conf.getInt(
YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder()
ThreadFactory tfLauncher = new ThreadFactoryBuilder()
.setNameFormat("ApplicationMasterLauncher #%d")
.build();
launcherPool = new ThreadPoolExecutor(threadCount, threadCount, 1,
launcherPool = new ThreadPoolExecutor(launcherThreadCount, launcherThreadCount, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
launcherPool.setThreadFactory(tf);
launcherPool.setThreadFactory(tfLauncher);

int cleanupThreadCount = conf.getInt(
YarnConfiguration.RM_AMCLEANUP_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_AMCLEANUP_THREAD_COUNT);
ThreadFactory tfCleanup = new ThreadFactoryBuilder()
.setNameFormat("ApplicationMasterLauncher #%d")
.build();
cleanupPool = new ThreadPoolExecutor(cleanupThreadCount, cleanupThreadCount, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());
cleanupPool.setThreadFactory(tfCleanup);

Configuration newConf = new YarnConfiguration(conf);
newConf.setInt(CommonConfigurationKeysPublic.
Expand All @@ -77,6 +92,7 @@ protected void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
launcherHandlingThread.start();
cleanupHandlingThread.start();
super.serviceStart();
}

Expand All @@ -90,19 +106,31 @@ protected Runnable createRunnableLauncher(RMAppAttempt application,
private void launch(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application,
AMLauncherEventType.LAUNCH);
masterEvents.add(launcher);
launcherEvents.add(launcher);
}


@Override
protected void serviceStop() throws Exception {
launcherHandlingThread.interrupt();
cleanupHandlingThread.interrupt();

try {
launcherHandlingThread.join();
} catch (InterruptedException ie) {
LOG.info(launcherHandlingThread.getName() + " interrupted during join ",
ie); }
LOG.info(launcherHandlingThread.getName()
+ " interrupted during join ", ie);
}

try {
cleanupHandlingThread.join();
} catch (InterruptedException ie) {
LOG.info(cleanupHandlingThread.getName()
+ " interrupted during join ", ie);
}

launcherPool.shutdown();
cleanupPool.shutdown();
}

private class LauncherThread extends Thread {
Expand All @@ -115,20 +143,43 @@ public LauncherThread() {
public void run() {
while (!this.isInterrupted()) {
Runnable toLaunch;

try {
toLaunch = masterEvents.take();
toLaunch = launcherEvents.take();
launcherPool.execute(toLaunch);
} catch (InterruptedException e) {
LOG.warn(this.getClass().getName() + " interrupted. Returning.");
return;
}
}
}
}
}

private class CleanupThread extends Thread {

public CleanupThread() {
super("ApplicationMaster Cleanup");
}

@Override
public void run() {
while (!this.isInterrupted()) {
Runnable toCleanup;

try {
toCleanup = cleanupEvents.take();
cleanupPool.execute(toCleanup);
} catch (InterruptedException e) {
LOG.warn(this.getClass().getName() + " interrupted. Returning.");
return;
}
}
}
}

private void cleanup(RMAppAttempt application) {
Runnable launcher = createRunnableLauncher(application, AMLauncherEventType.CLEANUP);
masterEvents.add(launcher);
cleanupEvents.add(launcher);
}

@Override
Expand Down