diff --git a/docs/component/async.md b/docs/component/async.md index ff691210..593f4d6c 100644 --- a/docs/component/async.md +++ b/docs/component/async.md @@ -12,7 +12,7 @@ #### `Functions` -- [all](./../../src/Psl/Async/all.php#L17) +- [all](./../../src/Psl/Async/all.php#L23) - [any](./../../src/Psl/Async/any.php#L25) - [await](./../../src/Psl/Async/await.php#L18) - [await_readable](./../../src/Psl/Async/await_readable.php#L18) @@ -21,7 +21,7 @@ - [concurrently](./../../src/Psl/Async/concurrently.php#L19) - [first](./../../src/Psl/Async/first.php#L24) - [later](./../../src/Psl/Async/later.php#L14) -- [run](./../../src/Psl/Async/run.php#L19) +- [run](./../../src/Psl/Async/run.php#L20) - [usleep](./../../src/Psl/Async/usleep.php#L10) #### `Classes` diff --git a/src/Psl/Async/all.php b/src/Psl/Async/all.php index 8d74e3be..00055493 100644 --- a/src/Psl/Async/all.php +++ b/src/Psl/Async/all.php @@ -4,8 +4,14 @@ namespace Psl\Async; +use Throwable; + /** - * Awaits all awaitables to complete or aborts if any errors concurrently. + * Awaits all awaitables to complete concurrently. + * + * If one or more awaitables fail, the first exception will be thrown. + * + * This function will wait for all awaitables to finish, even if the first one fails. * * @template Tk of array-key * @template Tv @@ -17,10 +23,20 @@ function all(iterable $awaitables): array { $values = []; + $errors = []; // Awaitable::iterate() to throw the first error based on completion order instead of argument order foreach (Awaitable::iterate($awaitables) as $index => $awaitable) { - $values[$index] = $awaitable->await(); + try { + $values[$index] = $awaitable->await(); + } catch (Throwable $throwable) { + $errors[] = $throwable; + } + } + + if ($errors !== []) { + /** @psalm-suppress MissingThrowsDocblock */ + throw $errors[0]; } return $values; diff --git a/src/Psl/Async/await_readable.php b/src/Psl/Async/await_readable.php index 7457ea81..5e11d164 100644 --- a/src/Psl/Async/await_readable.php +++ b/src/Psl/Async/await_readable.php @@ -27,13 +27,7 @@ function await_readable(mixed $resource, bool $reference = true, ?int $timeout_m $watcher = EventLoop::onReadable( $resource, - static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void { - if (null !== $timeout_watcher) { - Scheduler::cancel($timeout_watcher); - } - - $suspension->resume($resource); - }, + static fn() => $suspension->resume($resource), ); if (!$reference) { @@ -44,5 +38,10 @@ static function (string $_watcher, mixed $resource) use ($suspension, $timeout_w $suspension->suspend(); } finally { Scheduler::cancel($watcher); + + // cancel timeout watcher + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } } } diff --git a/src/Psl/Async/await_signal.php b/src/Psl/Async/await_signal.php index 6a0a2381..9c85a67f 100644 --- a/src/Psl/Async/await_signal.php +++ b/src/Psl/Async/await_signal.php @@ -27,13 +27,7 @@ function await_signal(int $signal, bool $reference = true, ?int $timeout_ms = nu $watcher = EventLoop::onSignal( $signal, - static function (string $_watcher, int $signal) use ($suspension, $timeout_watcher): void { - if (null !== $timeout_watcher) { - Scheduler::cancel($timeout_watcher); - } - - $suspension->resume($signal); - }, + static fn() => $suspension->resume($signal), ); if (!$reference) { @@ -44,5 +38,10 @@ static function (string $_watcher, int $signal) use ($suspension, $timeout_watch $suspension->suspend(); } finally { Scheduler::cancel($watcher); + + // cancel timeout watcher + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } } } diff --git a/src/Psl/Async/await_writable.php b/src/Psl/Async/await_writable.php index 46622f82..b5e3bd2d 100644 --- a/src/Psl/Async/await_writable.php +++ b/src/Psl/Async/await_writable.php @@ -27,23 +27,21 @@ function await_writable(mixed $resource, bool $reference = true, ?int $timeout_m $watcher = EventLoop::onWritable( $resource, - static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void { - if (null !== $timeout_watcher) { - Scheduler::cancel($timeout_watcher); - } - - $suspension->resume($resource); - }, + static fn() => $suspension->resume($resource), ); if (!$reference) { Scheduler::unreference($watcher); } - try { $suspension->suspend(); } finally { Scheduler::cancel($watcher); + + // cancel timeout watcher + if (null !== $timeout_watcher) { + Scheduler::cancel($timeout_watcher); + } } } diff --git a/src/Psl/Async/run.php b/src/Psl/Async/run.php index 8c3b354a..04b27113 100644 --- a/src/Psl/Async/run.php +++ b/src/Psl/Async/run.php @@ -4,6 +4,7 @@ namespace Psl\Async; +use Psl; use Psl\Async\Exception\TimeoutException; use Throwable; @@ -20,33 +21,50 @@ function run(callable $callable, ?int $timeout_ms = null): Awaitable { $state = new Internal\State(); - $timeout_watcher = null; + /** @var Psl\Ref $timeout_watcher */ + $timeout_watcher = new Psl\Ref(null); + /** @var Psl\Ref $delay_watcher */ + $delay_watcher = new Psl\Ref(null); + if (null !== $timeout_ms) { - $timeout_watcher = Scheduler::delay($timeout_ms, static function () use ($state): void { + $timeout_watcher->value = Scheduler::delay($timeout_ms, static function () use ($state, $delay_watcher, $timeout_watcher): void { + if (null !== $delay_watcher->value) { + $delay_watcher_value = $delay_watcher->value; + $delay_watcher->value = null; + Scheduler::cancel($delay_watcher_value); + } + + $timeout_watcher->value = null; $state->error(new TimeoutException()); }); - Scheduler::unreference($timeout_watcher); + Scheduler::unreference($timeout_watcher->value); } - Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void { + $delay_watcher->value = Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void { + $exception = null; + $result = null; try { $result = $callable(); - if (null !== $timeout_watcher) { - Scheduler::cancel($timeout_watcher); - } + } catch (Throwable $throwable) { + $exception = $throwable; + } - if ($state->isComplete()) { - // timed-out. - return; - } + if (null !== $timeout_watcher->value) { + $timeout_watcher_value = $timeout_watcher->value; + $timeout_watcher->value = null; + Scheduler::cancel($timeout_watcher_value); + } elseif ($state->isComplete()) { + // timeout has been reached. + return; + } + if (null !== $exception) { + $state->error($exception); + } else { $state->complete($result); - } catch (Throwable $throwable) { - $state->error($throwable); } }); - return new Awaitable($state); } diff --git a/tests/unit/Async/AllTest.php b/tests/unit/Async/AllTest.php new file mode 100644 index 00000000..652fd288 --- /dev/null +++ b/tests/unit/Async/AllTest.php @@ -0,0 +1,110 @@ + Async\run(static function (): string { + Async\usleep(100); + + return 'a'; + }), + 'b' => Async\run(static function (): string { + Async\usleep(100); + + return 'b'; + }), + 'c' => Async\run(static function (): string { + Async\usleep(100); + + return 'c'; + }), + ]; + + $results = Async\all($awaitables); + + static::assertSame(['a' => 'a', 'b' => 'b', 'c' => 'c'], $results); + } + + public function testAllForwardsException(): void + { + $this->expectException(InvariantViolationException::class); + $this->expectExceptionMessage('a'); + + Async\all([ + Async\run(static function (): string { + Async\usleep(100); + + throw new InvariantViolationException('a'); + }), + Async\run(static function (): string { + Async\usleep(200); + + throw new InvariantViolationException('b'); + }), + Async\run(static function (): string { + Async\usleep(300); + + return 'c'; + }), + Async\run(static function (): string { + Async\usleep(50); + + Async\later(); + + Async\usleep(50); + + return 'c'; + }), + ]); + } + + public function testAllAwaitablesAreCompleted(): void + { + $ref = new Psl\Ref(''); + + try { + Async\all([ + Async\run(static function () use ($ref): void { + $ref->value .= 'a'; + + throw new InvariantViolationException('a'); + }), + Async\run(static function () use ($ref): void { + Async\usleep(2000); + + $ref->value .= 'b'; + + throw new InvariantViolationException('b'); + }), + Async\run(static function () use ($ref): void { + Async\usleep(3000); + + $ref->value .= 'c'; + }), + Async\run(static function () use ($ref): void { + Async\usleep(500); + + Async\later(); + + Async\usleep(500); + + $ref->value .= 'd'; + }), + ]); + } catch (InvariantViolationException) { + } + + static::assertSame('adbc', $ref->value); + } +}