diff --git a/CHANGELOG.md b/CHANGELOG.md index 274827908..0b7b9e8ff 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,4 +11,4 @@ * dropped support for PHP 8.0 * **BC** - signature of `Psl\Type\object` function changed from `object(classname $classname): TypeInterface` to `object(): TypeInterface` ( to preserve the old behavior, use `Psl\Type\instance_of` ) * introduced `Psl\Type\instance_of` function, with the signature of `instance_of(classname $classname): TypeInterface`. - +* introduced a new `Psl\Async` component. diff --git a/docs/README.md b/docs/README.md index f739fef1c..c929aa256 100644 --- a/docs/README.md +++ b/docs/README.md @@ -7,6 +7,7 @@ # Components API - [Psl](./component/psl.md) +- [Psl\Async](./component/async.md) - [Psl\Class](./component/class.md) - [Psl\Collection](./component/collection.md) - [Psl\DataStructure](./component/data-structure.md) diff --git a/docs/component/async.md b/docs/component/async.md new file mode 100644 index 000000000..78781212b --- /dev/null +++ b/docs/component/async.md @@ -0,0 +1,33 @@ + + +[*index](./../README.md) + +--- + +### `Psl\Async` Component + +#### `Functions` + +- [all](./../../src/Psl/Async/all.php#L17) +- [any](./../../src/Psl/Async/any.php#L25) +- [await](./../../src/Psl/Async/await.php#L18) +- [await_readable](./../../src/Psl/Async/await_readable.php#L18) +- [await_signal](./../../src/Psl/Async/await_signal.php#L18) +- [await_writable](./../../src/Psl/Async/await_writable.php#L18) +- [concurrently](./../../src/Psl/Async/concurrently.php#L19) +- [first](./../../src/Psl/Async/first.php#L22) +- [later](./../../src/Psl/Async/later.php#L14) +- [run](./../../src/Psl/Async/run.php#L19) +- [usleep](./../../src/Psl/Async/usleep.php#L10) + +#### `Classes` + +- [Awaitable](./../../src/Psl/Async/Awaitable.php#L17) +- [Deferred](./../../src/Psl/Async/Deferred.php#L13) +- [Scheduler](./../../src/Psl/Async/Scheduler.php#L19) + + diff --git a/docs/documenter.php b/docs/documenter.php index 49aa7ba8a..f2a5e2646 100644 --- a/docs/documenter.php +++ b/docs/documenter.php @@ -181,6 +181,7 @@ function get_all_components(): array { $components = [ 'Psl', + 'Psl\\Async', 'Psl\\Class', 'Psl\\Collection', 'Psl\\DataStructure', diff --git a/src/Psl/Async/Awaitable.php b/src/Psl/Async/Awaitable.php new file mode 100644 index 000000000..945db612d --- /dev/null +++ b/src/Psl/Async/Awaitable.php @@ -0,0 +1,200 @@ + $state + * + * @internal Use {@see Deferred} to create and resolve an awaitable. + */ + public function __construct(State $state) + { + $this->state = $state; + } + + /** + * Iterate over the given awaitables in completion order. + * + * @template Tk + * @template Tv + * + * @param iterable> $awaitables + * + * @return Generator, void, void> + */ + public static function iterate(iterable $awaitables): Generator + { + $iterator = new AwaitableIterator(); + + if (is_array($awaitables)) { + foreach ($awaitables as $key => $awaitable) { + /** @psalm-suppress MissingThrowsDocblock */ + $iterator->enqueue($awaitable->state, $key, $awaitable); + } + + /** @psalm-suppress MissingThrowsDocblock */ + $iterator->complete(); + } else { + Scheduler::defer(static function () use ($awaitables, $iterator): void { + // @codeCoverageIgnoreStart + try { + foreach ($awaitables as $key => $awaitable) { + $iterator->enqueue($awaitable->state, $key, $awaitable); + } + + /** @psalm-suppress MissingThrowsDocblock */ + $iterator->complete(); + } catch (Throwable $exception) { + /** @psalm-suppress MissingThrowsDocblock */ + $iterator->error($exception); + } + // @codeCoverageIgnoreEnd + }); + } + + /** @psalm-suppress MissingThrowsDocblock */ + while ($item = $iterator->consume()) { + yield $item[0] => $item[1]; + } + } + + /** + * @template Tv + * + * @param Tv $result + * + * @return Awaitable + */ + public static function complete(mixed $result): self + { + /** @var State $state */ + $state = new State(); + /** @psalm-suppress MissingThrowsDocblock */ + $state->complete($result); + + return new self($state); + } + + /** + * @return Awaitable + */ + public static function error(Throwable $throwable): self + { + /** @var State $state */ + $state = new State(); + /** @psalm-suppress MissingThrowsDocblock */ + $state->error($throwable); + + return new self($state); + } + + /** + * @return bool True if the operation has completed. + */ + public function isComplete(): bool + { + return $this->state->isComplete(); + } + + /** + * Attaches a callback that is invoked if this awaitable completes. + * + * The returned awaitable is completed with the return value of the callback, + * or errors with an exception thrown from the callback. + * + * @template Ts + * + * @param callable(T): Ts $on_success + * @param callable(Throwable): Ts $on_failure + * + * @return Awaitable + */ + public function then(callable $on_success, callable $on_failure): self + { + /** @var State $state */ + $state = new State(); + + $this->state->subscribe( + /** + * @param null|Throwable $error + * @param null|T $value + */ + static function (?Throwable $error, mixed $value) use ($state, $on_success, $on_failure): void { + if ($error) { + try { + $state->complete($on_failure($error)); + } catch (Throwable $throwable) { + $state->error($throwable); + } + + return; + } + + try { + /** + * @var T $value + */ + $state->complete($on_success($value)); + } catch (Throwable $exception) { + $state->error($exception); + } + }, + ); + + return new self($state); + } + + + /** + * Awaits the operation to complete. + * + * Throws an exception if the operation fails. + * + * @return T + */ + public function await(): mixed + { + $suspension = Scheduler::createSuspension(); + + $this->state->subscribe( + /** + * @param null|Throwable $error + * @param null|T $value + */ + static function (?Throwable $error, mixed $value) use ($suspension): void { + if ($error) { + $suspension->throw($error); + } else { + $suspension->resume($value); + } + }, + ); + + /** @var T */ + return $suspension->suspend(); + } + + /** + * Do not forward unhandled errors to the event loop handler. + */ + public function ignore(): void + { + $this->state->ignore(); + } +} diff --git a/src/Psl/Async/Deferred.php b/src/Psl/Async/Deferred.php new file mode 100644 index 000000000..3361fd02c --- /dev/null +++ b/src/Psl/Async/Deferred.php @@ -0,0 +1,70 @@ + + */ + private Internal\State $state; + + /** + * @var Awaitable + */ + private Awaitable $awaitable; + + public function __construct() + { + $this->state = new Internal\State(); + $this->awaitable = new Awaitable($this->state); + } + + /** + * Completes the operation with a result value. + * + * @param T $result Result of the operation. + * + * @throws Psl\Exception\InvariantViolationException If the operation is no longer pending. + */ + public function complete(mixed $result): void + { + $this->state->complete($result); + } + + /** + * Marks the operation as failed. + * + * @param Throwable $throwable Throwable to indicate the error. + * + * @throws Psl\Exception\InvariantViolationException If the operation is no longer pending. + */ + public function error(Throwable $throwable): void + { + $this->state->error($throwable); + } + + /** + * @return bool True if the operation has completed. + */ + public function isComplete(): bool + { + return $this->state->isComplete(); + } + + /** + * @return Awaitable The awaitable associated with this Deferred. + */ + public function getAwaitable(): Awaitable + { + return $this->awaitable; + } +} diff --git a/src/Psl/Async/Exception/CompositeException.php b/src/Psl/Async/Exception/CompositeException.php new file mode 100644 index 000000000..8931e7a81 --- /dev/null +++ b/src/Psl/Async/Exception/CompositeException.php @@ -0,0 +1,57 @@ + + */ + private array $reasons; + + /** + * @param non-empty-array $reasons Array of exceptions. + * @param string|null $message Exception message, defaults to message generated from passed exceptions. + */ + public function __construct(array $reasons, ?string $message = null) + { + parent::__construct($message ?? $this->generateMessage($reasons)); + + $this->reasons = $reasons; + } + + /** + * @return non-empty-array + */ + public function getReasons(): array + { + return $this->reasons; + } + + /** + * @param non-empty-array $reasons + */ + private function generateMessage(array $reasons): string + { + $message = Str\format('"Multiple errors encountered (%d); use "%s::getReasons()" to retrieve the array of exceptions thrown:', Iter\count($reasons), self::class); + + foreach ($reasons as $reason) { + $message .= PHP_EOL . PHP_EOL . $reason::class; + + if ($reason->getMessage() !== '') { + $message .= ': ' . $reason->getMessage(); + } + } + + return $message; + } +} diff --git a/src/Psl/Async/Exception/ExceptionInterface.php b/src/Psl/Async/Exception/ExceptionInterface.php new file mode 100644 index 000000000..30f213562 --- /dev/null +++ b/src/Psl/Async/Exception/ExceptionInterface.php @@ -0,0 +1,11 @@ +getCode(), + $throwable + ); + } +} diff --git a/src/Psl/Async/Internal/AwaitableIterator.php b/src/Psl/Async/Internal/AwaitableIterator.php new file mode 100644 index 000000000..8b85de50f --- /dev/null +++ b/src/Psl/Async/Internal/AwaitableIterator.php @@ -0,0 +1,140 @@ + + */ + private AwaitableIteratorQueue $queue; + + /** + * @var null|Awaitable|Awaitable|Awaitable}> + */ + private ?Awaitable $complete = null; + + public function __construct() + { + $this->queue = new AwaitableIteratorQueue(); + } + + /** + * @param State $state + * @param Tk $key + * @param Awaitable $awaitable + * + * @throws Psl\Exception\InvariantViolationException If the iterator has already been marked as complete. + */ + public function enqueue(State $state, mixed $key, Awaitable $awaitable): void + { + Psl\invariant(null === $this->complete, 'Iterator has already been marked as complete'); + + $queue = $this->queue; // Using separate object to avoid a circular reference. + $id = $state->subscribe( + /** + * @param Tv|null $_result + */ + static function ( + ?Throwable $_error, + mixed $_result, + string $id + ) use ( + $key, + $awaitable, + $queue + ): void { + unset($queue->pending[$id]); + + if ($queue->suspension) { + $queue->suspension->resume([$key, $awaitable]); + $queue->suspension = null; + return; + } + + $queue->items[] = [$key, $awaitable]; + } + ); + + $queue->pending[$id] = $state; + } + + /** + * @throws Psl\Exception\InvariantViolationException If the iterator has already been marked as complete. + */ + public function complete(): void + { + Psl\invariant(null === $this->complete, 'Iterator has already been marked as complete'); + + $this->complete = Awaitable::complete(null); + + if (!$this->queue->pending && $this->queue->suspension) { + $this->queue->suspension->resume(null); + $this->queue->suspension = null; + } + } + + /** + * @throws Psl\Exception\InvariantViolationException If the iterator has already been marked as complete. + */ + public function error(Throwable $exception): void + { + Psl\invariant(null === $this->complete, 'Iterator has already been marked as complete'); + + $this->complete = Awaitable::error($exception); + + if (!$this->queue->pending && $this->queue->suspension) { + $this->queue->suspension->throw($exception); + $this->queue->suspension = null; + } + } + + /** + * @throws Psl\Exception\InvariantViolationException If {@see consume()} is called concurrently. + * + * @return null|array{0: Tk, 1: Awaitable} + */ + public function consume(): ?array + { + Psl\invariant(null === $this->queue->suspension, 'Concurrent consume() operations are not supported'); + + if (Iter\is_empty($this->queue->items)) { + if (Iter\is_empty($this->queue->pending) && $this->complete !== null) { + return $this->complete->await(); + } + + $this->queue->suspension = Psl\Async\Scheduler::createSuspension(); + + /** @var null|array{0: Tk, 1: Awaitable} */ + return $this->queue->suspension->suspend(); + } + + $key = (int) Iter\first_key($this->queue->items); + $item = $this->queue->items[$key]; + + unset($this->queue->items[$key]); + + /** @var null|array{0: Tk, 1: Awaitable} */ + return $item; + } + + public function __destruct() + { + foreach ($this->queue->pending as $id => $state) { + $state->unsubscribe($id); + } + } +} diff --git a/src/Psl/Async/Internal/AwaitableIteratorQueue.php b/src/Psl/Async/Internal/AwaitableIteratorQueue.php new file mode 100644 index 000000000..d37c96e9d --- /dev/null +++ b/src/Psl/Async/Internal/AwaitableIteratorQueue.php @@ -0,0 +1,29 @@ +}> + */ + public array $items = []; + + /** + * @var array> + */ + public array $pending = []; + + public ?Suspension $suspension = null; +} diff --git a/src/Psl/Async/Internal/State.php b/src/Psl/Async/Internal/State.php new file mode 100644 index 000000000..ade9f5d5d --- /dev/null +++ b/src/Psl/Async/Internal/State.php @@ -0,0 +1,140 @@ + + */ + private array $callbacks = []; + + /** + * @var T|null + */ + private mixed $result = null; + + private ?Throwable $throwable = null; + + public function __destruct() + { + if ($this->throwable && !$this->handled) { + $throwable = Exception\UnhandledAwaitableException::forThrowable($this->throwable); + Scheduler::queue(static fn () => throw $throwable); + } + } + + /** + * Registers a callback to be notified once the operation is complete or errored. + * + * The callback is invoked directly from the event loop context, so suspension within the callback is not possible. + * + * @param (callable(?Throwable, ?T, string): void) $callback Callback invoked on completion of the awaitable. + * + * @return string Identifier that can be used to cancel interest for this awaitable. + */ + public function subscribe(callable $callback): string + { + /** @psalm-suppress StringIncrement */ + $id = self::$nextId++; + + $this->handled = true; + + if ($this->complete) { + Scheduler::queue(fn() => $callback($this->throwable, $this->result, $id)); + } else { + $this->callbacks[$id] = $callback; + } + + return $id; + } + + /** + * Cancels a subscription. + * + * Cancellations are advisory only. The callback might still be called if it is already queued for execution. + * + * @param string $id Identifier returned from subscribe() + */ + public function unsubscribe(string $id): void + { + unset($this->callbacks[$id]); + } + + /** + * Completes the operation with a result value. + * + * @param T $result Result of the operation. + * + * @throws Psl\Exception\InvariantViolationException If the operation is no longer pending. + * @throws Psl\Exception\InvariantViolationException If $result is an instance of {@see Awaitable}. + */ + public function complete(mixed $result): void + { + Psl\invariant(!$this->complete, 'Operation is no longer pending.'); + Psl\invariant(!$result instanceof Awaitable, 'Cannot complete with an instance of ' . Awaitable::class); + + $this->result = $result; + $this->invokeCallbacks(); + } + + /** + * Marks the operation as failed. + * + * @throws Psl\Exception\InvariantViolationException If the operation is no longer pending. + */ + public function error(Throwable $throwable): void + { + Psl\invariant(!$this->complete, 'Operation is no longer pending.'); + + $this->throwable = $throwable; + $this->invokeCallbacks(); + } + + /** + * Suppress the exception thrown to the loop error handler if and operation error is not handled by a callback. + */ + public function ignore(): void + { + $this->handled = true; + } + + /** + * @return bool True if the operation has completed. + */ + public function isComplete(): bool + { + return $this->complete; + } + + private function invokeCallbacks(): void + { + $this->complete = true; + + foreach ($this->callbacks as $id => $callback) { + Scheduler::queue(fn() => $callback($this->throwable, $this->result, $id)); + } + + $this->callbacks = []; + } +} diff --git a/src/Psl/Async/Scheduler.php b/src/Psl/Async/Scheduler.php new file mode 100644 index 000000000..5b9fcb9c2 --- /dev/null +++ b/src/Psl/Async/Scheduler.php @@ -0,0 +1,163 @@ +getMessage()); + } + } + + /** + * Disable a callback immediately. + * + * @see EventLoop::disable() + */ + public static function disable(string $callbackId): void + { + EventLoop::disable($callbackId); + } + + /** + * Cancel a callback. + * + * This will detach the event loop from all resources that are associated to the callback. After this operation the + * callback is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid identifier. + * + * @param string $callbackId The callback identifier. + * + * @see EventLoop::cancel() + */ + public static function cancel(string $callbackId): void + { + EventLoop::cancel($callbackId); + } + + /** + * Reference a callback. + * + * This will keep the event loop alive whilst the event is still being monitored. Callbacks have this state by + * default. + * + * @throws Psl\Exception\InvariantViolationException If the callback identifier is invalid. + * + * @see EventLoop::reference() + */ + public static function reference(string $callbackId): void + { + try { + EventLoop::reference($callbackId); + } catch (InvalidCallbackError $error) { + Psl\invariant_violation($error->getMessage()); + } + } + + /** + * Unreference a callback. + * + * The event loop should exit the run method when only unreferenced callbacks are still being monitored. Callbacks + * are all referenced by default. + * + * @see EventLoop::unreference() + */ + public static function unreference(string $callbackId): void + { + EventLoop::unreference($callbackId); + } +} diff --git a/src/Psl/Async/all.php b/src/Psl/Async/all.php new file mode 100644 index 000000000..8d74e3bec --- /dev/null +++ b/src/Psl/Async/all.php @@ -0,0 +1,27 @@ +> $awaitables + * + * @return array Unwrapped values with the order preserved. + */ +function all(iterable $awaitables): array +{ + $values = []; + + // Awaitable::iterate() to throw the first error based on completion order instead of argument order + foreach (Awaitable::iterate($awaitables) as $index => $awaitable) { + $values[$index] = $awaitable->await(); + } + + return $values; +} diff --git a/src/Psl/Async/any.php b/src/Psl/Async/any.php new file mode 100644 index 000000000..a8ab7c109 --- /dev/null +++ b/src/Psl/Async/any.php @@ -0,0 +1,39 @@ +> $awaitables + * + * @throws CompositeException If all $awaitables errored. + * @throws Psl\Exception\InvariantViolationException If no $awaitables were provided. + * + * @return T + */ +function any(iterable $awaitables): mixed +{ + $errors = []; + foreach (Awaitable::iterate($awaitables) as $first) { + try { + return $first->await(); + } catch (Throwable $throwable) { + $errors[] = $throwable; + } + } + + Psl\invariant([] !== $errors, 'No awaitables were provided.'); + + throw new CompositeException($errors); +} diff --git a/src/Psl/Async/await.php b/src/Psl/Async/await.php new file mode 100644 index 000000000..f6ab06af5 --- /dev/null +++ b/src/Psl/Async/await.php @@ -0,0 +1,21 @@ + $awaitable + * + * @return T + * + * @see Awaitable::await() + */ +function await(Awaitable $awaitable): mixed +{ + return $awaitable->await(); +} diff --git a/src/Psl/Async/await_readable.php b/src/Psl/Async/await_readable.php new file mode 100644 index 000000000..d506054cc --- /dev/null +++ b/src/Psl/Async/await_readable.php @@ -0,0 +1,45 @@ + $suspension->resume($resource) + ); + + if (!$reference) { + Scheduler::unreference($watcher); + } + + $timeout_watcher = null; + if (null !== $timeout_ms) { + $timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException())); + } + + try { + $suspension->suspend(); + } finally { + Scheduler::cancel($watcher); + + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } + } +} diff --git a/src/Psl/Async/await_signal.php b/src/Psl/Async/await_signal.php new file mode 100644 index 000000000..17376f150 --- /dev/null +++ b/src/Psl/Async/await_signal.php @@ -0,0 +1,45 @@ + $suspension->resume($signal_number) + ); + + if (!$reference) { + Scheduler::unreference($watcher); + } + + $timeout_watcher = null; + if (null !== $timeout_ms) { + $timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException())); + } + + try { + $suspension->suspend(); + } finally { + Scheduler::cancel($watcher); + + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } + } +} diff --git a/src/Psl/Async/await_writable.php b/src/Psl/Async/await_writable.php new file mode 100644 index 000000000..4d9bbbbd1 --- /dev/null +++ b/src/Psl/Async/await_writable.php @@ -0,0 +1,45 @@ + $suspension->resume($resource) + ); + + if (!$reference) { + Scheduler::unreference($watcher); + } + + $timeout_watcher = null; + if (null !== $timeout_ms) { + $timeout_watcher = Scheduler::delay($timeout_ms, static fn() => $suspension->throw(new Exception\TimeoutException())); + } + + try { + $suspension->suspend(); + } finally { + Scheduler::cancel($watcher); + + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } + } +} diff --git a/src/Psl/Async/concurrently.php b/src/Psl/Async/concurrently.php new file mode 100644 index 000000000..7edad40f3 --- /dev/null +++ b/src/Psl/Async/concurrently.php @@ -0,0 +1,34 @@ + $callables + * + * @return Awaitable> unwrapped values with the order preserved. + */ +function concurrently(iterable $callables): Awaitable +{ + return run(static function () use ($callables): array { + $awaitables = Dict\map( + $callables, + /** + * @param callable(): Tv $callable + * + * @return Awaitable + */ + static fn(callable $callable): Awaitable => run($callable), + ); + + return namespace\all($awaitables); + }); +} diff --git a/src/Psl/Async/first.php b/src/Psl/Async/first.php new file mode 100644 index 000000000..decdfbbbd --- /dev/null +++ b/src/Psl/Async/first.php @@ -0,0 +1,29 @@ +> $awaitables + * + * @throws Psl\Exception\InvariantViolationException If $awaitables is empty. + * + * @return T + */ +function first(iterable $awaitables): mixed +{ + foreach (Awaitable::iterate($awaitables) as $first) { + return $first->await(); + } + + Psl\invariant_violation('No awaitables were provided.'); +} diff --git a/src/Psl/Async/later.php b/src/Psl/Async/later.php new file mode 100644 index 000000000..649a9c493 --- /dev/null +++ b/src/Psl/Async/later.php @@ -0,0 +1,24 @@ + $suspension->resume(null)); + + try { + $suspension->suspend(); + } finally { + Scheduler::cancel($watcher); + } +} diff --git a/src/Psl/Async/run.php b/src/Psl/Async/run.php new file mode 100644 index 000000000..c06393913 --- /dev/null +++ b/src/Psl/Async/run.php @@ -0,0 +1,42 @@ + + */ +function run(callable $callable, ?int $timeout_ms = null): Awaitable +{ + $state = new Internal\State(); + + Scheduler::defer(static function () use ($callable, $state): void { + try { + $state->complete($callable()); + } catch (Throwable $throwable) { + $state->error($throwable); + } + }); + + if (null !== $timeout_ms) { + $id = Scheduler::delay($timeout_ms, static function () use ($state): void { + $state->error(new TimeoutException()); + }); + + $state->subscribe(static function () use ($id): void { + Scheduler::cancel($id); + }); + } + + return new Awaitable($state); +} diff --git a/src/Psl/Async/usleep.php b/src/Psl/Async/usleep.php new file mode 100644 index 000000000..48e4c6563 --- /dev/null +++ b/src/Psl/Async/usleep.php @@ -0,0 +1,20 @@ + $suspension->resume(null)); + + try { + $suspension->suspend(); + } finally { + Scheduler::cancel($watcher); + } +} diff --git a/src/Psl/Internal/Loader.php b/src/Psl/Internal/Loader.php index c5ca4a444..ea5fb9f9d 100644 --- a/src/Psl/Internal/Loader.php +++ b/src/Psl/Internal/Loader.php @@ -442,6 +442,17 @@ final class Loader 'Psl\Interface\defined', 'Psl\Trait\exists', 'Psl\Trait\defined', + 'Psl\Async\run', + 'Psl\Async\concurrently', + 'Psl\Async\await', + 'Psl\Async\any', + 'Psl\Async\all', + 'Psl\Async\await_readable', + 'Psl\Async\await_writable', + 'Psl\Async\await_signal', + 'Psl\Async\first', + 'Psl\Async\later', + 'Psl\Async\usleep', ]; public const INTERFACES = [ @@ -488,6 +499,7 @@ final class Loader 'Psl\IO\SeekWriteHandleInterface', 'Psl\IO\WriteHandleInterface', 'Psl\RandomSequence\SequenceInterface', + 'Psl\Async\Exception\ExceptionInterface', ]; public const TRAITS = [ @@ -573,6 +585,16 @@ final class Loader 'Psl\RandomSequence\MersenneTwisterSequence', 'Psl\RandomSequence\MersenneTwisterPHPVariantSequence', 'Psl\RandomSequence\SecureSequence', + 'Psl\Async\Exception\CompositeException', + 'Psl\Async\Exception\RuntimeException', + 'Psl\Async\Exception\TimeoutException', + 'Psl\Async\Exception\UnhandledAwaitableException', + 'Psl\Async\Internal\AwaitableIterator', + 'Psl\Async\Internal\AwaitableIteratorQueue', + 'Psl\Async\Internal\State', + 'Psl\Async\Awaitable', + 'Psl\Async\Deferred', + 'Psl\Async\Scheduler', ]; public const TYPE_CONSTANTS = 1; diff --git a/tests/unit/Async/AnyTest.php b/tests/unit/Async/AnyTest.php new file mode 100644 index 000000000..a280fd550 --- /dev/null +++ b/tests/unit/Async/AnyTest.php @@ -0,0 +1,62 @@ +expectException(InvariantViolationException::class); + $this->expectExceptionMessage('No awaitables were provided.'); + + Async\any([]); + } + + public function testAnyWillFailingAwaitables(): void + { + $this->expectException(Async\Exception\CompositeException::class); + + Async\any([ + Async\Awaitable::error(new InvariantViolationException('foo')), + Async\Awaitable::error(new InvariantViolationException('bar')), + ]); + } +} diff --git a/tests/unit/Async/AwaitReadableTest.php b/tests/unit/Async/AwaitReadableTest.php new file mode 100644 index 000000000..aa9e13913 --- /dev/null +++ b/tests/unit/Async/AwaitReadableTest.php @@ -0,0 +1,46 @@ +value .= '[read:waiting]'; + + Async\await_readable($read_socket); + + $ref->value .= '[read:ready]'; + }), + Async\run(static function () use ($ref, $write_socket) { + $ref->value .= '[write:sleep]'; + + Async\usleep(1000); + + fwrite($write_socket, "hello", 5); + + $ref->value .= '[write:done]'; + }), + ]; + + Async\all($handles); + + static::assertSame('[read:waiting][write:sleep][write:done][read:ready]', $ref->value); + } +} diff --git a/tests/unit/Async/AwaitableTest.php b/tests/unit/Async/AwaitableTest.php new file mode 100644 index 000000000..51dac54af --- /dev/null +++ b/tests/unit/Async/AwaitableTest.php @@ -0,0 +1,198 @@ +isComplete()); + $state->complete('hello'); + static::assertTrue($awaitable->isComplete()); + + $result = $awaitable->await(); + + static::assertSame('hello', $result); + } + + public function testErroredAwait(): void + { + $state = new State(); + $awaitable = new Awaitable($state); + + static::assertFalse($awaitable->isComplete()); + $state->error(new InvariantViolationException('foo')); + static::assertTrue($awaitable->isComplete()); + + $this->expectException(InvariantViolationException::class); + $this->expectExceptionMessage('foo'); + + $awaitable->await(); + } + + public function testDiscardedAwaitableError(): void + { + $state = new State(); + $awaitable = new Awaitable($state); + + static::assertFalse($awaitable->isComplete()); + $state->error(new InvariantViolationException('foo')); + static::assertTrue($awaitable->isComplete()); + + $this->expectException(UnhandledAwaitableException::class); + $this->expectExceptionMessage('Unhandled awaitable error "Psl\Exception\InvariantViolationException", make sure to call `Awaitable::await()` before the awaitable is destroyed, or call `Awaitable::ignore()` to ignore exceptions.'); + + unset($awaitable, $state); + + Async\later(); + } + + public function testDiscardedIgnoredAwaitableError(): void + { + $state = new State(); + $awaitable = new Awaitable($state); + + static::assertFalse($awaitable->isComplete()); + $state->error(new InvariantViolationException('foo')); + static::assertTrue($awaitable->isComplete()); + + $awaitable->ignore(); + + unset($awaitable, $state); + + Async\later(); + } + + public function testIterate(): void + { + $iterator = Awaitable::iterate([ + 'foo' => Awaitable::complete('foo'), + 'bar' => Awaitable::error(new InvariantViolationException('bar')), + 'baz' => Async\run(static function () { + Async\usleep(100); + + throw new InvariantViolationException('baz'); + }), + 'qux' => Async\run(static function () { + Async\usleep(200); + + return 'qux'; + }), + ]); + + static::assertSame('foo', $iterator->key()); + static::assertSame('foo', $iterator->current()->await()); + + $iterator->next(); + + static::assertSame('bar', $iterator->key()); + + try { + $iterator->current()->await(); + static::fail(); + } catch (InvariantViolationException) { + $this->addToAssertionCount(1); + } + + $iterator->next(); + + static::assertSame('baz', $iterator->key()); + + try { + $iterator->current()->await(); + static::fail(); + } catch (InvariantViolationException) { + $this->addToAssertionCount(1); + } + + $iterator->next(); + + static::assertSame('qux', $iterator->key()); + static::assertSame('qux', $iterator->current()->await()); + } + + public function testIterateGenerator(): void + { + $generator1 = Async\run(static function (): iterable { + yield 'foo' => 'foo'; + + Async\usleep(300); + + yield 'bar' => 'bar'; + }); + + $generator2 = Async\run(static function (): iterable { + yield 'baz' => 'baz'; + + Async\usleep(100); + + yield 'qux' => 'qux'; + }); + + $generator3 = Async\run(static function () use ($generator1, $generator2): iterable { + yield 'gen1' => $generator1; + + Async\usleep(200); + + yield 'gen2' => $generator2; + })->await(); + + $values = []; + // Awaitable::iterate() to throw the first error based on completion order instead of argument order + foreach (Awaitable::iterate($generator3) as $index => $awaitable) { + $values[$index] = $awaitable->await(); + } + + static::assertArrayHasKey('gen1', $values); + static::assertArrayHasKey('gen2', $values); + + $values['gen1'] = Dict\from_iterable($values['gen1']); + $values['gen2'] = Dict\from_iterable($values['gen2']); + + static::assertSame(['foo' => 'foo', 'bar' => 'bar'], $values['gen1']); + static::assertSame(['baz' => 'baz', 'qux' => 'qux'], $values['gen2']); + } + + public function testThenOnSuccess(): void + { + $awaitable = Async\run(static function (): string { + return 'hello'; + }); + + $awaitable = $awaitable + ->then( + static fn(string $result) => Str\reverse($result), + static fn(Throwable $exception) => exit(0), + ) + ->then( + static fn(string $result) => throw new InvariantViolationException($result), + static fn(Throwable $exception) => exit(0), + ) + ->then( + static fn($result) => exit(0), + static fn(Throwable $exception) => throw $exception, + ) + ->then( + static fn($result) => exit(0), + static fn(Throwable $exception) => $exception->getMessage(), + ) + ; + + static::assertSame('olleh', $awaitable->await()); + } +} diff --git a/tests/unit/Async/Exception/CompositeExceptionTest.php b/tests/unit/Async/Exception/CompositeExceptionTest.php new file mode 100644 index 000000000..dc7e8c2e7 --- /dev/null +++ b/tests/unit/Async/Exception/CompositeExceptionTest.php @@ -0,0 +1,24 @@ +getReasons()); + } +} diff --git a/tests/unit/Async/FirstTest.php b/tests/unit/Async/FirstTest.php new file mode 100644 index 000000000..17b527756 --- /dev/null +++ b/tests/unit/Async/FirstTest.php @@ -0,0 +1,51 @@ +expectException(InvariantViolationException::class); + + Async\first([]); + } +} diff --git a/tests/unit/Async/RunTest.php b/tests/unit/Async/RunTest.php new file mode 100644 index 000000000..36bb12932 --- /dev/null +++ b/tests/unit/Async/RunTest.php @@ -0,0 +1,58 @@ + Async\usleep(100), + static fn() => Async\usleep(100), + static fn() => Async\usleep(100), + ])); + + return 'hello'; + }); + + static::assertSame('hello', $awaitable->await()); + } + + public function testRunWithTimeout(): void + { + $awaitable = Async\run(static function (): string { + Async\await(Async\concurrently([ + static fn() => Async\usleep(1000), + static fn() => Async\usleep(1000), + static fn() => Async\usleep(1000), + ])); + + return 'hello'; + }, timeout_ms: 2000); + + static::assertSame('hello', $awaitable->await()); + } + + public function testRunTimedOut(): void + { + $awaitable = Async\run(static function (): string { + Async\await(Async\concurrently([ + static fn() => Async\usleep(1000), + static fn() => Async\usleep(1000), + static fn() => Async\usleep(1000), + ])); + + return 'hello'; + }, timeout_ms: 100); + + $this->expectException(Async\Exception\TimeoutException::class); + + $awaitable->await(); + } +}