Skip to content

Commit

Permalink
feature #18386 [Process] Add InputStream to seamlessly feed running p…
Browse files Browse the repository at this point in the history
…rocesses (nicolas-grekas)

This PR was merged into the 3.1-dev branch.

Discussion
----------

[Process] Add InputStream to seamlessly feed running processes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #18262
| License       | MIT
| Doc PR        | symfony/symfony-docs#6424

Look at the tests, beautiful, isn't it?

Commits
-------

3d20b6c [Process] Add InputStream to seamlessly feed running processes
  • Loading branch information
fabpot committed Apr 2, 2016
2 parents 82f0acf + 3d20b6c commit dc189f0
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 23 deletions.
90 changes: 90 additions & 0 deletions src/Symfony/Component/Process/InputStream.php
@@ -0,0 +1,90 @@
<?php

/*
* This file is part of the Symfony package.
*
* (c) Fabien Potencier <fabien@symfony.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

namespace Symfony\Component\Process;

use Symfony\Component\Process\Exception\RuntimeException;

/**
* Provides a way to continuously write to the input of a Process until the InputStream is closed.
*
* @author Nicolas Grekas <p@tchwork.com>
*/
class InputStream implements \IteratorAggregate
{
private $onEmpty = null;
private $input = array();
private $open = true;

/**
* Sets a callback that is called when the write buffer becomes empty.
*/
public function onEmpty(callable $onEmpty = null)
{
$this->onEmpty = $onEmpty;
}

/**
* Appends an input to the write buffer.
*
* @param resource|scalar|\Traversable|null The input to append as stream resource, scalar or \Traversable
*/
public function write($input)
{
if (null === $input) {
return;
}
if ($this->isClosed()) {
throw new RuntimeException(sprintf('%s is closed', static::class));
}
$this->input[] = ProcessUtils::validateInput(__METHOD__, $input);
}

/**
* Closes the write buffer.
*/
public function close()
{
$this->open = false;
}

/**
* Tells whether the write buffer is closed or not.
*/
public function isClosed()
{
return !$this->open;
}

public function getIterator()
{
$this->open = true;

while ($this->open || $this->input) {
if (!$this->input) {
yield '';
continue;
}
$current = array_shift($this->input);

if ($current instanceof \Iterator) {
foreach ($current as $cur) {
yield $cur;
}
} else {
yield $current;
}
if (!$this->input && $this->open && null !== $onEmpty = $this->onEmpty) {
$this->write($onEmpty($this));
}
}
}
}
18 changes: 15 additions & 3 deletions src/Symfony/Component/Process/Pipes/AbstractPipes.php
Expand Up @@ -11,6 +11,8 @@

namespace Symfony\Component\Process\Pipes;

use Symfony\Component\Process\Exception\InvalidArgumentException;

/**
* @author Romain Neutron <imprec@gmail.com>
*
Expand All @@ -23,7 +25,7 @@ abstract class AbstractPipes implements PipesInterface

/** @var string */
private $inputBuffer = '';
/** @var resource|\Iterator|null */
/** @var resource|scalar|\Iterator|null */
private $input;
/** @var bool */
private $blocked = true;
Expand Down Expand Up @@ -84,6 +86,8 @@ protected function unblock()

