Skip to content

Commit

Permalink
feature #18513 [Process] Turn getIterator() args to flags & add ITER_…
Browse files Browse the repository at this point in the history
…SKIP_OUT/ERR modes (nicolas-grekas)

This PR was merged into the 3.1-dev branch.

Discussion
----------

[Process] Turn getIterator() args to flags & add ITER_SKIP_OUT/ERR modes

| Q             | A
| ------------- | ---
| Branch?       | master
| Bug fix?      | no
| New feature?  | yes
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | #8454
| License       | MIT
| Doc PR        | -

Targeted at 3.1

Commits
-------

428f12e [Process] Turn getIterator() args to flags & add ITER_SKIP_OUT/ERR modes
  • Loading branch information
nicolas-grekas committed Apr 13, 2016
2 parents f1d12a1 + 428f12e commit cb0fe14
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 30 deletions.
53 changes: 33 additions & 20 deletions src/Symfony/Component/Process/Process.php
Expand Up @@ -43,6 +43,11 @@ class Process implements \IteratorAggregate
// Timeout Precision in seconds.
const TIMEOUT_PRECISION = 0.2;

const ITER_NON_BLOCKING = 1; // By default, iterating over outputs is a blocking call, use this flag to make it non-blocking
const ITER_KEEP_OUTPUT = 2; // By default, outputs are cleared while iterating, use this flag to keep them in memory
const ITER_SKIP_OUT = 4; // Use this flag to skip STDOUT while iterating
const ITER_SKIP_ERR = 8; // Use this flag to skip STDERR while iterating

