From 24d93a8cd3ed60b2c36860dd3bfc55fc2357897e Mon Sep 17 00:00:00 2001 From: azjezz Date: Wed, 10 Nov 2021 08:34:20 +0100 Subject: [PATCH] chore(io/tcp/unix): improve performance, and closed handle/server condition Signed-off-by: azjezz --- docs/component/tcp.md | 2 +- docs/component/unix.md | 2 +- examples/tcp/basic-http-server.php | 49 ++++++--- src/Psl/IO/Internal/ResourceHandle.php | 115 ++++++++++++++++----- src/Psl/Internal/Loader.php | 1 - src/Psl/Network/Internal/socket_accept.php | 50 --------- src/Psl/TCP/Server.php | 97 +++++++++++++---- src/Psl/Unix/Server.php | 84 ++++++++++++--- tests/unit/IO/PipeTest.php | 39 +++++++ 9 files changed, 313 insertions(+), 126 deletions(-) delete mode 100644 src/Psl/Network/Internal/socket_accept.php diff --git a/docs/component/tcp.md b/docs/component/tcp.md index b751f315b..cea677a0b 100644 --- a/docs/component/tcp.md +++ b/docs/component/tcp.md @@ -21,7 +21,7 @@ #### `Classes` - [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7) -- [Server](./../../src/Psl/TCP/Server.php#L14) +- [Server](./../../src/Psl/TCP/Server.php#L17) - [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9) diff --git a/docs/component/unix.md b/docs/component/unix.md index 765ad6817..d300e6a07 100644 --- a/docs/component/unix.md +++ b/docs/component/unix.md @@ -20,6 +20,6 @@ #### `Classes` -- [Server](./../../src/Psl/Unix/Server.php#L12) +- [Server](./../../src/Psl/Unix/Server.php#L15) diff --git a/examples/tcp/basic-http-server.php b/examples/tcp/basic-http-server.php index cc6b523d5..b60cb8b3f 100644 --- a/examples/tcp/basic-http-server.php +++ b/examples/tcp/basic-http-server.php @@ -6,29 +6,52 @@ use Psl\Async; use Psl\IO; +use Psl\Network\Exception\AlreadyStoppedException; use Psl\Str; use Psl\TCP; +use Throwable; require __DIR__ . '/../../vendor/autoload.php'; -Async\main(static function (): never { - $server = TCP\Server::create('localhost'); +Async\main(static function (): int { + $output = IO\output_handle(); + $server = TCP\Server::create('localhost', 3030); - IO\output_handle()->writeAll("Server is listening on http://localhost:" . $server->getLocalAddress()->port . "\n"); + $output->writeAll("Server is listening on http://localhost:3030\n"); - while (true) { - $connection = $server->nextConnection(); - $request = $connection->read(); + Async\Scheduler::defer(static function () use ($server, $output) { + Async\await_signal(SIGINT); - IO\output_handle()->writeAll(Str\split($request, "\n")[0] . "\n"); + $output->writeAll("\nGoodbye 👋\n"); - $connection->writeAll("HTTP/1.1 200 OK\n"); - $connection->writeAll("Server: PSL\n"); - $connection->writeAll("Connection: close\n"); - $connection->writeAll("Content-Type: text/plain\n\n"); + $server->stopListening(); + }); - $connection->writeAll("Hello, World!"); + try { + while (true) { + $connection = $server->nextConnection(); - $connection->close(); + Async\Scheduler::defer(static function() use ($connection, $output) { + try { + $request = $connection->read(); + + $output->writeAll("[" . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . "MiB] " . Str\split($request, "\n")[0] . "\n"); + + $connection->writeAll("HTTP/1.1 200 OK\n"); + + $connection->writeAll("Server: PSL\n"); + $connection->writeAll("Connection: close\n"); + $connection->writeAll("Content-Type: text/plain\n\n"); + + $connection->writeAll("Hello, World!"); + $connection->close(); + } catch (Throwable) { + } + }); + } + } catch (AlreadyStoppedException) { + // server stopped. } + + return 0; }); diff --git a/src/Psl/IO/Internal/ResourceHandle.php b/src/Psl/IO/Internal/ResourceHandle.php index 8bab16712..43b790fda 100644 --- a/src/Psl/IO/Internal/ResourceHandle.php +++ b/src/Psl/IO/Internal/ResourceHandle.php @@ -5,11 +5,12 @@ namespace Psl\IO\Internal; use Psl; -use Psl\Async; use Psl\Exception\InvariantViolationException; use Psl\IO; use Psl\IO\Exception; use Psl\Type; +use Revolt\EventLoop; +use Revolt\EventLoop\Suspension; use function error_get_last; use function fclose; @@ -37,7 +38,7 @@ class ResourceHandle implements IO\CloseSeekReadWriteHandleInterface use IO\ReadHandleConvenienceMethodsTrait; use IO\WriteHandleConvenienceMethodsTrait; - public const DEFAULT_READ_BUFFER_SIZE = 65536; + public const DEFAULT_READ_BUFFER_SIZE = 4096; public const MAXIMUM_READ_BUFFER_SIZE = 786432; /** @@ -49,6 +50,12 @@ class ResourceHandle implements IO\CloseSeekReadWriteHandleInterface private bool $blocks; + private string $readWatcher = ''; + private string $writeWatcher = ''; + + private ?Suspension $readSuspension = null; + private ?Suspension $writeSuspension = null; + /** * @param resource|object $resource * @@ -75,7 +82,7 @@ public function __construct(mixed $resource, bool $read, bool $write, bool $seek $meta = stream_get_meta_data($resource); $this->blocks = ($meta['wrapper_type'] ?? '') === 'plainfile'; if ($seek) { - $seekable = (bool) $meta['seekable']; + $seekable = (bool)$meta['seekable']; Psl\invariant($seekable, 'Handle is not seekable.'); } @@ -84,6 +91,14 @@ public function __construct(mixed $resource, bool $read, bool $write, bool $seek $readable = str_contains($meta['mode'], 'r') || str_contains($meta['mode'], '+'); Psl\invariant($readable, 'Handle is not readable.'); + + $suspension = &$this->readSuspension; + $this->readWatcher = EventLoop::onReadable($resource, static function () use (&$suspension) { + /** @var Suspension|null $suspension */ + $suspension?->resume(null); + }); + + EventLoop::disable($this->readWatcher); } if ($write) { @@ -91,10 +106,17 @@ public function __construct(mixed $resource, bool $read, bool $write, bool $seek || str_contains($meta['mode'], 'w') || str_contains($meta['mode'], 'c') || str_contains($meta['mode'], 'a') - || str_contains($meta['mode'], '+') - ; + || str_contains($meta['mode'], '+'); Psl\invariant($writable, 'Handle is not writeable.'); + + $suspension = &$this->writeSuspension; + $this->writeWatcher = EventLoop::onWritable($resource, static function () use (&$suspension) { + /** @var Suspension|null $suspension */ + $suspension?->resume(null); + }); + + EventLoop::disable($this->writeWatcher); } $this->useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO"; @@ -119,18 +141,32 @@ public function write(string $bytes, ?float $timeout = null): int $bytes = substr($bytes, $written); - try { - /** - * @psalm-suppress PossiblyInvalidArgument - * @psalm-suppress PossiblyNullArgument - If null, writeImmediately would have thrown. - */ - Async\await_writable($this->resource, timeout: $timeout); - } catch (Async\Exception\TimeoutException) { - throw new Exception\TimeoutException('reached timeout while the handle is still not writable.'); - } catch (Async\Exception\ResourceClosedException) { - $this->resource = null; + $this->writeSuspension = EventLoop::createSuspension(); + $suspension = &$this->writeSuspension; + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->writeWatcher); + $delay_watcher = null; + if (null !== $timeout) { + $delay_watcher = EventLoop::delay( + $timeout, + static function () use (&$suspension) { + /** @var Suspension|null $suspension */ + $suspension?->throw( + new Exception\TimeoutException('reached timeout while the handle is still not writable.') + ); + } + ); + } - throw new Exception\AlreadyClosedException('Handle has already been closed.'); + try { + /** @var Suspension $suspension */ + $suspension->suspend(); + } finally { + $suspension = null; + EventLoop::disable($this->writeWatcher); + if (null !== $delay_watcher) { + EventLoop::cancel($delay_watcher); + } } return $written + $this->writeImmediately($bytes); @@ -144,6 +180,10 @@ public function write(string $bytes, ?float $timeout = null): int */ public function writeImmediately(string $bytes): int { + if (null !== $this->writeSuspension) { + throw new Exception\RuntimeException('Pending operation.'); + } + if (null === $this->resource) { throw new Exception\AlreadyClosedException('Handle has already been closed.'); } @@ -217,18 +257,32 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string return $chunk; } - try { - /** - * @psalm-suppress PossiblyInvalidArgument - * @psalm-suppress PossiblyNullArgument - If null, writeImmediately would have thrown. - */ - Async\await_readable($this->resource, timeout: $timeout); - } catch (Async\Exception\TimeoutException) { - throw new Exception\TimeoutException('reached timeout while the handle is still not readable.'); - } catch (Async\Exception\ResourceClosedException) { - $this->resource = null; + $this->readSuspension = EventLoop::createSuspension(); + $suspension = &$this->readSuspension; + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->readWatcher); + $delay_watcher = null; + if (null !== $timeout) { + $delay_watcher = EventLoop::delay( + $timeout, + static function () use (&$suspension) { + /** @var Suspension|null $suspension */ + $suspension?->throw( + new Exception\TimeoutException('reached timeout while the handle is still not readable.') + ); + } + ); + } - throw new Exception\AlreadyClosedException('Handle has already been closed.'); + try { + /** @var Suspension $suspension */ + $suspension->suspend(); + } finally { + $suspension = null; + EventLoop::disable($this->readWatcher); + if (null !== $delay_watcher) { + EventLoop::cancel($delay_watcher); + } } return $this->readImmediately($max_bytes); @@ -241,6 +295,10 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string */ public function readImmediately(?int $max_bytes = null): string { + if (null !== $this->readSuspension) { + throw new Exception\RuntimeException('Pending operation.'); + } + if (null === $this->resource) { throw new Exception\AlreadyClosedException('Handle has already been closed.'); } @@ -285,5 +343,8 @@ public function close(): void throw new Exception\RuntimeException($error['message'] ?? 'unknown error.'); } + + $this->readSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.')); + $this->writeSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.')); } } diff --git a/src/Psl/Internal/Loader.php b/src/Psl/Internal/Loader.php index d76c5a6f6..4e2d49a41 100644 --- a/src/Psl/Internal/Loader.php +++ b/src/Psl/Internal/Loader.php @@ -474,7 +474,6 @@ final class Loader 'Psl\Network\Internal\get_sock_name', 'Psl\Network\Internal\socket_connect', 'Psl\Network\Internal\server_listen', - 'Psl\Network\Internal\socket_accept', 'Psl\TCP\connect', 'Psl\Unix\connect', ]; diff --git a/src/Psl/Network/Internal/socket_accept.php b/src/Psl/Network/Internal/socket_accept.php deleted file mode 100644 index 18093b209..000000000 --- a/src/Psl/Network/Internal/socket_accept.php +++ /dev/null @@ -1,50 +0,0 @@ -impl = $impl; + $suspension = &$this->suspension; + $this->watcher = EventLoop::onReadable( + $this->impl, + /** + * @param resource|object $resource + */ + static function (string $_watcher, mixed $resource) use (&$suspension): void { + /** + * @var resource $resource + */ + $sock = @stream_socket_accept($resource, timeout: 0.0); + /** @var \Revolt\EventLoop\Suspension|null $tmp */ + $tmp = $suspension; + $suspension = null; + if ($sock !== false) { + $tmp?->resume($sock); + + return; + } + + /** @var array{file: string, line: int, message: string, type: int} $err */ + $err = error_get_last(); + $tmp?->throw(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); + }, + ); + EventLoop::disable($this->watcher); } /** @@ -30,9 +66,9 @@ private function __construct( * @throws Psl\Network\Exception\RuntimeException In case failed to listen to on given address. */ public static function create( - string $host, - int $port = 0, - ?ServerOptions $options = null, + string $host, + int $port = 0, + ?ServerOptions $options = null, ): self { $server_options = $options ?? ServerOptions::create(); $socket_options = $server_options->socketOptions; @@ -56,22 +92,33 @@ public static function create( */ public function nextConnection(): SocketInterface { + if (null !== $this->suspension) { + throw new Network\Exception\RuntimeException('Pending operation.'); + } + if (null === $this->impl) { throw new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.'); } - // @codeCoverageIgnoreStart - try { + $sock = @stream_socket_accept($this->impl, timeout: 0.0); + if ($sock !== false) { /** @psalm-suppress MissingThrowsDocblock */ - return new Internal\Socket( - Network\Internal\socket_accept($this->impl) - ); - } catch (Network\Exception\AlreadyStoppedException $exception) { - $this->impl = null; + return new Internal\Socket($sock); + } + + $this->suspension = $suspension = EventLoop::createSuspension(); + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->watcher); - throw $exception; + try { + /** @var resource $socket */ + $socket = $suspension->suspend(); + } finally { + EventLoop::disable($this->watcher); } - // @codeCoverageIgnoreEnd + + /** @psalm-suppress MissingThrowsDocblock */ + return new Internal\Socket($socket); } /** @@ -86,6 +133,11 @@ public function getLocalAddress(): Network\Address return Network\Internal\get_sock_name($this->impl); } + public function __destruct() + { + $this->stopListening(); + } + /** * {@inheritDoc} */ @@ -95,13 +147,18 @@ public function stopListening(): void return; } + $suspension = null; + if (null !== $this->watcher) { + EventLoop::cancel($this->watcher); + $suspension = $this->suspension; + $this->suspension = null; + } + $resource = $this->impl; $this->impl = null; + /** @psalm-suppress PossiblyNullArgument */ fclose($resource); - } - public function __destruct() - { - $this->stopListening(); + $suspension?->throw(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); } } diff --git a/src/Psl/Unix/Server.php b/src/Psl/Unix/Server.php index f63304a07..f30cf40fe 100644 --- a/src/Psl/Unix/Server.php +++ b/src/Psl/Unix/Server.php @@ -6,17 +6,54 @@ use Psl; use Psl\Network; +use Revolt\EventLoop; +use function error_get_last; use function fclose; +use function stream_socket_accept; final class Server implements Network\ServerInterface { /** - * @param resource|null $impl + * @var resource|null $impl */ - private function __construct( - private mixed $impl - ) { + private mixed $impl; + private ?EventLoop\Suspension $suspension = null; + private string $watcher; + + /** + * @param resource $impl + */ + private function __construct(mixed $impl) + { + $this->impl = $impl; + $suspension = &$this->suspension; + $this->watcher = EventLoop::onReadable( + $this->impl, + /** + * @param resource|object $resource + */ + static function (string $_watcher, mixed $resource) use (&$suspension): void { + /** + * @var resource $resource + */ + $sock = @stream_socket_accept($resource, timeout: 0.0); + /** @var \Revolt\EventLoop\Suspension|null $tmp */ + $tmp = $suspension; + $suspension = null; + if ($sock !== false) { + $tmp?->resume($sock); + + return; + } + + + /** @var array{file: string, line: int, message: string, type: int} $err */ + $err = error_get_last(); + $tmp?->throw(new Network\Exception\RuntimeException('Failed to accept incoming connection: ' . $err['message'], $err['type'])); + }, + ); + EventLoop::disable($this->watcher); } /** @@ -38,22 +75,33 @@ public static function create(string $file): self */ public function nextConnection(): SocketInterface { + if (null !== $this->suspension) { + throw new Network\Exception\RuntimeException('Pending operation.'); + } + if (null === $this->impl) { throw new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.'); } - // @codeCoverageIgnoreStart - try { + $sock = @stream_socket_accept($this->impl, timeout: 0.0); + if ($sock !== false) { /** @psalm-suppress MissingThrowsDocblock */ - return new Internal\Socket( - Network\Internal\socket_accept($this->impl) - ); - } catch (Network\Exception\AlreadyStoppedException $exception) { - $this->impl = null; + return new Internal\Socket($sock); + } - throw $exception; + $this->suspension = $suspension = EventLoop::createSuspension(); + /** @psalm-suppress MissingThrowsDocblock */ + EventLoop::enable($this->watcher); + + try { + /** @var resource $socket */ + $socket = $suspension->suspend(); + } finally { + EventLoop::disable($this->watcher); } - // @codeCoverageIgnoreEnd + + /** @psalm-suppress MissingThrowsDocblock */ + return new Internal\Socket($socket); } /** @@ -77,9 +125,19 @@ public function stopListening(): void return; } + $suspension = null; + if (null !== $this->watcher) { + EventLoop::cancel($this->watcher); + $suspension = $this->suspension; + $this->suspension = null; + } + $resource = $this->impl; $this->impl = null; + /** @psalm-suppress PossiblyNullArgument */ fclose($resource); + + $suspension?->throw(new Network\Exception\AlreadyStoppedException('Server socket has already been stopped.')); } public function __destruct() diff --git a/tests/unit/IO/PipeTest.php b/tests/unit/IO/PipeTest.php index fa11bd98d..9774e454b 100644 --- a/tests/unit/IO/PipeTest.php +++ b/tests/unit/IO/PipeTest.php @@ -98,4 +98,43 @@ public function testReadAllTimedOut(): void $read->readAll(timeout: 0.001); } + + public function testReadOnAlreadyClosedPipe(): void + { + [$read, $_write] = IO\pipe(); + + Async\Scheduler::defer(static fn() => $read->close()); + $b = Async\run(static fn() => $read->readAll()); + + $this->expectException(IO\Exception\AlreadyClosedException::class); + $this->expectExceptionMessage('Handle has already been closed.'); + + // > $b starts + // > while waiting for $read to become readable, switch to defer + // > defer calls close() + // > close() throws in read suspension + // > $b fails with the already closed exception. + + $b->await(); + } + + public function testWriteOnAlreadyClosedPipe(): void + { + [$_read, $write] = IO\pipe(); + + Async\Scheduler::defer(static fn() => $write->close()); + + $b = Async\run(static fn() => $write->writeAll('hello')); + + $this->expectException(IO\Exception\AlreadyClosedException::class); + $this->expectExceptionMessage('Handle has already been closed.'); + + // > $b starts + // > while waiting for $write to become writable, switch to defer + // > defer calls close() + // > close() throws in read suspension + // > $b fails with the already closed exception. + + $b->await(); + } }