Skip to content

Commit

Permalink
chore: fix coding standards
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed May 7, 2022
1 parent bfab29c commit cf3aa60
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/component/io.md
Expand Up @@ -16,7 +16,7 @@
- [input_handle](./../../src/Psl/IO/input_handle.php#L20)
- [output_handle](./../../src/Psl/IO/output_handle.php#L20)
- [pipe](./../../src/Psl/IO/pipe.php#L24)
- [streaming](./../../src/Psl/IO/streaming.php#L37)
- [streaming](./../../src/Psl/IO/streaming.php#L39)
- [write](./../../src/Psl/IO/write.php#L21)
- [write_error](./../../src/Psl/IO/write_error.php#L23)
- [write_error_line](./../../src/Psl/IO/write_error_line.php#L23)
Expand Down
2 changes: 1 addition & 1 deletion docs/component/shell.md
Expand Up @@ -12,7 +12,7 @@

#### `Functions`

- [execute](./../../src/Psl/Shell/execute.php#L42)
- [execute](./../../src/Psl/Shell/execute.php#L41)
- [unpack](./../../src/Psl/Shell/unpack.php#L20)

#### `Enums`
Expand Down
39 changes: 20 additions & 19 deletions src/Psl/IO/streaming.php
Expand Up @@ -5,11 +5,11 @@
namespace Psl\IO;

use Generator;
use Psl\Async;
use Psl;
use Psl\Channel;
use Psl\Result;
use Psl\Str;
use Psl;
use Revolt\EventLoop;

/**
* Streaming the output of the given read stream handles using a generator.
Expand Down Expand Up @@ -39,39 +39,40 @@ function streaming(iterable $handles, ?float $timeout = null): Generator
{
/**
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\ReceiverInterface<array{T|null, Result\ResultInterface<string>}> $receiver
*
* @psalm-suppress UnnecessaryVarAnnotation
*
* @var Channel\SenderInterface<array{T|null, Result\ResultInterface<string>}> $sender
*/
[$receiver, $sender] = Channel\unbounded();

/** @var Psl\Ref<array<T, string>> $watchers */
$watchers = new Psl\Ref([]);
foreach ($handles as $k => $handle) {
foreach ($handles as $index => $handle) {
$stream = $handle->getStream();
if ($stream === null) {
throw new Exception\AlreadyClosedException(Str\format('Handle "%s" is already closed.', (string) $k));
throw new Exception\AlreadyClosedException(Str\format('Handle "%s" is already closed.', (string) $index));
}

$watchers->value[$k] = Async\Scheduler::onReadable($stream, static function (string $watcher) use ($k, $handle, $sender, $watchers): void {
$watchers->value[$index] = EventLoop::onReadable($stream, static function (string $watcher) use ($index, $handle, $sender, $watchers): void {
$result = Result\wrap($handle->tryRead(...));
if ($result->isFailed() || ($result->isSucceeded() && $result->getResult() === '')) {
Async\Scheduler::cancel($watcher);
unset($watchers->value[$k]);
if ($watchers->value === []) {
$sender->close();
}

return;
EventLoop::cancel($watcher);
unset($watchers->value[$index]);
}

$sender->send([$k, $result]);
$sender->send([$index, $result]);
if ($watchers->value === []) {
$sender->close();
}
});
}

$timeout_watcher = null;
if ($timeout !== null) {
$timeout_watcher = Async\Scheduler::delay($timeout, static function () use ($sender): void {
$timeout_watcher = EventLoop::delay($timeout, static function () use ($sender): void {
/** @var Result\ResultInterface<string> $failure */
$failure = new Result\Failure(
new Exception\TimeoutException('Reached timeout before being able to read all the handles until the end.')
Expand All @@ -83,23 +84,23 @@ function streaming(iterable $handles, ?float $timeout = null): Generator

try {
while (true) {
[$k, $result] = $receiver->receive();
if (null === $k || $result->isFailed()) {
[$index, $result] = $receiver->receive();
if (null === $index || $result->isFailed()) {
throw $result->getThrowable();
}

yield $k => $result->getResult();
yield $index => $result->getResult();
}
} catch (Channel\Exception\ClosedChannelException) {
// completed.
return;
} finally {
if ($timeout_watcher !== null) {
Async\Scheduler::cancel($timeout_watcher);
EventLoop::cancel($timeout_watcher);
}

foreach ($watchers->value as $watcher) {
Async\Scheduler::cancel($watcher);
EventLoop::cancel($watcher);
}
}
}
30 changes: 7 additions & 23 deletions src/Psl/Shell/execute.php
Expand Up @@ -155,14 +155,11 @@ static function (array $m) use (
$stderr = new IO\CloseReadStreamHandle($pipes[2]);

try {
$stdout_content = '';
$stderr_content = '';
$result = '';
/** @psalm-suppress MissingThrowsDocblock */
foreach (IO\streaming(['out' => $stdout, 'err' => $stderr], $timeout) as $type => $chunk) {
if ('out' === $type) {
$stdout_content .= $chunk;
} else {
$stderr_content .= $chunk;
foreach (IO\streaming([1 => $stdout, 2 => $stderr], $timeout) as $type => $chunk) {
if ($chunk) {
$result .= pack('C1N1', $type, Str\Byte\length($chunk)) . $chunk;
}
}
} catch (IO\Exception\TimeoutException $previous) {
Expand All @@ -177,29 +174,16 @@ static function (array $m) use (
}

if ($code !== 0) {
[$stdout_content, $stderr_content] = namespace\unpack($result);

throw new Exception\FailedExecutionException($commandline, $stdout_content, $stderr_content, $code);
}

if (ErrorOutputBehavior::Packed === $error_output_behavior) {
$result = '';
$stdout_length = Str\Byte\length($stdout_content);
$stderr_length = Str\Byte\length($stderr_content);

if ($stdout_length) {
$stdout_header = pack('C1N1', 1, $stdout_length);

$result .= $stdout_header . $stdout_content;
}

if ($stderr_length) {
$stderr_header = pack('C1N1', 2, $stderr_length);

$result .= $stderr_header . $stderr_content;
}

return $result;
}

[$stdout_content, $stderr_content] = namespace\unpack($result);
return match ($error_output_behavior) {
ErrorOutputBehavior::Prepend => $stderr_content . $stdout_content,
ErrorOutputBehavior::Append => $stdout_content . $stderr_content,
Expand Down

0 comments on commit cf3aa60

Please sign in to comment.