Skip to content

Commit

Permalink
chore(io): simplify write/read queue implementation
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 16, 2021
1 parent 0bcbee7 commit 34b3ac5
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 100 deletions.
4 changes: 3 additions & 1 deletion src/Psl/Async/Awaitable.php
Expand Up @@ -201,8 +201,10 @@ static function (?Throwable $error, mixed $value) use ($suspension): void {
/**
* Do not forward unhandled errors to the event loop handler.
*/
public function ignore(): void
public function ignore(): self
{
$this->state->ignore();

return $this;
}
}
174 changes: 75 additions & 99 deletions src/Psl/IO/Internal/ResourceHandle.php
Expand Up @@ -6,13 +6,11 @@

use Psl;
use Psl\Async;
use Psl\Async\Awaitable;
use Psl\Exception\InvariantViolationException;
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
use Revolt\EventLoop;
use Revolt\EventLoop\Suspension;

use function error_get_last;
use function fclose;
Expand Down Expand Up @@ -56,11 +54,8 @@ class ResourceHandle implements IO\Stream\CloseSeekReadWriteHandleInterface
private string $readWatcher = '';
private string $writeWatcher = '';

private ?Suspension $readSuspension = null;
private ?Suspension $writeSuspension = null;

private ?Awaitable $readAwaitable = null;
private ?Awaitable $writeAwaitable = null;
private ?Async\Deferred $readDeferred = null;
private ?Async\Deferred $writeDeferred = null;

/**
* @param resource|object $stream
Expand Down Expand Up @@ -89,10 +84,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)

Psl\invariant($readable, 'Handle is not readable.');

$suspension = &$this->readSuspension;
$this->readWatcher = EventLoop::onReadable($stream, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
$deferred = &$this->readDeferred;
$this->readWatcher = EventLoop::onReadable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->readWatcher);
Expand All @@ -107,10 +102,10 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)

Psl\invariant($writable, 'Handle is not writeable.');

$suspension = &$this->writeSuspension;
$this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
$deferred = &$this->writeDeferred;
$this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->writeWatcher);
Expand Down Expand Up @@ -138,38 +133,33 @@ public function write(string $bytes, ?float $timeout = null): int

$bytes = substr($bytes, $written);

$this->writeAwaitable = $awaitable = Async\run(function () use ($timeout): void {
$this->writeSuspension = EventLoop::createSuspension();
$suspension = &$this->writeSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not writable.')
);
}
);
}

try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
$this->writeDeferred = new Async\Deferred();
$deferred = &$this->writeDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->error(
new Exception\TimeoutException('reached timeout while the handle is still not writable.')
);
}
}
});
);
}

$awaitable->await();
$this->writeAwaitable = null;
try {
/** @var Async\Deferred $deferred */
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $written + $this->writeImmediately($bytes);
}
Expand All @@ -182,16 +172,12 @@ static function () use (&$suspension) {
*/
public function writeImmediately(string $bytes): int
{
if (null !== $this->writeAwaitable) {
// there's a pending write operation, wait for it first.
$awaitable = $this->writeAwaitable->then(
static fn() => null,
static fn() => null,
);

$awaitable->ignore();
$awaitable->await();
}
// there's a pending write operation, wait for it first.
$this->writeDeferred
?->getAwaitable()
->then(static fn() => null, static fn() => null)
->ignore()
->await();

if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
Expand Down Expand Up @@ -266,38 +252,33 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
return $chunk;
}

$this->readAwaitable = $awaitable = Async\run(function () use ($timeout): void {
$this->readSuspension = EventLoop::createSuspension();
$suspension = &$this->readSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not readable.')
);
}
);
}

try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
$this->readDeferred = new Async\Deferred();
$deferred = &$this->readDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->error(
new Exception\TimeoutException('reached timeout while the handle is still not readable.')
);
}
}
});
);
}

$awaitable->await();
$this->readAwaitable = null;
try {
/** @var Async\Deferred $deferred */
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $this->readImmediately($max_bytes);
}
Expand All @@ -309,16 +290,13 @@ static function () use (&$suspension) {
*/
public function readImmediately(?int $max_bytes = null): string
{
if (null !== $this->readAwaitable) {
// there's a pending read operation, wait for it.
$awaitable = $this->readAwaitable->then(
static fn() => null,
static fn() => null,
);

$awaitable->ignore();
$awaitable->await();
}
// there's a pending read operation, wait for it.
$this->readDeferred
?->getAwaitable()
->then(static fn() => null, static fn() => null)
->ignore()
->await()
;

if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
Expand Down Expand Up @@ -366,10 +344,8 @@ public function close(): void
$this->stream = null;
}

$this->readAwaitable = null;
$this->readSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->writeAwaitable = null;
$this->writeSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->readDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->writeDeferred?->error(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
}

/**
Expand Down

0 comments on commit 34b3ac5

Please sign in to comment.