Skip to content

Commit

Permalink
Merge 5ec0b05 into 2f9bf88
Browse files Browse the repository at this point in the history
  • Loading branch information
azjezz committed May 7, 2022
2 parents 2f9bf88 + 5ec0b05 commit 34e0c33
Show file tree
Hide file tree
Showing 5 changed files with 123 additions and 34 deletions.
1 change: 1 addition & 0 deletions docs/component/io.md
Expand Up @@ -16,6 +16,7 @@
- [input_handle](./../../src/Psl/IO/input_handle.php#L20)
- [output_handle](./../../src/Psl/IO/output_handle.php#L20)
- [pipe](./../../src/Psl/IO/pipe.php#L24)
- [streaming](./../../src/Psl/IO/streaming.php#L38)
- [write](./../../src/Psl/IO/write.php#L21)
- [write_error](./../../src/Psl/IO/write_error.php#L23)
- [write_error_line](./../../src/Psl/IO/write_error_line.php#L23)
Expand Down
2 changes: 1 addition & 1 deletion docs/component/shell.md
Expand Up @@ -12,7 +12,7 @@

#### `Functions`

- [execute](./../../src/Psl/Shell/execute.php#L42)
- [execute](./../../src/Psl/Shell/execute.php#L41)
- [unpack](./../../src/Psl/Shell/unpack.php#L20)

#### `Enums`
Expand Down
109 changes: 109 additions & 0 deletions src/Psl/IO/streaming.php
@@ -0,0 +1,109 @@
<?php

declare(strict_types=1);

namespace Psl\IO;

use Generator;
use Psl;
use Psl\Channel;
use Psl\Result;
use Psl\Str;
use Revolt\EventLoop;

/**
* Streaming the output of the given read stream handles using a generator.
*
* Example:
*
* $handles = [
* 'foo' => get_read_stream('foo'),
* 'bar' => get_read_stream('bar'),
* ];
*
* foreach(IO\streaming($handles) as $type => $chunk) {
* IO\write_line('received chunk "%s" from "%s" stream', $chunk, $type);
* }
*
* @template T of array-key
*
* @param iterable<T, ReadStreamHandleInterface> $handles
*
* @throws Exception\AlreadyClosedException If one of the handles has been already closed.
* @throws Exception\RuntimeException If an error occurred during the operation.
* @throws Exception\TimeoutException If $timeout is reached before being able to read all the handles until the end.
*
* @return Generator<T, string, mixed, void>
*/
function streaming(iterable $handles, ?float $timeout = null): Generator
{
/**
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\ReceiverInterface<array{T|null, Result\ResultInterface<string>}> $receiver
*
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\SenderInterface<array{T|null, Result\ResultInterface<string>}> $sender
*/
[$receiver, $sender] = Channel\unbounded();

/** @var Psl\Ref<array<T, string>> $watchers */
$watchers = new Psl\Ref([]);
foreach ($handles as $index => $handle) {
$stream = $handle->getStream();
if ($stream === null) {
throw new Exception\AlreadyClosedException(Str\format('Handle "%s" is already closed.', (string) $index));
}

$watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use ($index, $handle, $sender, $watchers): void {
try {
$result = Result\wrap($handle->tryRead(...));
if ($result->isFailed() || ($result->isSucceeded() && $result->getResult() === '')) {
EventLoop::cancel($watcher);
unset($watchers->value[$index]);
}

$sender->send([$index, $result]);
} finally {
if ($watchers->value === []) {
$sender->close();
}
}
});
}

$timeout_watcher = null;
if ($timeout !== null) {
$timeout_watcher = EventLoop::delay($timeout, static function () use ($sender): void {
/** @var Result\ResultInterface<string> $failure */
$failure = new Result\Failure(
new Exception\TimeoutException('Reached timeout before being able to read all the handles until the end.')
);

$sender->send([null, $failure]);
});
}

try {
while (true) {
[$index, $result] = $receiver->receive();
if (null === $index || $result->isFailed()) {
throw $result->getThrowable();
}

yield $index => $result->getResult();
}
} catch (Channel\Exception\ClosedChannelException) {
// completed.
return;
} finally {
if ($timeout_watcher !== null) {
EventLoop::cancel($timeout_watcher);
}

foreach ($watchers->value as $watcher) {
EventLoop::cancel($watcher);
}
}
}
1 change: 1 addition & 0 deletions src/Psl/Internal/Loader.php
Expand Up @@ -468,6 +468,7 @@ final class Loader
'Psl\Unix\connect',
'Psl\Channel\bounded',
'Psl\Channel\unbounded',
'Psl\IO\streaming',
'Psl\IO\write',
'Psl\IO\write_line',
'Psl\IO\write_error',
Expand Down
44 changes: 11 additions & 33 deletions src/Psl/Shell/execute.php
Expand Up @@ -4,7 +4,6 @@

namespace Psl\Shell;

use Psl\Async;
use Psl\Dict;
use Psl\Env;
use Psl\Filesystem;
Expand Down Expand Up @@ -156,25 +155,15 @@ static function (array $m) use (
$stderr = new IO\CloseReadStreamHandle($pipes[2]);

try {
[$stdout_content, $stderr_content] = Async\concurrently([
static fn(): string => $stdout->readAll(timeout: $timeout),
static fn(): string => $stderr->readAll(timeout: $timeout),
]);
// @codeCoverageIgnoreStart
} catch (Async\Exception\CompositeException $exception) {
$reasons = $exception->getReasons();
if ($reasons[0] instanceof IO\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $reasons[0]);
}

if ($reasons[1] instanceof IO\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $reasons[1]);
$result = '';
/** @psalm-suppress MissingThrowsDocblock */
foreach (IO\streaming([1 => $stdout, 2 => $stderr], $timeout) as $type => $chunk) {
if ($chunk) {
$result .= pack('C1N1', $type, Str\Byte\length($chunk)) . $chunk;
}
}

throw new Exception\RuntimeException('Failed to reach process output.', 0, $exception ?? null);
} catch (IO\Exception\TimeoutException $previous) {
throw new Exception\TimeoutException('reached timeout while the process output is still not readable.', 0, $previous);
// @codeCoverageIgnoreEnd
} finally {
/** @psalm-suppress MissingThrowsDocblock */
$stdout->close();
Expand All @@ -185,29 +174,18 @@ static function (array $m) use (
}

if ($code !== 0) {
/** @psalm-suppress MissingThrowsDocblock */
[$stdout_content, $stderr_content] = namespace\unpack($result);

throw new Exception\FailedExecutionException($commandline, $stdout_content, $stderr_content, $code);
}

if (ErrorOutputBehavior::Packed === $error_output_behavior) {
$result = '';
$stdout_length = Str\Byte\length($stdout_content);
$stderr_length = Str\Byte\length($stderr_content);

if ($stdout_length) {
$stdout_header = pack('C1N1', 1, $stdout_length);

$result .= $stdout_header . $stdout_content;
}

if ($stderr_length) {
$stderr_header = pack('C1N1', 2, $stderr_length);

$result .= $stderr_header . $stderr_content;
}

return $result;
}

/** @psalm-suppress MissingThrowsDocblock */
[$stdout_content, $stderr_content] = namespace\unpack($result);
return match ($error_output_behavior) {
ErrorOutputBehavior::Prepend => $stderr_content . $stdout_content,
ErrorOutputBehavior::Append => $stdout_content . $stderr_content,
Expand Down

0 comments on commit 34e0c33

Please sign in to comment.