Skip to content

Commit

Permalink
Move addShutdownHook to ProcessUtils.
Browse files Browse the repository at this point in the history
Address review comments. Fix exception handling, variable naming, etc.
  • Loading branch information
riversand9 committed Aug 28, 2017
1 parent 6226329 commit 0b6679e
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 44 deletions.
23 changes: 23 additions & 0 deletions core/server/common/src/main/java/alluxio/ProcessUtils.java
Expand Up @@ -43,5 +43,28 @@ public static void run(Process process) {
}
}

/**
* Add a shutdown hook that will be invoked when a signal is sent to this process.
*
* The process may be utilizing some resources, and this shutdown hook will be invoked by
* JVM when a SIGTERM is sent to the process by "kill" command. The shutdown hook calls
* {@link Process#stop()} method to cleanly release the resources and exit.
*
* @param process the data structure representing the process to terminate
*/
public static void addShutdownHook(final Process process) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
process.stop();
} catch (Exception e) {
LOG.error("Failed to add a shutdown hook to process.");
System.exit(0);
}
}
});
}

private ProcessUtils() {} // prevent instantiation
}
26 changes: 5 additions & 21 deletions logserver/src/main/java/alluxio/logserver/AlluxioLogServer.java
Expand Up @@ -11,43 +11,27 @@

package alluxio.logserver;

import alluxio.Process;
import alluxio.ProcessUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Alluxio log server receiving logs pushed from Alluxio servers.
*/
public final class AlluxioLogServer {
private static Logger LOG = LoggerFactory.getLogger(AlluxioLogServer.class);
/**
* Main entry point of {@link AlluxioLogServer}.
*
* @param args command line arguments that will be parsed to initialize {@link AlluxioLogServer}
*/
public static void main(String[] args) {
final AlluxioLogServerProcess process = new AlluxioLogServerProcess(args[0]);
addShutdownHook(process);
ProcessUtils.addShutdownHook(process);
ProcessUtils.run(process);
}

/**
* Add a shutdown hook that will be invoked when a signal is sent to this process.
*
* @param process the data structure representing the process to terminate
*/
private static void addShutdownHook(final Process process) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
process.stop();
} catch (Exception e) {
System.exit(0);
}
}
}
);
}

/**
* Private constructor to prevent user from instantiating any
* {@link AlluxioLogServer} instance.
Expand Down
Expand Up @@ -52,9 +52,9 @@
*/
public class AlluxioLogServerProcess implements LogServerProcess {
private static final Logger LOG = LoggerFactory.getLogger(AlluxioLogServer.class);
private static final long STOP_TIMEOUT_IN_MS = 60000;
private static final int BASE_SLEEP_TIME_IN_MS = 50;
private static final int MAX_SLEEP_TIME_IN_MS = 30000;
private static final long STOP_TIMEOUT_MS = 60000;
private static final int BASE_SLEEP_TIME_MS = 50;
private static final int MAX_SLEEP_TIME_MS = 30000;
private static final int MAX_NUM_RETRY = 20;

private final String mBaseLogsDir;
Expand All @@ -72,8 +72,11 @@ public class AlluxioLogServerProcess implements LogServerProcess {
*/
public AlluxioLogServerProcess(String baseLogsDir) {
mPort = Configuration.getInt(PropertyKey.LOG_SERVER_PORT);
mMinNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MIN) + 2;
mMaxNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MAX) + 2;
// The log server serves the logging requests from Alluxio workers, Alluxio master, Alluxio
// secondary master, and Alluxio proxy. Therefore the number of threads required by
// log server is #workers + 1 (master) + 1 (secondary master) + 1 (proxy).
mMinNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MIN) + 3;
mMaxNumberOfThreads = Configuration.getInt(PropertyKey.MASTER_WORKER_THREADS_MAX) + 3;
mBaseLogsDir = baseLogsDir;
mStopped = true;
}
Expand Down Expand Up @@ -106,12 +109,12 @@ private void startServing() {
new SynchronousQueue<>();
mThreadPool =
new ThreadPoolExecutor(mMinNumberOfThreads, mMaxNumberOfThreads,
STOP_TIMEOUT_IN_MS, TimeUnit.MILLISECONDS, synchronousQueue);
STOP_TIMEOUT_MS, TimeUnit.MILLISECONDS, synchronousQueue);
try {
mServerSocket = new ServerSocket(mPort);
} catch (IOException e) {
LOG.error("Failed to bind to port {}.", mPort);
return;
throw new RuntimeException(e);
}
mStopped = false;
while (!mStopped) {
Expand All @@ -121,27 +124,22 @@ private void startServing() {
AlluxioLog4jSocketNode clientSocketNode =
new AlluxioLog4jSocketNode(this, client);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(
BASE_SLEEP_TIME_IN_MS, MAX_SLEEP_TIME_IN_MS, MAX_NUM_RETRY);
BASE_SLEEP_TIME_MS, MAX_SLEEP_TIME_MS, MAX_NUM_RETRY);
while (true) {
try {
mThreadPool.execute(clientSocketNode);
break;
} catch (Throwable t) {
if (t instanceof RejectedExecutionException) {
if (!retryPolicy.attemptRetry()) {
LOG.warn("Connection with {} has been rejected by ExecutorService {} times"
+ "till timedout, reason: {}",
inetAddress.getHostAddress(), retryPolicy.getRetryCount(), t);
client.close();
break;
}
} else if (t instanceof Error) {
LOG.error("ExecutorService threw error: ", t);
throw (Error) t;
} else {
LOG.warn("ExecutorService threw error: ", t);
} catch (RejectedExecutionException e) {
if (!retryPolicy.attemptRetry()) {
LOG.warn("Connection with {} has been rejected by ExecutorService {} times"
+ "till timedout, reason: {}",
inetAddress.getHostAddress(), retryPolicy.getRetryCount(), e);
client.close();
break;
}
} catch (Error | Exception e) {
LOG.error("ExecutorService threw error: ", e);
throw e;
}
}
} catch (IOException e) {
Expand All @@ -168,7 +166,7 @@ private void stopServing() {
}

mThreadPool.shutdown();
long timeoutMS = STOP_TIMEOUT_IN_MS;
long timeoutMS = STOP_TIMEOUT_MS;
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
Expand Down

0 comments on commit 0b6679e

Please sign in to comment.