Skip to content

Commit

Permalink
feat(async): introduce async component
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 2, 2021
1 parent a781913 commit fa272fc
Show file tree
Hide file tree
Showing 34 changed files with 1,780 additions and 1 deletion.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Expand Up @@ -11,4 +11,4 @@
* dropped support for PHP 8.0
* **BC** - signature of `Psl\Type\object` function changed from `object<T of object>(classname<T> $classname): TypeInterface<T>` to `object(): TypeInterface<object>` ( to preserve the old behavior, use `Psl\Type\instance_of` )
* introduced `Psl\Type\instance_of` function, with the signature of `instance_of<T of object>(classname<T> $classname): TypeInterface<T>`.

* introduced a new `Psl\Async` component.
1 change: 1 addition & 0 deletions docs/README.md
Expand Up @@ -7,6 +7,7 @@
# Components API

- [Psl](./component/psl.md)
- [Psl\Async](./component/async.md)
- [Psl\Class](./component/class.md)
- [Psl\Collection](./component/collection.md)
- [Psl\DataStructure](./component/data-structure.md)
Expand Down
33 changes: 33 additions & 0 deletions docs/component/async.md
@@ -0,0 +1,33 @@
<!--
This markdown file was generated using `docs/documenter.php`.
Any edits to it will likely be lost.
-->

[*index](./../README.md)

---

### `Psl\Async` Component

#### `Functions`

