Skip to content

Commit

Permalink
fix(async): always cancel timeout watcher after suspension is finished.
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 4, 2021
1 parent 35477a6 commit 881ae0a
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 38 deletions.
20 changes: 18 additions & 2 deletions src/Psl/Async/all.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@

namespace Psl\Async;

use Throwable;

/**
* Awaits all awaitables to complete or aborts if any errors concurrently.
* Awaits all awaitables to complete concurrently.
*
* If one or more awaitables fail, the first exception will be thrown.
*
* This function will wait for all awaitables to finish, even if the first one fails.
*
* @template Tk of array-key
* @template Tv
Expand All @@ -17,10 +23,20 @@
function all(iterable $awaitables): array
{
$values = [];
$errors = [];

// Awaitable::iterate() to throw the first error based on completion order instead of argument order
foreach (Awaitable::iterate($awaitables) as $index => $awaitable) {
$values[$index] = $awaitable->await();
try {
$values[$index] = $awaitable->await();
} catch (Throwable $throwable) {
$errors[] = $throwable;
}
}

if ($errors !== []) {
/** @psalm-suppress MissingThrowsDocblock */
throw $errors[0];
}

return $values;
Expand Down
13 changes: 6 additions & 7 deletions src/Psl/Async/await_readable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_readable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onReadable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, mixed $resource) use ($suspension, $timeout_w
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
13 changes: 6 additions & 7 deletions src/Psl/Async/await_signal.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,7 @@ function await_signal(int $signal, bool $reference = true, ?int $timeout_ms = nu

$watcher = EventLoop::onSignal(
$signal,
static function (string $_watcher, int $signal) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($signal);
},
static fn() => $suspension->resume($signal),
);

if (!$reference) {
Expand All @@ -44,5 +38,10 @@ static function (string $_watcher, int $signal) use ($suspension, $timeout_watch
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
14 changes: 6 additions & 8 deletions src/Psl/Async/await_writable.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,23 +27,21 @@ function await_writable(mixed $resource, bool $reference = true, ?int $timeout_m

$watcher = EventLoop::onWritable(
$resource,
static function (string $_watcher, mixed $resource) use ($suspension, $timeout_watcher): void {
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}

$suspension->resume($resource);
},
static fn() => $suspension->resume($resource),
);

if (!$reference) {
Scheduler::unreference($watcher);
}


try {
$suspension->suspend();
} finally {
Scheduler::cancel($watcher);

// cancel timeout watcher
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
}
}
44 changes: 30 additions & 14 deletions src/Psl/Async/run.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Psl\Async;

use Psl;
use Psl\Async\Exception\TimeoutException;
use Throwable;

Expand All @@ -20,33 +21,48 @@ function run(callable $callable, ?int $timeout_ms = null): Awaitable
{
$state = new Internal\State();

$timeout_watcher = null;
$timeout_watcher = new Psl\Ref(null);
$delay_watcher = new Psl\Ref(null);

if (null !== $timeout_ms) {
$timeout_watcher = Scheduler::delay($timeout_ms, static function () use ($state): void {
$timeout_watcher->value = Scheduler::delay($timeout_ms, static function () use ($state, $delay_watcher, $timeout_watcher): void {
if (null !== $delay_watcher->value) {
$delay_watcher_value = $delay_watcher->value;
$delay_watcher->value = null;
Scheduler::cancel($delay_watcher_value);
}

$timeout_watcher->value = null;
$state->error(new TimeoutException());
});

Scheduler::unreference($timeout_watcher);
Scheduler::unreference($timeout_watcher->value);
}

Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$delay_watcher->value = Scheduler::defer(static function () use ($callable, $state, $timeout_watcher): void {
$exception = null;
$result = null;
try {
$result = $callable();
if (null !== $timeout_watcher) {
Scheduler::cancel($timeout_watcher);
}
} catch (Throwable $throwable) {
$exception = $throwable;
}

if ($state->isComplete()) {
// timed-out.
return;
}
if (null !== $timeout_watcher->value) {
$timeout_watcher_value = $timeout_watcher->value;
$timeout_watcher->value = null;
Scheduler::cancel($timeout_watcher_value);
} elseif ($state->isComplete()) {
// timeout has been reached.
return;
}

if (null !== $exception) {
$state->error($exception);
} else {
$state->complete($result);
} catch (Throwable $throwable) {
$state->error($throwable);
}
});


return new Awaitable($state);
}

0 comments on commit 881ae0a

Please sign in to comment.