Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(async): always cancel timeout watcher after suspension is finished. #253

Merged
merged 1 commit into from
Nov 4, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions docs/component/async.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

#### `Functions`

- [all](./../../src/Psl/Async/all.php#L17)
- [all](./../../src/Psl/Async/all.php#L23)
- [any](./../../src/Psl/Async/any.php#L25)
- [await](./../../src/Psl/Async/await.php#L18)
- [await_readable](./../../src/Psl/Async/await_readable.php#L18)
Expand All @@ -21,7 +21,7 @@
- [concurrently](./../../src/Psl/Async/concurrently.php#L19)
- [first](./../../src/Psl/Async/first.php#L24)
- [later](./../../src/Psl/Async/later.php#L14)
- [run](./../../src/Psl/Async/run.php#L19)
- [run](./../../src/Psl/Async/run.php#L20)
- [usleep](./../../src/Psl/Async/usleep.php#L10)

#### `Classes`
Expand Down
20 changes: 18 additions & 2 deletions src/Psl/Async/all.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

namespace Psl\Async;

use Throwable;

/**
* Awaits all awaitables to complete or aborts if any errors concurrently.
* Awaits all awaitables to complete concurrently.
*
* If one or more awaitables fail, the first exception will be thrown.
*
* This function will wait for all awaitables to finish, even if the first one fails.
*
* @template Tk of array-key
* @template Tv
Expand All @@ -17,10 +23,20 @@
function all(iterable $awaitables): array
{
$values = [];
$errors = [];

// Awaitable::iterate() to throw the first error based on completion order instead of argument order
foreach (Awaitable::iterate($awaitables) as $index => $awaitable) {
$values[$index] = $awaitable->await();
try {
$values[$index] = $awaitable->await();
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
}

if ($errors !== []) {
/** @psalm-suppress MissingThrowsDocblock */
throw $errors[0];
}

return $values;
Expand Down
13 changes: 6 additions & 7 deletions src/Psl/Async/await_readable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_readable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onReadable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, mixed $resource) use ($suspension, $timeout_w
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
13 changes: 6 additions & 7 deletions src/Psl/Async/await_signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_signal(int $signal, bool $reference = true, ?int $timeout_ms = nu

$watcher = EventLoop::onSignal(
$signal,
static function (string $_watcher, int $signal) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($signal);
},
static fn() => $suspension->resume($signal),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, int $signal) use ($suspension, $timeout_watch
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
14 changes: 6 additions & 8 deletions src/Psl/Async/await_writable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ function await_writable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onWritable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Scheduler::unreference($watcher);
}


try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
46 changes: 32 additions & 14 deletions src/Psl/Async/run.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Psl\Async;

use Psl;
use Psl\Async\Exception\TimeoutException;
use Throwable;

Expand All @@ -20,33 +21,50 @@ function run(callable $callable, ?int $timeout_ms = null): Awaitable
{
$state = new Internal\State();

$timeout_watcher = null;
/** @var Psl\Ref<string|null> $timeout_watcher */
$timeout_watcher = new Psl\Ref(null);
/** @var Psl\Ref<string|null> $delay_watcher */
$delay_watcher = new Psl\Ref(null);

if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static function () use ($state): void {
$timeout_watcher->value = Scheduler::delay($timeout_ms, static function () use ($state, $delay_watcher, $timeout_watcher): void {
if (null !== $delay_watcher->value) {
$delay_watcher_value = $delay_watcher->value;
$delay_watcher->value = null;
Scheduler::cancel($delay_watcher_value);
}

$timeout_watcher->value = null;
$state->error(new TimeoutException());
});

Scheduler::unreference($timeout_watcher);
Scheduler::unreference($timeout_watcher->value);
}

Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$delay_watcher->value = Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$exception = null;
$result = null;
try {
$result = $callable();
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
} catch (Throwable $throwable) {
$exception = $throwable;
}

if ($state->isComplete()) {
// timed-out.
return;
}
if (null !== $timeout_watcher->value) {
$timeout_watcher_value = $timeout_watcher->value;
$timeout_watcher->value = null;
Scheduler::cancel($timeout_watcher_value);
} elseif ($state->isComplete()) {
// timeout has been reached.
return;
}

if (null !== $exception) {
$state->error($exception);
} else {
$state->complete($result);
} catch (Throwable $throwable) {
$state->error($throwable);
}
});


return new Awaitable($state);
}
110 changes: 110 additions & 0 deletions tests/unit/Async/AllTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
<?php

declare(strict_types=1);

namespace Psl\Tests\Unit\Async;

use PHPUnit\Framework\TestCase;
use Psl;
use Psl\Async;
use Psl\Exception\InvariantViolationException;

final class AllTest extends TestCase
{
public function testAll(): void
{
$awaitables = [
'a' => Async\run(static function (): string {
Async\usleep(100);

return 'a';
}),
'b' => Async\run(static function (): string {
Async\usleep(100);

return 'b';
}),
'c' => Async\run(static function (): string {
Async\usleep(100);

return 'c';
}),
];

$results = Async\all($awaitables);

static::assertSame(['a' => 'a', 'b' => 'b', 'c' => 'c'], $results);
}

public function testAllForwardsException(): void
{
$this->expectException(InvariantViolationException::class);
$this->expectExceptionMessage('a');

Async\all([
Async\run(static function (): string {
Async\usleep(100);

throw new InvariantViolationException('a');
}),
Async\run(static function (): string {
Async\usleep(200);

throw new InvariantViolationException('b');
}),
Async\run(static function (): string {
Async\usleep(300);

return 'c';
}),
Async\run(static function (): string {
Async\usleep(50);

Async\later();

Async\usleep(50);

return 'c';
}),
]);
}

public function testAllAwaitablesAreCompleted(): void
{
$ref = new Psl\Ref('');

try {
Async\all([
Async\run(static function () use ($ref): void {
$ref->value .= 'a';

throw new InvariantViolationException('a');
}),
Async\run(static function () use ($ref): void {
Async\usleep(2000);

$ref->value .= 'b';

throw new InvariantViolationException('b');
}),
Async\run(static function () use ($ref): void {
Async\usleep(3000);

$ref->value .= 'c';
}),
Async\run(static function () use ($ref): void {
Async\usleep(500);

Async\later();

Async\usleep(500);

$ref->value .= 'd';
}),
]);
} catch (InvariantViolationException) {
}

static::assertSame('adbc', $ref->value);
}
}