Skip to content

Commit

Permalink
chore(io/tcp/unix): improve performance, and closed handle/server con…
Browse files Browse the repository at this point in the history
…dition

Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Nov 10, 2021
1 parent 449b1bd commit 24d93a8
Show file tree
Hide file tree
Showing 9 changed files with 313 additions and 126 deletions.
2 changes: 1 addition & 1 deletion docs/component/tcp.md
Expand Up @@ -21,7 +21,7 @@
#### `Classes`

- [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7)
- [Server](./../../src/Psl/TCP/Server.php#L14)
- [Server](./../../src/Psl/TCP/Server.php#L17)
- [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9)


2 changes: 1 addition & 1 deletion docs/component/unix.md
Expand Up @@ -20,6 +20,6 @@

#### `Classes`

- [Server](./../../src/Psl/Unix/Server.php#L12)
- [Server](./../../src/Psl/Unix/Server.php#L15)


49 changes: 36 additions & 13 deletions examples/tcp/basic-http-server.php
Expand Up @@ -6,29 +6,52 @@

use Psl\Async;
use Psl\IO;
use Psl\Network\Exception\AlreadyStoppedException;
use Psl\Str;
use Psl\TCP;
use Throwable;

require __DIR__ . '/../../vendor/autoload.php';

Async\main(static function (): never {
$server = TCP\Server::create('localhost');
Async\main(static function (): int {
$output = IO\output_handle();
$server = TCP\Server::create('localhost', 3030);

IO\output_handle()->writeAll("Server is listening on http://localhost:" . $server->getLocalAddress()->port . "\n");
$output->writeAll("Server is listening on http://localhost:3030\n");

while (true) {
$connection = $server->nextConnection();
$request = $connection->read();
Async\Scheduler::defer(static function () use ($server, $output) {
Async\await_signal(SIGINT);

IO\output_handle()->writeAll(Str\split($request, "\n")[0] . "\n");
$output->writeAll("\nGoodbye 👋\n");

$connection->writeAll("HTTP/1.1 200 OK\n");
$connection->writeAll("Server: PSL\n");
$connection->writeAll("Connection: close\n");
$connection->writeAll("Content-Type: text/plain\n\n");
$server->stopListening();
});

$connection->writeAll("Hello, World!");
try {
while (true) {
$connection = $server->nextConnection();

$connection->close();
Async\Scheduler::defer(static function() use ($connection, $output) {
try {
$request = $connection->read();

$output->writeAll("[" . round(memory_get_peak_usage(true) / 1024 / 1024, 1) . "MiB] " . Str\split($request, "\n")[0] . "\n");

$connection->writeAll("HTTP/1.1 200 OK\n");

$connection->writeAll("Server: PSL\n");
$connection->writeAll("Connection: close\n");
$connection->writeAll("Content-Type: text/plain\n\n");

$connection->writeAll("Hello, World!");
$connection->close();
} catch (Throwable) {
}
});
}
} catch (AlreadyStoppedException) {
// server stopped.
}

return 0;
});
115 changes: 88 additions & 27 deletions src/Psl/IO/Internal/ResourceHandle.php
Expand Up @@ -5,11 +5,12 @@
namespace Psl\IO\Internal;

use Psl;
use Psl\Async;
use Psl\Exception\InvariantViolationException;
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
use Revolt\EventLoop;
use Revolt\EventLoop\Suspension;

use function error_get_last;
use function fclose;
Expand Down Expand Up @@ -37,7 +38,7 @@ class ResourceHandle implements IO\CloseSeekReadWriteHandleInterface
use IO\ReadHandleConvenienceMethodsTrait;
use IO\WriteHandleConvenienceMethodsTrait;

public const DEFAULT_READ_BUFFER_SIZE = 65536;
public const DEFAULT_READ_BUFFER_SIZE = 4096;
public const MAXIMUM_READ_BUFFER_SIZE = 786432;

/**
Expand All @@ -49,6 +50,12 @@ class ResourceHandle implements IO\CloseSeekReadWriteHandleInterface

private bool $blocks;

private string $readWatcher = '';
private string $writeWatcher = '';

private ?Suspension $readSuspension = null;
private ?Suspension $writeSuspension = null;

/**
* @param resource|object $resource
*
Expand All @@ -75,7 +82,7 @@ public function __construct(mixed $resource, bool $read, bool $write, bool $seek
$meta = stream_get_meta_data($resource);
$this->blocks = ($meta['wrapper_type'] ?? '') === 'plainfile';
if ($seek) {
$seekable = (bool) $meta['seekable'];
$seekable = (bool)$meta['seekable'];

Psl\invariant($seekable, 'Handle is not seekable.');
}
Expand All @@ -84,17 +91,32 @@ public function __construct(mixed $resource, bool $read, bool $write, bool $seek
$readable = str_contains($meta['mode'], 'r') || str_contains($meta['mode'], '+');

Psl\invariant($readable, 'Handle is not readable.');

$suspension = &$this->readSuspension;
$this->readWatcher = EventLoop::onReadable($resource, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
});

EventLoop::disable($this->readWatcher);
}

if ($write) {
$writable = str_contains($meta['mode'], 'x')
|| str_contains($meta['mode'], 'w')
|| str_contains($meta['mode'], 'c')
|| str_contains($meta['mode'], 'a')
|| str_contains($meta['mode'], '+')
;
|| str_contains($meta['mode'], '+');

Psl\invariant($writable, 'Handle is not writeable.');

$suspension = &$this->writeSuspension;
$this->writeWatcher = EventLoop::onWritable($resource, static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->resume(null);
});

EventLoop::disable($this->writeWatcher);
}

$this->useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO";
Expand All @@ -119,18 +141,32 @@ public function write(string $bytes, ?float $timeout = null): int

$bytes = substr($bytes, $written);

try {
/**
* @psalm-suppress PossiblyInvalidArgument
* @psalm-suppress PossiblyNullArgument - If null, writeImmediately would have thrown.
*/
Async\await_writable($this->resource, timeout: $timeout);
} catch (Async\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the handle is still not writable.');
} catch (Async\Exception\ResourceClosedException) {
$this->resource = null;
$this->writeSuspension = EventLoop::createSuspension();
$suspension = &$this->writeSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not writable.')
);
}
);
}

throw new Exception\AlreadyClosedException('Handle has already been closed.');
try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $written + $this->writeImmediately($bytes);
Expand All @@ -144,6 +180,10 @@ public function write(string $bytes, ?float $timeout = null): int
*/
public function writeImmediately(string $bytes): int
{
if (null !== $this->writeSuspension) {
throw new Exception\RuntimeException('Pending operation.');
}

if (null === $this->resource) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}
Expand Down Expand Up @@ -217,18 +257,32 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
return $chunk;
}

try {
/**
* @psalm-suppress PossiblyInvalidArgument
* @psalm-suppress PossiblyNullArgument - If null, writeImmediately would have thrown.
*/
Async\await_readable($this->resource, timeout: $timeout);
} catch (Async\Exception\TimeoutException) {
throw new Exception\TimeoutException('reached timeout while the handle is still not readable.');
} catch (Async\Exception\ResourceClosedException) {
$this->resource = null;
$this->readSuspension = EventLoop::createSuspension();
$suspension = &$this->readSuspension;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$delay_watcher = EventLoop::delay(
$timeout,
static function () use (&$suspension) {
/** @var Suspension|null $suspension */
$suspension?->throw(
new Exception\TimeoutException('reached timeout while the handle is still not readable.')
);
}
);
}

throw new Exception\AlreadyClosedException('Handle has already been closed.');
try {
/** @var Suspension $suspension */
$suspension->suspend();
} finally {
$suspension = null;
EventLoop::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
}
}

return $this->readImmediately($max_bytes);
Expand All @@ -241,6 +295,10 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
*/
public function readImmediately(?int $max_bytes = null): string
{
if (null !== $this->readSuspension) {
throw new Exception\RuntimeException('Pending operation.');
}

if (null === $this->resource) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}
Expand Down Expand Up @@ -285,5 +343,8 @@ public function close(): void

throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

$this->readSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
$this->writeSuspension?->throw(throw new Exception\AlreadyClosedException('Handle has already been closed.'));
}
}
1 change: 0 additions & 1 deletion src/Psl/Internal/Loader.php
Expand Up @@ -474,7 +474,6 @@ final class Loader
'Psl\Network\Internal\get_sock_name',
'Psl\Network\Internal\socket_connect',
'Psl\Network\Internal\server_listen',
'Psl\Network\Internal\socket_accept',
'Psl\TCP\connect',
'Psl\Unix\connect',
];
Expand Down
50 changes: 0 additions & 50 deletions src/Psl/Network/Internal/socket_accept.php

This file was deleted.

0 comments on commit 24d93a8

Please sign in to comment.