From 0b6679e7a6545efc6f9d6b9046a3e97549281a8f Mon Sep 17 00:00:00 2001 From: Yanqin Jin Date: Sun, 27 Aug 2017 19:26:34 -0700 Subject: [PATCH] Move addShutdownHook to ProcessUtils. Address review comments. Fix exception handling, variable naming, etc. --- .../src/main/java/alluxio/ProcessUtils.java | 23 ++++++++++ .../alluxio/logserver/AlluxioLogServer.java | 26 +++-------- .../logserver/AlluxioLogServerProcess.java | 44 +++++++++---------- 3 files changed, 49 insertions(+), 44 deletions(-) diff --git a/core/server/common/src/main/java/alluxio/ProcessUtils.java b/core/server/common/src/main/java/alluxio/ProcessUtils.java index f5f742b2cf2c..9f49704688d5 100644 --- a/core/server/common/src/main/java/alluxio/ProcessUtils.java +++ b/core/server/common/src/main/java/alluxio/ProcessUtils.java @@ -43,5 +43,28 @@ public static void run(Process process) { } } + /** + * Add a shutdown hook that will be invoked when a signal is sent to this process. + * + * The process may be utilizing some resources, and this shutdown hook will be invoked by + * JVM when a SIGTERM is sent to the process by "kill" command. The shutdown hook calls + * {@link Process#stop()} method to cleanly release the resources and exit. + * + * @param process the data structure representing the process to terminate + */ + public static void addShutdownHook(final Process process) { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + process.stop(); + } catch (Exception e) { + LOG.error("Failed to add a shutdown hook to process."); + System.exit(0); + } + } + }); + } + private ProcessUtils() {} // prevent instantiation } diff --git a/logserver/src/main/java/alluxio/logserver/AlluxioLogServer.java b/logserver/src/main/java/alluxio/logserver/AlluxioLogServer.java index 2470c674ea0a..208b3a9257ee 100644 --- a/logserver/src/main/java/alluxio/logserver/AlluxioLogServer.java +++ b/logserver/src/main/java/alluxio/logserver/AlluxioLogServer.java @@ -11,13 +11,16 @@ package alluxio.logserver; -import alluxio.Process; import alluxio.ProcessUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + /** * Alluxio log server receiving logs pushed from Alluxio servers. */ public final class AlluxioLogServer { + private static Logger LOG = LoggerFactory.getLogger(AlluxioLogServer.class); /** * Main entry point of {@link AlluxioLogServer}. * @@ -25,29 +28,10 @@ public final class AlluxioLogServer { */ public static void main(String[] args) { final AlluxioLogServerProcess process = new AlluxioLogServerProcess(args[0]); - addShutdownHook(process); + ProcessUtils.addShutdownHook(process); ProcessUtils.run(process); } - /** - * Add a shutdown hook that will be invoked when a signal is sent to this process. - * - * @param process the data structure representing the process to terminate - */ - private static void addShutdownHook(final Process process) { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - process.stop(); - } catch (Exception e) { - System.exit(0); - } - } - } - ); - } - /** * Private constructor to prevent user from instantiating any * {@link AlluxioLogServer} instance. diff --git a/logserver/src/main/java/alluxio/logserver/AlluxioLogServerProcess.java b/logserver/src/main/java/alluxio/logserver/AlluxioLogServerProcess.java index 893e451e19fa..ab50659ab883 100644 --- a/logserver/src/main/java/alluxio/logserver/AlluxioLogServerProcess.java +++ b/logserver/src/main/java/alluxio/logserver/AlluxioLogServerProcess.java @@ -52,9 +52,9 @@ */ public class AlluxioLogServerProcess implements LogServerProcess { private static final Logger LOG = LoggerFactory.getLogger(AlluxioLogServer.class); - private static final long STOP_TIMEOUT_IN_MS = 60000; - private static final int BASE_SLEEP_TIME_IN_MS = 50; - private static final int MAX_SLEEP_TIME_IN_MS = 30000; + private static final long STOP_TIMEOUT_MS = 60000; + private static final int BASE_SLEEP_TIME_MS = 50; + private static final int MAX_SLEEP_TIME_MS = 30000; private static final int MAX_NUM_RETRY = 20; private final String mBaseLogsDir; @@ -72,8 +72,11 @@ public class AlluxioLogServerProcess implements LogServerProcess { */ public AlluxioLogServerProcess(String baseLogsDir) { mPort = Configuration.getInt(PropertyKey.LOG_SERVER_PORT); - mMinNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MIN) + 2; - mMaxNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MAX) + 2; + // The log server serves the logging requests from Alluxio workers, Alluxio master, Alluxio + // secondary master, and Alluxio proxy. Therefore the number of threads required by + // log server is #workers + 1 (master) + 1 (secondary master) + 1 (proxy). + mMinNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MIN) + 3; + mMaxNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MAX) + 3; mBaseLogsDir = baseLogsDir; mStopped = true; } @@ -106,12 +109,12 @@ private void startServing() { new SynchronousQueue<>(); mThreadPool = new ThreadPoolExecutor(mMinNumberOfThreads, mMaxNumberOfThreads, - STOP_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS, synchronousQueue); + STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS, synchronousQueue); try { mServerSocket = new ServerSocket(mPort); } catch (IOException e) { LOG.error("Failed to bind to port {}.", mPort); - return; + throw new RuntimeException(e); } mStopped = false; while (!mStopped) { @@ -121,27 +124,22 @@ private void startServing() { AlluxioLog4jSocketNode clientSocketNode = new AlluxioLog4jSocketNode(this, client); RetryPolicy retryPolicy = new ExponentialBackoffRetry( - BASE_SLEEP_TIME_IN_MS, MAX_SLEEP_TIME_IN_MS, MAX_NUM_RETRY); + BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_NUM_RETRY); while (true) { try { mThreadPool.execute(clientSocketNode); break; - } catch (Throwable t) { - if (t instanceof RejectedExecutionException) { - if (!retryPolicy.attemptRetry()) { - LOG.warn("Connection with {} has been rejected by ExecutorService {} times" - + "till timedout, reason: {}", - inetAddress.getHostAddress(), retryPolicy.getRetryCount(), t); - client.close(); - break; - } - } else if (t instanceof Error) { - LOG.error("ExecutorService threw error: ", t); - throw (Error) t; - } else { - LOG.warn("ExecutorService threw error: ", t); + } catch (RejectedExecutionException e) { + if (!retryPolicy.attemptRetry()) { + LOG.warn("Connection with {} has been rejected by ExecutorService {} times" + + "till timedout, reason: {}", + inetAddress.getHostAddress(), retryPolicy.getRetryCount(), e); + client.close(); break; } + } catch (Error | Exception e) { + LOG.error("ExecutorService threw error: ", e); + throw e; } } } catch (IOException e) { @@ -168,7 +166,7 @@ private void stopServing() { } mThreadPool.shutdown(); - long timeoutMS = STOP_TIMEOUT_IN_MS; + long timeoutMS = STOP_TIMEOUT_MS; long now = System.currentTimeMillis(); while (timeoutMS >= 0) { try {