Skip to content

Commit

Permalink
Merge 4f827f5 into 35477a6
Browse files Browse the repository at this point in the history
  • Loading branch information
azjezz committed Nov 4, 2021
2 parents 35477a6 + 4f827f5 commit 4698341
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 40 deletions.
4 changes: 2 additions & 2 deletions docs/component/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

#### `Functions`

- [all](./../../src/Psl/Async/all.php#L17)
- [all](./../../src/Psl/Async/all.php#L23)
- [any](./../../src/Psl/Async/any.php#L25)
- [await](./../../src/Psl/Async/await.php#L18)
- [await_readable](./../../src/Psl/Async/await_readable.php#L18)
Expand All @@ -21,7 +21,7 @@
- [concurrently](./../../src/Psl/Async/concurrently.php#L19)
- [first](./../../src/Psl/Async/first.php#L24)
- [later](./../../src/Psl/Async/later.php#L14)
- [run](./../../src/Psl/Async/run.php#L19)
- [run](./../../src/Psl/Async/run.php#L20)
- [usleep](./../../src/Psl/Async/usleep.php#L10)

#### `Classes`
Expand Down
20 changes: 18 additions & 2 deletions src/Psl/Async/all.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

namespace Psl\Async;

use Throwable;

/**
* Awaits all awaitables to complete or aborts if any errors concurrently.
* Awaits all awaitables to complete concurrently.
*
* If one or more awaitables fail, the first exception will be thrown.
*
* This function will wait for all awaitables to finish, even if the first one fails.
*
* @template Tk of array-key
* @template Tv
Expand All @@ -17,10 +23,20 @@
function all(iterable $awaitables): array
{
$values = [];
$errors = [];

// Awaitable::iterate() to throw the first error based on completion order instead of argument order
foreach (Awaitable::iterate($awaitables) as $index => $awaitable) {
$values[$index] = $awaitable->await();
try {
$values[$index] = $awaitable->await();
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
}

if ($errors !== []) {
/** @psalm-suppress MissingThrowsDocblock */
throw $errors[0];
}

return $values;
Expand Down
13 changes: 6 additions & 7 deletions src/Psl/Async/await_readable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_readable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onReadable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, mixed $resource) use ($suspension, $timeout_w
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
13 changes: 6 additions & 7 deletions src/Psl/Async/await_signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_signal(int $signal, bool $reference = true, ?int $timeout_ms = nu

$watcher = EventLoop::onSignal(
$signal,
static function (string $_watcher, int $signal) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($signal);
},
static fn() => $suspension->resume($signal),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, int $signal) use ($suspension, $timeout_watch
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
14 changes: 6 additions & 8 deletions src/Psl/Async/await_writable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ function await_writable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onWritable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Scheduler::unreference($watcher);
}


try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
46 changes: 32 additions & 14 deletions src/Psl/Async/run.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Psl\Async;

use Psl;
use Psl\Async\Exception\TimeoutException;
use Throwable;

Expand All @@ -20,33 +21,50 @@ function run(callable $callable, ?int $timeout_ms = null): Awaitable
{
$state = new Internal\State();

$timeout_watcher = null;
/** @var Psl\Ref<string|null> $timeout_watcher */
$timeout_watcher = new Psl\Ref(null);
/** @var Psl\Ref<string|null> $delay_watcher */
$delay_watcher = new Psl\Ref(null);

if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static function () use ($state): void {
$timeout_watcher->value = Scheduler::delay($timeout_ms, static function () use ($state, $delay_watcher, $timeout_watcher): void {
if (null !== $delay_watcher->value) {
$delay_watcher_value = $delay_watcher->value;
$delay_watcher->value = null;
Scheduler::cancel($delay_watcher_value);
}

$timeout_watcher->value = null;
$state->error(new TimeoutException());
});

Scheduler::unreference($timeout_watcher);
Scheduler::unreference($timeout_watcher->value);
}

Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$delay_watcher->value = Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$exception = null;
$result = null;
try {
$result = $callable();
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
} catch (Throwable $throwable) {
$exception = $throwable;
}

if ($state->isComplete()) {
// timed-out.
return;
}
if (null !== $timeout_watcher->value) {
$timeout_watcher_value = $timeout_watcher->value;
$timeout_watcher->value = null;
Scheduler::cancel($timeout_watcher_value);
} elseif ($state->isComplete()) {
// timeout has been reached.
return;
}

if (null !== $exception) {
$state->error($exception);
} else {
$state->complete($result);
} catch (Throwable $throwable) {
$state->error($throwable);
}
});


return new Awaitable($state);
}
110 changes: 110 additions & 0 deletions tests/unit/Async/AllTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?php

declare(strict_types=1);

namespace Psl\Tests\Unit\Async;

use PHPUnit\Framework\TestCase;
use Psl;
use Psl\Async;
use Psl\Exception\InvariantViolationException;

final class AllTest extends TestCase
{
public function testAll(): void
{
$awaitables = [
'a' => Async\run(static function (): string {
Async\usleep(100);

return 'a';
}),
'b' => Async\run(static function (): string {
Async\usleep(100);

return 'b';
}),
'c' => Async\run(static function (): string {
Async\usleep(100);

return 'c';
}),
];

$results = Async\all($awaitables);

static::assertSame(['a' => 'a', 'b' => 'b', 'c' => 'c'], $results);
}

public function testAllForwardsException(): void
{
$this->expectException(InvariantViolationException::class);
$this->expectExceptionMessage('a');

Async\all([
Async\run(static function (): string {
Async\usleep(100);

throw new InvariantViolationException('a');
}),
Async\run(static function (): string {
Async\usleep(200);

throw new InvariantViolationException('b');
}),
Async\run(static function (): string {
Async\usleep(300);

return 'c';
}),
Async\run(static function (): string {
Async\usleep(50);

Async\later();

Async\usleep(50);

return 'c';
}),
]);
}

public function testAllAwaitablesAreCompleted(): void
{
$ref = new Psl\Ref('');

try {
Async\all([
Async\run(static function () use ($ref): void {
$ref->value .= 'a';

throw new InvariantViolationException('a');
}),
Async\run(static function () use ($ref): void {
Async\usleep(2000);

$ref->value .= 'b';

throw new InvariantViolationException('b');
}),
Async\run(static function () use ($ref): void {
Async\usleep(3000);

$ref->value .= 'c';
}),
Async\run(static function () use ($ref): void {
Async\usleep(500);

Async\later();

Async\usleep(500);

$ref->value .= 'd';
}),
]);
} catch (InvariantViolationException) {
}

static::assertSame('adbc', $ref->value);
}
}

0 comments on commit 4698341

Please sign in to comment.