- [all](./../../src/Psl/Async/all.php#L17)
- [any](./../../src/Psl/Async/any.php#L25)
- [await](./../../src/Psl/Async/await.php#L18)
- [await_readable](./../../src/Psl/Async/await_readable.php#L18)
- [await_signal](./../../src/Psl/Async/await_signal.php#L18)
- [await_writable](./../../src/Psl/Async/await_writable.php#L18)
- [concurrently](./../../src/Psl/Async/concurrently.php#L19)
- [first](./../../src/Psl/Async/first.php#L24)
- [later](./../../src/Psl/Async/later.php#L14)
- [run](./../../src/Psl/Async/run.php#L19)
- [usleep](./../../src/Psl/Async/usleep.php#L10)

#### `Classes`

- [Awaitable](./../../src/Psl/Async/Awaitable.php#L17)
- [Deferred](./../../src/Psl/Async/Deferred.php#L13)
- [Scheduler](./../../src/Psl/Async/Scheduler.php#L19)


1 change: 1 addition & 0 deletions docs/documenter.php
Expand Up @@ -181,6 +181,7 @@ function get_all_components(): array
{
$components = [
'Psl',
'Psl\\Async',
'Psl\\Class',
'Psl\\Collection',
'Psl\\DataStructure',
Expand Down
200 changes: 200 additions & 0 deletions src/Psl/Async/Awaitable.php
@@ -0,0 +1,200 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Generator;
use Psl\Async\Internal\AwaitableIterator;
use Psl\Async\Internal\State;
use Throwable;

use function is_array;

/**
* @template T
*/
final class Awaitable
{
private State $state;

/**
* @param State<T> $state
*
* @internal Use {@see Deferred} to create and resolve an awaitable.
*/
public function __construct(State $state)
{
$this->state = $state;
}

/**
* Iterate over the given awaitables in completion order.
*
* @template Tk
* @template Tv
*
* @param iterable<Tk, Awaitable<Tv>> $awaitables
*
* @return Generator<Tk, Awaitable<Tv>, void, void>
*/
public static function iterate(iterable $awaitables): Generator
{
$iterator = new AwaitableIterator();

if (is_array($awaitables)) {
foreach ($awaitables as $key => $awaitable) {
/** @psalm-suppress MissingThrowsDocblock */
$iterator->enqueue($awaitable->state, $key, $awaitable);
}

/** @psalm-suppress MissingThrowsDocblock */
$iterator->complete();
} else {
Scheduler::defer(static function () use ($awaitables, $iterator): void {
// @codeCoverageIgnoreStart
try {
foreach ($awaitables as $key => $awaitable) {
$iterator->enqueue($awaitable->state, $key, $awaitable);
}

/** @psalm-suppress MissingThrowsDocblock */
$iterator->complete();
} catch (Throwable $exception) {
/** @psalm-suppress MissingThrowsDocblock */
$iterator->error($exception);
}
// @codeCoverageIgnoreEnd
});
}

/** @psalm-suppress MissingThrowsDocblock */
while ($item = $iterator->consume()) {
yield $item[0] => $item[1];
}
}

/**
* @template Tv
*
* @param Tv $result
*
* @return Awaitable<Tv>
*/
public static function complete(mixed $result): self
{
/** @var State<Tv> $state */
$state = new State();
/** @psalm-suppress MissingThrowsDocblock */
$state->complete($result);

return new self($state);
}

/**
* @return Awaitable<void>
*/
public static function error(Throwable $throwable): self
{
/** @var State<void> $state */
$state = new State();
/** @psalm-suppress MissingThrowsDocblock */
$state->error($throwable);

return new self($state);
}

/**
* @return bool True if the operation has completed.
*/
public function isComplete(): bool
{
return $this->state->isComplete();
}

/**
* Attaches a callback that is invoked if this awaitable completes.
*
* The returned awaitable is completed with the return value of the callback,
* or errors with an exception thrown from the callback.
*
* @template Ts
*
* @param callable(T): Ts $on_success
* @param callable(Throwable): Ts $on_failure
*
* @return Awaitable<Ts>
*/
public function then(callable $on_success, callable $on_failure): self
{
/** @var State<Ts> $state */
$state = new State();

$this->state->subscribe(
/**
* @param null|Throwable $error
* @param null|T $value
*/
static function (?Throwable $error, mixed $value) use ($state, $on_success, $on_failure): void {
if ($error) {
try {
$state->complete($on_failure($error));
} catch (Throwable $throwable) {
$state->error($throwable);
}

return;
}

try {
/**
* @var T $value
*/
$state->complete($on_success($value));
} catch (Throwable $exception) {
$state->error($exception);
}
},
);

return new self($state);
}


/**
* Awaits the operation to complete.
*
* Throws an exception if the operation fails.
*
* @return T
*/
public function await(): mixed
{
$suspension = Scheduler::createSuspension();

$this->state->subscribe(
/**
* @param null|Throwable $error
* @param null|T $value
*/
static function (?Throwable $error, mixed $value) use ($suspension): void {
if ($error) {
$suspension->throw($error);
} else {
$suspension->resume($value);
}
},
);

/** @var T */
return $suspension->suspend();
}

/**
* Do not forward unhandled errors to the event loop handler.
*/
public function ignore(): void
{
$this->state->ignore();
}
}
70 changes: 70 additions & 0 deletions src/Psl/Async/Deferred.php
@@ -0,0 +1,70 @@
<?php

declare(strict_types=1);

namespace Psl\Async;

use Psl;
use Throwable;

/**
* @template T
*/
final class Deferred
{
/**
* @var Internal\State<T>
*/
private Internal\State $state;

/**
* @var Awaitable<T>
*/
private Awaitable $awaitable;

public function __construct()
{
$this->state = new Internal\State();
$this->awaitable = new Awaitable($this->state);
}

/**
* Completes the operation with a result value.
*
* @param T $result Result of the operation.
*
* @throws Psl\Exception\InvariantViolationException If the operation is no longer pending.
*/
public function complete(mixed $result): void
{
$this->state->complete($result);
}

/**
* Marks the operation as failed.
*
* @param Throwable $throwable Throwable to indicate the error.
*
* @throws Psl\Exception\InvariantViolationException If the operation is no longer pending.
*/
public function error(Throwable $throwable): void
{
$this->state->error($throwable);
}

/**
* @return bool True if the operation has completed.
*/
public function isComplete(): bool
{
return $this->state->isComplete();
}

/**
* @return Awaitable<T> The awaitable associated with this Deferred.
*/
public function getAwaitable(): Awaitable
{
return $this->awaitable;
}
}
57 changes: 57 additions & 0 deletions src/Psl/Async/Exception/CompositeException.php
@@ -0,0 +1,57 @@
<?php

declare(strict_types=1);

namespace Psl\Async\Exception;

use Exception;
use Psl\Iter;
use Psl\Str;
use Throwable;

use const PHP_EOL;

final class CompositeException extends Exception implements ExceptionInterface
{
/**
* @var non-empty-array<array-key, Throwable>
*/
private array $reasons;

/**
* @param non-empty-array<array-key, Throwable> $reasons Array of exceptions.
* @param string|null $message Exception message, defaults to message generated from passed exceptions.
*/
public function __construct(array $reasons, ?string $message = null)
{
parent::__construct($message ?? $this->generateMessage($reasons));

$this->reasons = $reasons;
}

/**
* @return non-empty-array<array-key, Throwable>
*/
public function getReasons(): array
{
return $this->reasons;
}

/**
* @param non-empty-array<array-key, Throwable> $reasons
*/
private function generateMessage(array $reasons): string
{
$message = Str\format('"Multiple errors encountered (%d); use "%s::getReasons()" to retrieve the array of exceptions thrown:', Iter\count($reasons), self::class);

foreach ($reasons as $reason) {
$message .= PHP_EOL . PHP_EOL . $reason::class;

if ($reason->getMessage() !== '') {
$message .= ': ' . $reason->getMessage();
}
}

return $message;
}
}
11 changes: 11 additions & 0 deletions src/Psl/Async/Exception/ExceptionInterface.php
@@ -0,0 +1,11 @@
<?php

declare(strict_types=1);

namespace Psl\Async\Exception;

use Psl\Exception;

interface ExceptionInterface extends Exception\ExceptionInterface
{
}

0 comments on commit fa272fc

Please sign in to comment.