/**
* Writes input to stdin.
*
* @throws InvalidArgumentException When an input iterator yields a non supported value
*/
protected function write()
{
Expand All @@ -97,10 +101,18 @@ protected function write()
$input = null;
} elseif (is_resource($input = $input->current())) {
stream_set_blocking($input, 0);
} else {
$this->inputBuffer .= $input;
} elseif (!isset($this->inputBuffer[0])) {
if (!is_string($input)) {
if (!is_scalar($input)) {
throw new InvalidArgumentException(sprintf('%s yielded a value of type "%s", but only scalars and stream resources are supported', get_class($this->input), gettype($input)));
}
$input = (string) $input;
}
$this->inputBuffer = $input;
$this->input->next();
$input = null;
} else {
$input = null;
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/Symfony/Component/Process/Process.php
Expand Up @@ -132,7 +132,7 @@ class Process
* @param string $commandline The command line to run
* @param string|null $cwd The working directory or null to use the working dir of the current PHP process
* @param array|null $env The environment variables or null to use the same environment as the current PHP process
* @param string|null $input The input
* @param mixed|null $input The input as stream resource, scalar or \Traversable, or null for no input
* @param int|float|null $timeout The timeout in seconds or null to disable
* @param array $options An array of options for proc_open
*
Expand Down Expand Up @@ -1027,7 +1027,7 @@ public function setEnv(array $env)
/**
* Gets the Process input.
*
* @return null|string The Process input
* @return resource|string|\Iterator|null The Process input
*/
public function getInput()
{
Expand All @@ -1039,7 +1039,7 @@ public function getInput()
*
* This content will be passed to the underlying process standard input.
*
* @param mixed $input The content
* @param resource|scalar|\Traversable|null $input The content
*
* @return self The current Process instance
*
Expand Down
2 changes: 1 addition & 1 deletion src/Symfony/Component/Process/ProcessBuilder.php
Expand Up @@ -167,7 +167,7 @@ public function addEnvironmentVariables(array $variables)
/**
* Sets the input of the process.
*
* @param mixed $input The input as a string
* @param resource|scalar|\Traversable|null $input The input content
*
* @return ProcessBuilder
*
Expand Down
99 changes: 83 additions & 16 deletions src/Symfony/Component/Process/Tests/ProcessTest.php
Expand Up @@ -14,6 +14,7 @@
use Symfony\Component\Process\Exception\LogicException;
use Symfony\Component\Process\Exception\ProcessTimedOutException;
use Symfony\Component\Process\Exception\RuntimeException;
use Symfony\Component\Process\InputStream;
use Symfony\Component\Process\PhpExecutableFinder;
use Symfony\Component\Process\Pipes\PipesInterface;
use Symfony\Component\Process\Process;
Expand Down Expand Up @@ -1176,33 +1177,99 @@ public function provideVariousIncrementals() {

public function testIteratorInput()
{
$nextData = 'ping';
$input = function () use (&$nextData) {
while (false !== $nextData) {
yield $nextData;
yield $nextData = '';
}
$input = function () {
yield 'ping';
yield 'pong';
};
$input = $input();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
$process->run();
$this->assertSame('pingpong', $process->getOutput());
}

public function testSimpleInputStream()
{
$input = new InputStream();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input, &$nextData) {

$process->start(function ($type, $data) use ($input) {
if ('ping' === $data) {
$h = fopen('php://memory', 'r+');
fwrite($h, 'pong');
rewind($h);
$nextData = $h;
$input->next();
} else {
$nextData = false;
$input->write('pang');
} elseif (!$input->isClosed()) {
$input->write('pong');
$input->close();
}
});

$process->wait();
$this->assertSame('pingpangpong', $process->getOutput());
}

public function testInputStreamWithCallable()
{
$i = 0;
$stream = fopen('php://memory', 'w+');
$stream = function () use ($stream, &$i) {
if ($i < 3) {
rewind($stream);
fwrite($stream, ++$i);
rewind($stream);

return $stream;
}
};

$input = new InputStream();
$input->onEmpty($stream);
$input->write($stream());

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
$input->close();
});

$process->wait();
$this->assertSame('123', $process->getOutput());
}

public function testInputStreamWithGenerator()
{
$input = new InputStream();
$input->onEmpty(function ($input) {
yield 'pong';
$input->close();
});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start();
$input->write('ping');
$process->wait();
$this->assertSame('pingpong', $process->getOutput());
}

public function testInputStreamOnEmpty()
{
$i = 0;
$input = new InputStream();
$input->onEmpty(function () use (&$i) {++$i;});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
if ('123' === $data) {
$input->close();
}
});
$process->wait();

$this->assertSame(0, $i, 'InputStream->onEmpty callback should be called only when the input *becomes* empty');
$this->assertSame('123456', $process->getOutput());
}

/**
* @param string $commandline
* @param null|string $cwd
Expand Down

0 comments on commit dc189f0

Please sign in to comment.