Skip to content

Commit

Permalink
feat(async): introduce more async helper functions
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Dec 1, 2021
1 parent b49bd8b commit 88fa3f2
Show file tree
Hide file tree
Showing 26 changed files with 453 additions and 87 deletions.
2 changes: 1 addition & 1 deletion examples/async/usleep.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
Async\main(static function (): int {
$start = time();

Async\concurrent([
Async\parallel([
static fn() => Async\sleep(2.0),
static fn() => Async\sleep(2.0),
static fn() => Async\sleep(2.0),
Expand Down
2 changes: 1 addition & 1 deletion examples/io/pipe.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

$output = IO\output_handle();

Async\concurrent([
Async\parallel([
static function() use($read, $output): void {
$output->writeAll("< sleeping.\n");

Expand Down
2 changes: 1 addition & 1 deletion examples/shell/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
Async\main(static function (): int {
$start = time();

Async\concurrent([
Async\parallel([
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
static fn() => Shell\execute(PHP_BINARY, ['-r', '$t = time(); while(time() < ($t+1)) { echo "."; }']),
Expand Down
2 changes: 1 addition & 1 deletion examples/tcp/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
Async\main(static function (): int {
$output = IO\output_handle();

Async\concurrent([
Async\parallel([
'server' => static function () use ($output): void {
$server = TCP\Server::create('localhost', 91337);
$output->writeAll("< server is listening\n");
Expand Down
2 changes: 1 addition & 1 deletion examples/unix/concurrent.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

$output = IO\output_handle();

Async\concurrent([
Async\parallel([
'server' => static function () use ($file, $output): void {
$server = Unix\Server::create($file);
$output->writeAll("< server is listening\n");
Expand Down
35 changes: 27 additions & 8 deletions src/Psl/Async/all.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@
/**
* Awaits all awaitables to complete concurrently.
*
* If one or more awaitables fail, all awaitables will be completed before throwing.
* If one awaitable fails, the exception will be thrown immediately, and the result of the callables will be ignored.
*
* If multiple awaitables failed at once, a {@see Exception\CompositeException} will be thrown.
*
* Once the awaitables have completed, an array containing the results will be returned preserving the original awaitables order.
*
* @template Tk of array-key
* @template Tv
Expand All @@ -22,20 +26,35 @@
function all(iterable $awaitables): array
{
$values = [];
$errors = [];

// Awaitable::iterate() to throw the first error based on completion order instead of argument order
foreach (Awaitable::iterate($awaitables) as $index => $awaitable) {
try {
$values[$index] = $awaitable->await();
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
}
$errors = [];
foreach ($awaitables as $original) {
if ($original === $awaitable) {
continue;
}

if ($errors !== []) {
/** @psalm-suppress MissingThrowsDocblock */
throw $errors[0];
if (!$original->isComplete()) {
$original->ignore();
} else {
try {
$original->await();
} catch (Throwable $error) {
$errors[] = $error;
}
}
}

if ($errors === []) {
throw $throwable;
}

throw new Exception\CompositeException([$throwable, ...$errors], 'Multiple exceptions thrown while waiting.');
}
}

return Dict\map_with_key(
Expand Down
9 changes: 8 additions & 1 deletion src/Psl/Async/any.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,14 @@ function any(iterable $awaitables): mixed
$errors = [];
foreach (Awaitable::iterate($awaitables) as $first) {
try {
return $first->await();
$result = $first->await();
foreach ($awaitables as $awaitable) {
if ($awaitable !== $first) {
$awaitable->ignore();
}
}

return $result;
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
Expand Down
34 changes: 0 additions & 34 deletions src/Psl/Async/concurrent.php

This file was deleted.

8 changes: 6 additions & 2 deletions src/Psl/Async/first.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
* @throws Psl\Exception\InvariantViolationException If $awaitables is empty.
*
* @return T
*
* @codeCoverageIgnore
*/
function first(iterable $awaitables): mixed
{
foreach (Awaitable::iterate($awaitables) as $first) {
foreach ($awaitables as $awaitable) {
if ($awaitable !== $first) {
$awaitable->ignore();
}
}

return $first->await();
}

Expand Down
77 changes: 77 additions & 0 deletions src/Psl/Async/parallel.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Dict;

/**
* Run the tasks iterable of functions in parallel, without waiting until the previous function has completed.
*
* If one tasks fails, the exception will be thrown immediately, and the result of the callables will be ignored.
*
* If multiple tasks failed at once, a {@see Exception\CompositeException} will be thrown.
*
* Once the tasks have completed, an array containing the results will be returned preserving the original awaitables order.
*
* <code>
* use Psl\Async;
*
* // execute `getOne()` and `getTwo()` functions in parallel.
*
* [$one, $two] = Async\parallel([
* getOne(...),
* getTwo(...),
* ]);
* </code>
*
* @note {@see parallel()} is about kicking-off I/O tasks in parallel, not about parallel execution of code.
* If your tasks do not use any timers or perform any I/O, they will actually be executed in series.
*
* <code>
* use Psl\Async;
*
* // the following runs in series.
*
* [$one, $two] = Async\parallel([
* fn() => file_get_contents('path/to/file1.txt'),
* fn() => file_get_contents('path/to/file2.txt'),
* ]);
* </code>
*
* @note Use {@see reflect()} to continue the execution of other tasks when a task fails.
*
* <code>
* use Psl\Async;
*
* // execute `getOne()` and `getTwo()` functions in parallel.
* // if either one of the given tasks fails, the other will continue execution.
*
* [$one, $two] = Async\parallel([
* Async\reflect(getOne(...)),
* Async\reflect(getTwo(...)),
* ]);
* </code>
*
* @template Tk of array-key
* @template Tv
*
* @param iterable<Tk, (callable(): Tv)> $tasks
*
* @return array<Tk, Tv>
*/
function parallel(iterable $tasks): array
{
$awaitables = Dict\map(
$tasks,
/**
* @param callable(): Tv $callable
*
* @return Awaitable<Tv>
*/
static fn(callable $callable): Awaitable => run($callable),
);

return namespace\all($awaitables);
}
28 changes: 28 additions & 0 deletions src/Psl/Async/reflect.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Closure;
use Exception;
use Psl\Result;

/**
* Wraps the given task in another task that always completes with a {@see Result\Success},
* or {@see Result\Failure} if the callable throws an {@see Exception}.
*
* @template T
*
* @param (callable(): T) $task
*
* @return (Closure(): Result\ResultInterface<T>)
*
* @see Result\wrap()
*
* @pure
*/
function reflect(callable $task): Closure
{
return static fn() => Result\wrap($task);
}
32 changes: 32 additions & 0 deletions src/Psl/Async/series.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Dict;

/**
* Run the functions in the tasks collection in series, each one running once the previous function has completed.
*
* If any functions in the series throws, no more functions are run, and the exception is immediately thrown.
*
* @template Tk of array-key
* @template Tv
*
* @param iterable<Tk, (callable(): Tv)> $tasks
*
* @return array<Tk, Tv>
*/
function series(iterable $tasks): array
{
return Dict\map(
$tasks,
/**
* @param callable(): Tv $callable
*
* @return Tv
*/
static fn(callable $callable): mixed => run($callable)->await(),
);
}
24 changes: 24 additions & 0 deletions src/Psl/Async/wrap.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl\Result;

/**
* Wraps the async function in an awaitable that completes with a {@see Result\Success},
* or {@see Result\Failure} if the task throws an {@see Exception}.
*
* @template T
*
* @param (callable(): T) $task
*
* @return Result\ResultInterface<T>
*
* @see reflect()
*/
function wrap(callable $task): Result\ResultInterface
{
return run(reflect($task))->await();
}
5 changes: 4 additions & 1 deletion src/Psl/Internal/Loader.php
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,10 @@ final class Loader
'Psl\Trait\defined',
'Psl\Async\main',
'Psl\Async\run',
'Psl\Async\concurrent',
'Psl\Async\parallel',
'Psl\Async\reflect',
'Psl\Async\wrap',
'Psl\Async\series',
'Psl\Async\await',
'Psl\Async\any',
'Psl\Async\all',
Expand Down
2 changes: 1 addition & 1 deletion src/Psl/Shell/execute.php
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ static function (array $m) use (
$stderr = new Stream\CloseReadHandle($pipes[2]);

try {
[$stdout_content, $stderr_content] = Async\concurrent([
[$stdout_content, $stderr_content] = Async\parallel([
static fn(): string => $stdout->readAll(timeout: $timeout),
static fn(): string => $stderr->readAll(timeout: $timeout),
]);
Expand Down
Loading

0 comments on commit 88fa3f2

Please sign in to comment.