private $callback;
private $hasCallback = false;
private $commandline;
Expand Down Expand Up @@ -503,41 +508,49 @@ public function getIncrementalOutput()
/**
* Returns an iterator to the output of the process, with the output type as keys (Process::OUT/ERR).
*
* @param bool $blocking Whether to use a blocking read call.
* @param bool $clearOutput Whether to clear or keep output in memory.
* @param int $flags A bit field of Process::ITER_* flags.
*
* @throws LogicException in case the output has been disabled
* @throws LogicException In case the process is not started
*
* @return \Generator
*/
public function getIterator($blocking = true, $clearOutput = true)
public function getIterator($flags = 0)
{
$this->readPipesForOutput(__FUNCTION__, false);

while (null !== $this->callback || !feof($this->stdout) || !feof($this->stderr)) {
$out = stream_get_contents($this->stdout, -1, $this->incrementalOutputOffset);
$clearOutput = !(self::ITER_KEEP_OUTPUT & $flags);
$blocking = !(self::ITER_NON_BLOCKING & $flags);
$yieldOut = !(self::ITER_SKIP_OUT & $flags);
$yieldErr = !(self::ITER_SKIP_ERR & $flags);

if (isset($out[0])) {
if ($clearOutput) {
$this->clearOutput();
} else {
$this->incrementalOutputOffset = ftell($this->stdout);
}
while (null !== $this->callback || ($yieldOut && !feof($this->stdout)) || ($yieldErr && !feof($this->stderr))) {
if ($yieldOut) {
$out = stream_get_contents($this->stdout, -1, $this->incrementalOutputOffset);

if (isset($out[0])) {
if ($clearOutput) {
$this->clearOutput();
} else {
$this->incrementalOutputOffset = ftell($this->stdout);
}

yield self::OUT => $out;
yield self::OUT => $out;
}
}

$err = stream_get_contents($this->stderr, -1, $this->incrementalErrorOutputOffset);
if ($yieldErr) {
$err = stream_get_contents($this->stderr, -1, $this->incrementalErrorOutputOffset);

if (isset($err[0])) {
if ($clearOutput) {
$this->clearErrorOutput();
} else {
$this->incrementalErrorOutputOffset = ftell($this->stderr);
}
if (isset($err[0])) {
if ($clearOutput) {
$this->clearErrorOutput();
} else {
$this->incrementalErrorOutputOffset = ftell($this->stderr);
}

yield self::ERR => $err;
yield self::ERR => $err;
}
}

if (!$blocking && !isset($out[0]) && !isset($err[0])) {
Expand Down
3 changes: 3 additions & 0 deletions src/Symfony/Component/Process/ProcessUtils.php
Expand Up @@ -96,6 +96,9 @@ public static function validateInput($caller, $input)
if (is_scalar($input)) {
return (string) $input;
}
if ($input instanceof Process) {
return $input->getIterator($input::ITER_SKIP_ERR);
}
if ($input instanceof \Iterator) {
return $input;
}
Expand Down
35 changes: 25 additions & 10 deletions src/Symfony/Component/Process/Tests/ProcessTest.php
Expand Up @@ -1154,7 +1154,7 @@ public function pipesCodeProvider()
* @dataProvider provideVariousIncrementals
*/
public function testIncrementalOutputDoesNotRequireAnotherCall($stream, $method) {
$process = new Process(self::$phpBin.' -r '.escapeshellarg('$n = 0; while ($n < 3) { file_put_contents(\''.$stream.'\', $n, 1); $n++; usleep(1000); }'), null, null, null, null);
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('$n = 0; while ($n < 3) { file_put_contents(\''.$stream.'\', $n, 1); $n++; usleep(1000); }'), null, null, null, null);
$process->start();
$result = '';
$limit = microtime(true) + 3;
Expand Down Expand Up @@ -1182,7 +1182,7 @@ public function testIteratorInput()
yield 'pong';
};

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'), null, null, $input());
$process->run();
$this->assertSame('pingpong', $process->getOutput());
}
Expand All @@ -1191,7 +1191,7 @@ public function testSimpleInputStream()
{
$input = new InputStream();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('echo \'ping\'; stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);

$process->start(function ($type, $data) use ($input) {
Expand Down Expand Up @@ -1225,7 +1225,7 @@ public function testInputStreamWithCallable()
$input->onEmpty($stream);
$input->write($stream());

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('echo fread(STDIN, 3);'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
$input->close();
Expand All @@ -1243,7 +1243,7 @@ public function testInputStreamWithGenerator()
$input->close();
});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);'));
$process->setInput($input);
$process->start();
$input->write('ping');
Expand All @@ -1257,7 +1257,7 @@ public function testInputStreamOnEmpty()
$input = new InputStream();
$input->onEmpty(function () use (&$i) {++$i;});

$process = new Process(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('echo 123; echo fread(STDIN, 1); echo 456;'));
$process->setInput($input);
$process->start(function ($type, $data) use ($input) {
if ('123' === $data) {
Expand All @@ -1274,7 +1274,7 @@ public function testIteratorOutput()
{
$input = new InputStream();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('fwrite(STDOUT, 123); fwrite(STDERR, 234); fwrite(STDOUT, fread(STDIN, 3)); fwrite(STDERR, 456);'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('fwrite(STDOUT, 123); fwrite(STDERR, 234); flush(); usleep(10000); fwrite(STDOUT, fread(STDIN, 3)); fwrite(STDERR, 456);'));
$process->setInput($input);
$process->start();
$output = array();
Expand Down Expand Up @@ -1310,12 +1310,12 @@ public function testNonBlockingNorClearingIteratorOutput()
{
$input = new InputStream();

$process = new Process(self::$phpBin.' -r '.escapeshellarg('fwrite(STDOUT, fread(STDIN, 3));'));
$process = $this->getProcess(self::$phpBin.' -r '.escapeshellarg('fwrite(STDOUT, fread(STDIN, 3));'));
$process->setInput($input);
$process->start();
$output = array();

foreach ($process->getIterator(false, false) as $type => $data) {
foreach ($process->getIterator($process::ITER_NON_BLOCKING | $process::ITER_KEEP_OUTPUT) as $type => $data) {
$output[] = array($type, $data);
break;
}
Expand All @@ -1326,7 +1326,7 @@ public function testNonBlockingNorClearingIteratorOutput()

$input->write(123);

foreach ($process->getIterator(false, false) as $type => $data) {
foreach ($process->getIterator($process::ITER_NON_BLOCKING | $process::ITER_KEEP_OUTPUT) as $type => $data) {
if ('' !== $data) {
$output[] = array($type, $data);
}
Expand All @@ -1342,6 +1342,21 @@ public function testNonBlockingNorClearingIteratorOutput()
$this->assertSame($expectedOutput, $output);
}

public function testChainedProcesses()
{
$p1 = new Process(self::$phpBin.' -r '.escapeshellarg('fwrite(STDERR, 123); fwrite(STDOUT, 456);'));
$p2 = $this->getProcess(sprintf('%s -r %s', self::$phpBin, escapeshellarg('stream_copy_to_stream(STDIN, STDOUT);')));
$p2->setInput($p1);

$p1->start();
$p2->run();

$this->assertSame('123', $p1->getErrorOutput());
$this->assertSame('', $p1->getOutput());
$this->assertSame('', $p2->getErrorOutput());
$this->assertSame('456', $p2->getOutput());
}

/**
* @param string $commandline
* @param null|string $cwd
Expand Down

0 comments on commit cb0fe14

Please sign in to comment.