From 3d20b6c46b6c473940b53653a06280d47d45f6e2 Mon Sep 17 00:00:00 2001 From: Nicolas Grekas Date: Thu, 31 Mar 2016 17:15:42 +0200 Subject: [PATCH] [Process] Add InputStream to seamlessly feed running processes --- src/Symfony/Component/Process/InputStream.php | 90 +++++++++++++++++ .../Component/Process/Pipes/AbstractPipes.php | 18 +++- src/Symfony/Component/Process/Process.php | 6 +- .../Component/Process/ProcessBuilder.php | 2 +- .../Component/Process/Tests/ProcessTest.php | 99 ++++++++++++++++--- 5 files changed, 192 insertions(+), 23 deletions(-) create mode 100644 src/Symfony/Component/Process/InputStream.php diff --git a/src/Symfony/Component/Process/InputStream.php b/src/Symfony/Component/Process/InputStream.php new file mode 100644 index 000000000000..831b10932599 --- /dev/null +++ b/src/Symfony/Component/Process/InputStream.php @@ -0,0 +1,90 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +namespace Symfony\Component\Process; + +use Symfony\Component\Process\Exception\RuntimeException; + +/** + * Provides a way to continuously write to the input of a Process until the InputStream is closed. + * + * @author Nicolas Grekas + */ +class InputStream implements \IteratorAggregate +{ + private $onEmpty = null; + private $input = array(); + private $open = true; + + /** + * Sets a callback that is called when the write buffer becomes empty. + */ + public function onEmpty(callable $onEmpty = null) + { + $this->onEmpty = $onEmpty; + } + + /** + * Appends an input to the write buffer. + * + * @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable + */ + public function write($input) + { + if (null === $input) { + return; + } + if ($this->isClosed()) { + throw new RuntimeException(sprintf('%s is closed', static::class)); + } + $this->input[] = ProcessUtils::validateInput(__METHOD__, $input); + } + + /** + * Closes the write buffer. + */ + public function close() + { + $this->open = false; + } + + /** + * Tells whether the write buffer is closed or not. + */ + public function isClosed() + { + return !$this->open; + } + + public function getIterator() + { + $this->open = true; + + while ($this->open || $this->input) { + if (!$this->input) { + yield ''; + continue; + } + $current = array_shift($this->input); + + if ($current instanceof \Iterator) { + foreach ($current as $cur) { + yield $cur; + } + } else { + yield $current; + } + if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) { + $this->write($onEmpty($this)); + } + } + } +} diff --git a/src/Symfony/Component/Process/Pipes/AbstractPipes.php b/src/Symfony/Component/Process/Pipes/AbstractPipes.php index 73c4e5cf5a50..3bf2cd469c87 100644 --- a/src/Symfony/Component/Process/Pipes/AbstractPipes.php +++ b/src/Symfony/Component/Process/Pipes/AbstractPipes.php @@ -11,6 +11,8 @@ namespace Symfony\Component\Process\Pipes; +use Symfony\Component\Process\Exception\InvalidArgumentException; + /** * @author Romain Neutron * @@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface /** @var string */ private $inputBuffer = ''; - /** @var resource|\Iterator|null */ + /** @var resource|scalar|\Iterator|null */ private $input; /** @var bool */ private $blocked = true; @@ -84,6 +86,8 @@ protected function unblock() /** * Writes input to stdin. + * + * @throws InvalidArgumentException When an input iterator yields a non supported value */ protected function write() { @@ -97,10 +101,18 @@ protected function write() $input = null; } elseif (is_resource($input = $input->current())) { stream_set_blocking($input, 0); - } else { - $this->inputBuffer .= $input; + } elseif (!isset($this->inputBuffer[0])) { + if (!is_string($input)) { + if (!is_scalar($input)) { + throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input))); + } + $input = (string) $input; + } + $this->inputBuffer = $input; $this->input->next(); $input = null; + } else { + $input = null; } } diff --git a/src/Symfony/Component/Process/Process.php b/src/Symfony/Component/Process/Process.php index 11cffca4dfc5..4820892fadb7 100644 --- a/src/Symfony/Component/Process/Process.php +++ b/src/Symfony/Component/Process/Process.php @@ -132,7 +132,7 @@ class Process * @param string $commandline The command line to run * @param string|null $cwd The working directory or null to use the working dir of the current PHP process * @param array|null $env The environment variables or null to use the same environment as the current PHP process - * @param string|null $input The input + * @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input * @param int|float|null $timeout The timeout in seconds or null to disable * @param array $options An array of options for proc_open * @@ -1027,7 +1027,7 @@ public function setEnv(array $env) /** * Gets the Process input. * - * @return null|string The Process input + * @return resource|string|\Iterator|null The Process input */ public function getInput() { @@ -1039,7 +1039,7 @@ public function getInput() * * This content will be passed to the underlying process standard input. * - * @param mixed $input The content + * @param resource|scalar|\Traversable|null $input The content * * @return self The current Process instance * diff --git a/src/Symfony/Component/Process/ProcessBuilder.php b/src/Symfony/Component/Process/ProcessBuilder.php index 69843caeb94c..32fd2ed67903 100644 --- a/src/Symfony/Component/Process/ProcessBuilder.php +++ b/src/Symfony/Component/Process/ProcessBuilder.php @@ -167,7 +167,7 @@ public function addEnvironmentVariables(array $variables) /** * Sets the input of the process. * - * @param mixed $input The input as a string + * @param resource|scalar|\Traversable|null $input The input content * * @return ProcessBuilder * diff --git a/src/Symfony/Component/Process/Tests/ProcessTest.php b/src/Symfony/Component/Process/Tests/ProcessTest.php index db9a01051fec..a488e3d110f6 100644 --- a/src/Symfony/Component/Process/Tests/ProcessTest.php +++ b/src/Symfony/Component/Process/Tests/ProcessTest.php @@ -14,6 +14,7 @@ use Symfony\Component\Process\Exception\LogicException; use Symfony\Component\Process\Exception\ProcessTimedOutException; use Symfony\Component\Process\Exception\RuntimeException; +use Symfony\Component\Process\InputStream; use Symfony\Component\Process\PhpExecutableFinder; use Symfony\Component\Process\Pipes\PipesInterface; use Symfony\Component\Process\Process; @@ -1176,33 +1177,99 @@ public function provideVariousIncrementals() { public function testIteratorInput() { - $nextData = 'ping'; - $input = function () use (&$nextData) { - while (false !== $nextData) { - yield $nextData; - yield $nextData = ''; - } + $input = function () { + yield 'ping'; + yield 'pong'; }; - $input = $input(); - $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);')); + $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input()); + $process->run(); + $this->assertSame('pingpong', $process->getOutput()); + } + + public function testSimpleInputStream() + { + $input = new InputStream(); + + $process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);')); $process->setInput($input); - $process->start(function ($type, $data) use ($input, &$nextData) { + + $process->start(function ($type, $data) use ($input) { if ('ping' === $data) { - $h = fopen('php://memory', 'r+'); - fwrite($h, 'pong'); - rewind($h); - $nextData = $h; - $input->next(); - } else { - $nextData = false; + $input->write('pang'); + } elseif (!$input->isClosed()) { + $input->write('pong'); + $input->close(); + } + }); + + $process->wait(); + $this->assertSame('pingpangpong', $process->getOutput()); + } + + public function testInputStreamWithCallable() + { + $i = 0; + $stream = fopen('php://memory', 'w+'); + $stream = function () use ($stream, &$i) { + if ($i < 3) { + rewind($stream); + fwrite($stream, ++$i); + rewind($stream); + + return $stream; } + }; + + $input = new InputStream(); + $input->onEmpty($stream); + $input->write($stream()); + + $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);')); + $process->setInput($input); + $process->start(function ($type, $data) use ($input) { + $input->close(); + }); + + $process->wait(); + $this->assertSame('123', $process->getOutput()); + } + + public function testInputStreamWithGenerator() + { + $input = new InputStream(); + $input->onEmpty(function ($input) { + yield 'pong'; + $input->close(); }); + $process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);')); + $process->setInput($input); + $process->start(); + $input->write('ping'); $process->wait(); $this->assertSame('pingpong', $process->getOutput()); } + public function testInputStreamOnEmpty() + { + $i = 0; + $input = new InputStream(); + $input->onEmpty(function () use (&$i) {++$i;}); + + $process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;')); + $process->setInput($input); + $process->start(function ($type, $data) use ($input) { + if ('123' === $data) { + $input->close(); + } + }); + $process->wait(); + + $this->assertSame(0, $i, 'InputStream->onEmpty callback should be called only when the input *becomes* empty'); + $this->assertSame('123456', $process->getOutput()); + } + /** * @param string $commandline * @param null|string $cwd