Skip to content
Browse files

AZK-126: Fix race condition in AzkabanProcess

  • Loading branch information...
1 parent 7d0f432 commit 947d01299f6fc65651b61e81888714d64f54d6e8 Joe Crobak committed with Ibrahim Ulukaya Sep 27, 2011
Showing with 189 additions and 166 deletions.
  1. +189 −166 azkaban/src/java/azkaban/util/process/AzkabanProcess.java
View
355 azkaban/src/java/azkaban/util/process/AzkabanProcess.java
@@ -29,228 +29,251 @@
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
-import com.google.common.base.Joiner;
-
import azkaban.util.CircularBuffer;
+import com.google.common.base.Joiner;
+
/**
* A less shitty version of java.lang.Process.
*
- * Output is read by seperate threads to avoid deadlock and logged to log4j loggers.
+ * Output is read by seperate threads to avoid deadlock and logged to log4j
+ * loggers.
*
* @author jkreps
- *
+ *
*/
public class AzkabanProcess {
- private final String workingDir;
- private final List<String> cmd;
- private final Map<String, String> env;
- private final Logger logger;
- private final CountDownLatch startupLatch;
- private final CountDownLatch completeLatch;
- private volatile int processId;
- private volatile Process process;
-
- public AzkabanProcess(List<String> cmd, Map<String, String> env, String workingDir, Logger logger) {
- this.cmd = cmd;
- this.env = env;
- this.workingDir = workingDir;
- this.processId = -1;
- this.startupLatch = new CountDownLatch(1);
- this.completeLatch = new CountDownLatch(1);
- this.logger = logger;
- }
-
- /**
- * Execute this process, blocking until it has completed.
- */
- public void run() throws IOException {
- if(this.isStarted() || this.isComplete())
- throw new IllegalStateException("The process can only be used once.");
-
- ProcessBuilder builder = new ProcessBuilder(cmd);
- builder.directory(new File(workingDir));
- builder.environment().putAll(env);
- this.process = builder.start();
- this.startupLatch.countDown();
- LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
- process.getInputStream()), logger, Level.INFO, 30);
- LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(process
- .getErrorStream()), logger, Level.ERROR, 30);
-
- this.processId = processId(process);
- if (processId == 0)
- logger.debug("Spawned thread with unknown process id");
- else
- logger.debug("Spawned thread with process id " + processId);
-
- outputGobbler.start();
- errorGobbler.start();
- int exitCode = -1;
- try {
- exitCode = process.waitFor();
- } catch (InterruptedException e) {
- logger.info("Process interrupted.", e);
- }
-
- completeLatch.countDown();
- if (exitCode != 0)
- throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
-
- // try to wait for everything to get logged out before exiting
- outputGobbler.awaitCompletion(5000);
- errorGobbler.awaitCompletion(5000);
- }
-
- /**
- * Await the completion of this process
- *
- * @throws InterruptedException if the thread is interrupted while waiting.
- */
- public void awaitCompletion() throws InterruptedException {
- this.completeLatch.await();
- }
-
- /**
+ private final String workingDir;
+ private final List<String> cmd;
+ private final Map<String, String> env;
+ private final Logger logger;
+ private final CountDownLatch startupLatch;
+ private final CountDownLatch completeLatch;
+ private volatile int processId;
+ private volatile Process process;
+
+ public AzkabanProcess(final List<String> cmd,
+ final Map<String, String> env, final String workingDir,
+ final Logger logger) {
+ this.cmd = cmd;
+ this.env = env;
+ this.workingDir = workingDir;
+ this.processId = -1;
+ this.startupLatch = new CountDownLatch(1);
+ this.completeLatch = new CountDownLatch(1);
+ this.logger = logger;
+ }
+
+ /**
+ * Execute this process, blocking until it has completed.
+ */
+ public void run() throws IOException {
+ if (this.isStarted() || this.isComplete()) {
+ throw new IllegalStateException(
+ "The process can only be used once.");
+ }
+
+ ProcessBuilder builder = new ProcessBuilder(cmd);
+ builder.directory(new File(workingDir));
+ builder.environment().putAll(env);
+ this.process = builder.start();
+ this.processId = processId(process);
+ if (processId == 0) {
+ logger.debug("Spawned thread with unknown process id");
+ } else {
+ logger.debug("Spawned thread with process id " + processId);
+ }
+
+ this.startupLatch.countDown();
+
+ LogGobbler outputGobbler = new LogGobbler(new InputStreamReader(
+ process.getInputStream()), logger, Level.INFO, 30);
+ LogGobbler errorGobbler = new LogGobbler(new InputStreamReader(
+ process.getErrorStream()), logger, Level.ERROR, 30);
+
+ outputGobbler.start();
+ errorGobbler.start();
+ int exitCode = -1;
+ try {
+ exitCode = process.waitFor();
+ } catch (InterruptedException e) {
+ logger.info("Process interrupted.", e);
+ }
+
+ completeLatch.countDown();
+ if (exitCode != 0) {
+ throw new ProcessFailureException(exitCode,
+ errorGobbler.getRecentLog());
+ }
+
+ // try to wait for everything to get logged out before exiting
+ outputGobbler.awaitCompletion(5000);
+ errorGobbler.awaitCompletion(5000);
+ }
+
+ /**
+ * Await the completion of this process
+ *
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitCompletion() throws InterruptedException {
+ this.completeLatch.await();
+ }
+
+ /**
* Await the start of this process
*
- * @throws InterruptedException if the thread is interrupted while waiting.
+ * @throws InterruptedException
+ * if the thread is interrupted while waiting.
+ */
+ public void awaitStartup() throws InterruptedException {
+ this.startupLatch.await();
+ }
+
+ /**
+ * Get the process id for this process, if it has started.
+ *
+ * @return The process id or -1 if it cannot be fetched
*/
- public void awaitStartup() throws InterruptedException {
- this.startupLatch.await();
- }
-
- /**
- * Get the process id for this process, if it has started.
- * @return The process id or -1 if it cannot be fetched
- */
- public int getProcessId() {
- checkStarted();
- return this.processId;
- }
-
- /**
- * Attempt to kill the process, waiting up to the given time for it to die
- * @param time The amount of time to wait
- * @param unit The time unit
- * @return true iff this soft kill kills the process in the given wait time.
- */
- public boolean softKill(long time, TimeUnit unit) throws InterruptedException {
- checkStarted();
- if(processId != 0 && isStarted()) {
+ public int getProcessId() {
+ checkStarted();
+ return this.processId;
+ }
+
+ /**
+ * Attempt to kill the process, waiting up to the given time for it to die
+ *
+ * @param time
+ * The amount of time to wait
+ * @param unit
+ * The time unit
+ * @return true iff this soft kill kills the process in the given wait time.
+ */
+ public boolean softKill(final long time, final TimeUnit unit)
+ throws InterruptedException {
+ checkStarted();
+ if (processId != 0 && isStarted()) {
try {
Runtime.getRuntime().exec("kill " + processId);
return completeLatch.await(time, unit);
- } catch(IOException e) {
- logger.error("Kill attempt failed.", e);
+ } catch (IOException e) {
+ logger.error("Kill attempt failed.", e);
}
return false;
}
return false;
- }
-
- /**
- * Force kill this process
- */
+ }
+
+ /**
+ * Force kill this process
+ */
public void hardKill() {
- checkStarted();
- if(isRunning())
- process.destroy();
+ checkStarted();
+ if (isRunning()) {
+ process.destroy();
+ }
}
-
+
/**
* Attempt to get the process id for this process
- * @param process The process to get the id from
+ *
+ * @param process
+ * The process to get the id from
* @return The id of the process
*/
- private int processId(java.lang.Process process) {
- checkStarted();
+ private int processId(final java.lang.Process process) {
int processId = 0;
try {
Field f = process.getClass().getDeclaredField("pid");
f.setAccessible(true);
processId = f.getInt(process);
- } catch(Throwable e) {
- e.printStackTrace();
+ } catch (Throwable e) {
+ e.printStackTrace();
}
return processId;
}
-
+
/**
* @return true iff the process has been started
*/
public boolean isStarted() {
- return startupLatch.getCount() == 0L;
+ return startupLatch.getCount() == 0L;
}
-
+
/**
* @return true iff the process has completed
*/
public boolean isComplete() {
- return completeLatch.getCount() == 0L;
+ return completeLatch.getCount() == 0L;
}
-
+
/**
* @return true iff the process is currently running
*/
public boolean isRunning() {
- return isStarted() && !isComplete();
+ return isStarted() && !isComplete();
}
-
+
public void checkStarted() {
- if(!isStarted())
- throw new IllegalStateException("Process has not yet started.");
+ if (!isStarted()) {
+ throw new IllegalStateException("Process has not yet started.");
+ }
}
-
+
@Override
public String toString() {
- return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env + ", cwd = " + workingDir + ")";
+ return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
+ + ", cwd = " + workingDir + ")";
}
- private static class LogGobbler extends Thread {
-
- private final BufferedReader inputReader;
- private final Logger logger;
- private final Level loggingLevel;
- private final CircularBuffer<String> buffer;
-
- public LogGobbler(Reader inputReader, Logger logger, Level level, int bufferLines) {
- this.inputReader = new BufferedReader(inputReader);
- this.logger = logger;
- this.loggingLevel = level;
- buffer = new CircularBuffer<String>(bufferLines);
- }
-
- public void run() {
- try {
- while (!Thread.currentThread().isInterrupted()) {
- String line = inputReader.readLine();
- if (line == null)
- return;
-
- buffer.append(line);
- logger.log(loggingLevel, line);
- }
- } catch (IOException e) {
- logger.error("Error reading from logging stream:", e);
- }
- }
-
- public void awaitCompletion(long waitMs) {
- try {
- join(waitMs);
- } catch(InterruptedException e) {
- logger.info("I/O thread interrupted.", e);
- }
- }
-
- public String getRecentLog() {
- return Joiner.on(System.getProperty("line.separator")).join(buffer);
- }
-
- }
+ private static class LogGobbler extends Thread {
+
+ private final BufferedReader inputReader;
+ private final Logger logger;
+ private final Level loggingLevel;
+ private final CircularBuffer<String> buffer;
+
+ public LogGobbler(final Reader inputReader, final Logger logger,
+ final Level level, final int bufferLines) {
+ this.inputReader = new BufferedReader(inputReader);
+ this.logger = logger;
+ this.loggingLevel = level;
+ buffer = new CircularBuffer<String>(bufferLines);
+ }
+
+ @Override
+ public void run() {
+ try {
+ while (!Thread.currentThread().isInterrupted()) {
+ String line = inputReader.readLine();
+ if (line == null) {
+ return;
+ }
+
+ buffer.append(line);
+ logger.log(loggingLevel, line);
+ }
+ } catch (IOException e) {
+ logger.error("Error reading from logging stream:", e);
+ }
+ }
+
+ public void awaitCompletion(final long waitMs) {
+ try {
+ join(waitMs);
+ } catch (InterruptedException e) {
+ logger.info("I/O thread interrupted.", e);
+ }
+ }
+
+ public String getRecentLog() {
+ return Joiner.on(System.getProperty("line.separator")).join(buffer);
+ }
+
+ }
}

0 comments on commit 947d012

Please sign in to comment.
Something went wrong with that request. Please try again.