From af6567336353e8c1baa9a00dfd682c032784cb0a Mon Sep 17 00:00:00 2001 From: drak3 Date: Wed, 21 Mar 2012 17:58:02 +0100 Subject: [PATCH] [Process] Added support for non-blocking process control Added methods to control long running processes to the Process class: - A non blocking start method to startup a process and return immediately - A blocking waitForTermination method to wait for the processes termination - A stop method to stop a process started with start All status-getters like getOutput were changed to return real-time data --- src/Symfony/Component/Process/Process.php | 280 ++++++++++++++---- .../Tests/Component/Process/ProcessTest.php | 35 +++ 2 files changed, 264 insertions(+), 51 deletions(-) diff --git a/src/Symfony/Component/Process/Process.php b/src/Symfony/Component/Process/Process.php index d578278afa73..7bc196c71b1e 100644 --- a/src/Symfony/Component/Process/Process.php +++ b/src/Symfony/Component/Process/Process.php @@ -24,6 +24,14 @@ class Process const ERR = 'err'; const OUT = 'out'; + const STATUS_READY = 'ready'; + const STATUS_STARTED = 'started'; + const STATUS_TERMINATED = 'terminated'; + + const STDIN = 0; + const STDOUT = 1; + const STDERR = 2; + private $commandline; private $cwd; private $env; @@ -31,10 +39,13 @@ class Process private $timeout; private $options; private $exitcode; - private $status; + private $processInformation; private $stdout; private $stderr; private $enhanceWindowsCompatibility; + private $pipes; + private $process; + private $status = self::STATUS_READY; /** * Exit codes translation table. @@ -141,24 +152,36 @@ public function __construct($commandline, $cwd = null, array $env = null, $stdin */ public function run($callback = null) { - $this->stdout = ''; - $this->stderr = ''; - $that = $this; - $out = self::OUT; - $err = self::ERR; - $callback = function ($type, $data) use ($that, $callback, $out, $err) - { - if ($out == $type) { - $that->addOutput($data); - } else { - $that->addErrorOutput($data); - } + $this->start($callback); - if (null !== $callback) { - call_user_func($callback, $type, $data); - } - }; + return $this->waitForTermination($callback); + } + /** + * Starts the process and returns after sending the stdin. + * This method blocks until all stdin data is sent to the process then it returns while the pricess runs in the background. + * The termination of the process can be awaited with waitForTermination + * + * The callback receives the type of output (out or err) and + * some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback + * from the independent process during execution. + * If there is no callback passed, the waitForTermination() method can be called with true as a second parameter + * then the callback will get all data occured in (and since) the start call + * + * @param Closure|string|array $callback A PHP callback to run whenever there is some + * output available on STDOUT or STDERR + * + * @throws \RuntimeException When process can't be launch or is stopped + * @throws \RuntimeException When process is already running + */ + public function start($callback = null) + { + if($this->isRunning()) { + throw new \RuntimeException('Process is already running'); + } + $this->stdout = ''; + $this->stderr = ''; + $callback = $this->buildCallback($callback); $descriptors = array(array('pipe', 'r'), array('pipe', 'w'), array('pipe', 'w')); $commandline = $this->commandline; @@ -170,28 +193,30 @@ public function run($callback = null) } } - $process = proc_open($commandline, $descriptors, $pipes, $this->cwd, $this->env, $this->options); + $this->process = proc_open($commandline, $descriptors, $this->pipes, $this->cwd, $this->env, $this->options); - if (!is_resource($process)) { + if (!is_resource($this->process)) { throw new \RuntimeException('Unable to launch a new process.'); } + $this->status = self::STATUS_STARTED; - foreach ($pipes as $pipe) { + foreach ($this->pipes as $pipe) { stream_set_blocking($pipe, false); } - if (null === $this->stdin) { - fclose($pipes[0]); - $writePipes = null; + if(null === $this->stdin) { + fclose($this->pipes[0]); + + return; } else { - $writePipes = array($pipes[0]); + $writePipes = array($this->pipes[0]); $stdinLen = strlen($this->stdin); $stdinOffset = 0; } - unset($pipes[0]); + unset($this->pipes[0]); - while ($pipes || $writePipes) { - $r = $pipes; + while ($writePipes) { + $r = $this->pipes; $w = $writePipes; $e = null; @@ -200,7 +225,7 @@ public function run($callback = null) if (false === $n) { break; } elseif ($n === 0) { - proc_terminate($process); + proc_terminate($this->process); throw new \RuntimeException('The process timed out.'); } @@ -217,56 +242,99 @@ public function run($callback = null) } foreach ($r as $pipe) { - $type = array_search($pipe, $pipes); + $type = array_search($pipe, $this->pipes); $data = fread($pipe, 8192); if (strlen($data) > 0) { - call_user_func($callback, $type == 1 ? $out : $err, $data); + call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data); } if (false === $data || feof($pipe)) { fclose($pipe); - unset($pipes[$type]); + unset($this->pipes[$type]); } } } - $this->status = proc_get_status($process); + $this->processInformation = proc_get_status($this->process); + } + + /** + * Waits for the process to terminate + * + * The callback receives the type of output (out or err) and + * some bytes from the output in real-time while writing the standard input to the process. It allows to have feedback + * from the independent process during execution. + * + * @param Closure|string|array $callback + * @param boolean $invokeCallbackWithStartData + * @return int the exitcode of the process + * @throws \RuntimeException + */ + public function waitForTermination($callback = null) + { + $this->processInformation = proc_get_status($this->process); + $callback = $this->buildCallback($callback); + while ($this->pipes) { + $r = $this->pipes; + $w = null; + $e = null; + + $n = @stream_select($r, $w, $e, $this->timeout); + + if (false === $n) { + break; + } elseif ($n === 0) { + proc_terminate($this->process); + + throw new \RuntimeException('The process timed out.'); + } + + foreach ($r as $pipe) { + $type = array_search($pipe, $this->pipes); + $data = fread($pipe, 8192); + if (strlen($data) > 0) { + call_user_func($callback, $type == 1 ? self::OUT : self::ERR, $data); + } + if (false === $data || feof($pipe)) { + fclose($pipe); + unset($this->pipes[$type]); + } + } + } + $this->updateStatus(); + if ($this->processInformation['signaled']) { + throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig'])); + } $time = 0; - while (1 == $this->status['running'] && $time < 1000000) { + while ($this->isRunning() && $time < 1000000) { $time += 1000; usleep(1000); - $this->status = proc_get_status($process); } - $exitcode = proc_close($process); + $exitcode = proc_close($this->process); - if ($this->status['signaled']) { - throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->status['stopsig'])); + if ($this->processInformation['signaled']) { + throw new \RuntimeException(sprintf('The process stopped because of a "%s" signal.', $this->processInformation['stopsig'])); } - return $this->exitcode = $this->status['running'] ? $exitcode : $this->status['exitcode']; + return $this->exitcode = $this->processInformation['running'] ? $exitcode : $this->processInformation['exitcode']; } /** - * Returns the output of the process (STDOUT). - * - * This only returns the output if you have not supplied a callback - * to the run() method. - * + * Returns the current output of the process (STDOUT). * @return string The process output * * @api */ public function getOutput() { + $this->updateOutput(); + return $this->stdout; } /** - * Returns the error output of the process (STDERR). - * - * This only returns the error output if you have not supplied a callback - * to the run() method. + * Returns the current error output of the process (STDERR). * * @return string The process error output * @@ -274,6 +342,8 @@ public function getOutput() */ public function getErrorOutput() { + $this->updateErrorOutput(); + return $this->stderr; } @@ -286,6 +356,8 @@ public function getErrorOutput() */ public function getExitCode() { + $this->updateStatus(); + return $this->exitcode; } @@ -302,6 +374,8 @@ public function getExitCode() */ public function getExitCodeText() { + $this->updateStatus(); + return isset(self::$exitCodes[$this->exitcode]) ? self::$exitCodes[$this->exitcode] : 'Unknown error'; } @@ -314,6 +388,8 @@ public function getExitCodeText() */ public function isSuccessful() { + $this->updateStatus(); + return 0 == $this->exitcode; } @@ -328,7 +404,9 @@ public function isSuccessful() */ public function hasBeenSignaled() { - return $this->status['signaled']; + $this->updateStatus(); + + return $this->processInformation['signaled']; } /** @@ -342,7 +420,9 @@ public function hasBeenSignaled() */ public function getTermSignal() { - return $this->status['termsig']; + $this->updateStatus(); + + return $this->processInformation['termsig']; } /** @@ -356,7 +436,9 @@ public function getTermSignal() */ public function hasBeenStopped() { - return $this->status['stopped']; + $this->updateStatus(); + + return $this->processInformation['stopped']; } /** @@ -370,7 +452,49 @@ public function hasBeenStopped() */ public function getStopSignal() { - return $this->status['stopsig']; + $this->updateStatus(); + + return $this->processInformation['stopsig']; + } + + /** + * Returns if the process is currently running + * @return boolean + */ + public function isRunning() { + if(self::STATUS_STARTED === $this->status) { + $this->updateStatus(); + if($this->processInformation['running'] === false) { + $this->status = self::STATUS_TERMINATED; + } + + return $this->processInformation['running']; + } + + return false; + } + + /** + * Stops the process + * @param float $timeout the timeout in seconds + * @return int the exitcode of the process + * @throws \RuntimeException if the process got signaled + */ + public function stop($timeout=10) { + $timeoutMicro = (int) $timeout*10E6; + if($this->isRunning()) { + proc_terminate($this->process); + $time = 0; + while (1 == $this->isRunning() && $time < $timeoutMicro) { + $time += 1000; + usleep(1000); + } + $exitcode = proc_close($this->process); + $this->exitcode = -1 === $this->processInformation['exitcode'] ? $exitcode : $this->processInformation['exitcode']; + } + $this->status = self::STATUS_TERMINATED; + + return $this->exitcode; } public function addOutput($line) @@ -452,4 +576,58 @@ public function setEnhanceWindowsCompatibility($enhance) { $this->enhanceWindowsCompatibility = (Boolean) $enhance; } + + /** + * Builds up the callback used by waitForTermination + * The callbacks adds all occured output to the specific buffer and calls the usercallback (if present) with the received output + * @param callable $callback the userdefined callback + * @return callable + */ + protected function buildCallback($callback) { + $that = $this; + $out = self::OUT; + $err = self::ERR; + $callback = function ($type, $data) use ($that, $callback, $out, $err) + { + if ($out == $type) { + $that->addOutput($data); + } else { + $that->addErrorOutput($data); + } + + if (null !== $callback) { + call_user_func($callback, $type, $data); + } + }; + + return $callback; + } + + /** + * If the process was started, its status is updated + */ + protected function updateStatus() + { + if(self::STATUS_STARTED === $this->status) { + $this->processInformation = proc_get_status($this->process); + if(!$this->processInformation['running']) { + $this->status = self::STATUS_TERMINATED; + if(-1 !== $this->processInformation['exitcode']) { + $this->exitcode = $this->processInformation['exitcode']; + } + } + } + } + + protected function updateErrorOutput() { + if(isset($this->pipes[self::STDERR]) && is_resource($this->pipes[self::STDERR])) { + $this->addErrorOutput(stream_get_contents($this->pipes[self::STDERR])); + } + } + + protected function updateOutput() { + if(isset($this->pipes[self::STDOUT]) && is_resource($this->pipes[self::STDOUT])) { + $this->addOutput(stream_get_contents($this->pipes[self::STDOUT])); + } + } } diff --git a/tests/Symfony/Tests/Component/Process/ProcessTest.php b/tests/Symfony/Tests/Component/Process/ProcessTest.php index e36820c72808..ffa7e9ee1e20 100644 --- a/tests/Symfony/Tests/Component/Process/ProcessTest.php +++ b/tests/Symfony/Tests/Component/Process/ProcessTest.php @@ -79,6 +79,41 @@ public function testExitCodeText() $this->assertEquals('Misuse of shell builtins', $process->getExitCodeText()); } + public function testStartIsNonBlocking() + { + $process = new Process('php -r "sleep(4);"'); + $start = microtime(true); + $process->start(); + $end = microtime(true); + $this->assertLessThan(1 , $end-$start); + } + + public function testUpdateStatus() { + $process = new Process('php -h'); + $process->start(); + usleep(0.05E6); //wait for output + $this->assertEquals(0, $process->getExitCode()); + $this->assertTrue(strlen($process->getOutput()) > 0 ); + } + + public function testIsRunning() { + $process = new Process('php -r "sleep(1);"'); + $this->assertFalse($process->isRunning()); + $process->start(); + $this->assertTrue($process->isRunning()); + $process->waitForTermination(); + $this->assertFalse($process->isRunning()); + } + + public function testStop() { + $process = new Process('php -r "while(true){}"'); + $process->start(); + $this->assertTrue($process->isRunning()); + $process->stop(); + $this->assertFalse($process->isRunning()); + $this->assertTrue($process->hasBeenSignaled()); + } + public function responsesCodeProvider() { return array(