Skip to content

Commit

Permalink
Merge 8cfe1db into 1467f0d
Browse files Browse the repository at this point in the history
  • Loading branch information
azjezz committed Nov 17, 2021
2 parents 1467f0d + 8cfe1db commit e0f454b
Show file tree
Hide file tree
Showing 13 changed files with 147 additions and 124 deletions.
2 changes: 1 addition & 1 deletion docs/component/tcp.md
Expand Up @@ -21,7 +21,7 @@
#### `Classes`

- [ConnectOptions](./../../src/Psl/TCP/ConnectOptions.php#L7)
- [Server](./../../src/Psl/TCP/Server.php#L18)
- [Server](./../../src/Psl/TCP/Server.php#L17)
- [ServerOptions](./../../src/Psl/TCP/ServerOptions.php#L9)


2 changes: 1 addition & 1 deletion docs/component/unix.md
Expand Up @@ -20,6 +20,6 @@

#### `Classes`

- [Server](./../../src/Psl/Unix/Server.php#L17)
- [Server](./../../src/Psl/Unix/Server.php#L16)


14 changes: 4 additions & 10 deletions examples/tcp/basic-http-server.php
Expand Up @@ -17,26 +17,20 @@

IO\output_handle()->writeAll("Server is listening on http://localhost:3030\n");

Async\Scheduler::defer(static function () use ($server) {
Async\await_signal(SIGINT);
$server->stopListening();
});
Async\Scheduler::onSignal(SIGINT, $server->stopListening(...));

try {
while (true) {
$connection = $server->nextConnection();

Async\Scheduler::defer(static function() use ($connection) {
try {
$stream = $connection->getStream();

Async\await_readable($stream);
$connection->read();
$request = $connection->read();

$connection->writeAll("HTTP/1.1 200 OK\n");
$connection->writeAll("Server: PHP+PSL\n");
$connection->writeAll("Server: PHP-Standard-Library TCP Server - https://github.com/azjezz/psl\n");
$connection->writeAll("Connection: close\n");
$connection->writeAll("Content-Type: text/plain\n\n");
$connection->writeAll("Content-Type: text/plain; charset=utf-8\n\n");
$connection->writeAll("Hello, World!");
$connection->close();
} catch (Throwable) {
Expand Down
96 changes: 72 additions & 24 deletions src/Psl/Async/Scheduler.php
Expand Up @@ -32,6 +32,54 @@ public static function createSuspension(): Suspension
return EventLoop::createSuspension();
}

/**
* Execute a callback when a signal is received.
*
* @param int $signal_number The signal number to monitor.
* @param (callable(string, int): void) $callback The callback to execute.
*
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @see EventLoop::onSignal()
*/
public static function onSignal(int $signal_number, callable $callback): string
{
/**
* @psalm-suppress MissingThrowsDocblock
*
* @var non-empty-string
*/
return EventLoop::onSignal($signal_number, $callback);
}

/**
* Execute a callback when a stream resource becomes readable or is closed for reading.
*
* @param resource|object $stream The stream to monitor.
* @param (callable(string, resource|object): void) $callback The callback to execute.
*
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function onReadable(mixed $stream, callable $callback): string
{
/** @var non-empty-string */
return EventLoop::onReadable($stream, $callback);
}

/**
* Execute a callback when a stream resource becomes writable or is closed for writing.
*
* @param resource|object $stream The stream to monitor.
* @param (callable(string, resource|object): void) $callback The callback to execute.
*
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*/
public static function onWritable(mixed $stream, callable $callback): string
{
/** @var non-empty-string */
return EventLoop::onWritable($stream, $callback);
}

/**
* Queue a microtask.
*
Expand All @@ -46,14 +94,14 @@ public static function queue(callable $callback): void
* Defer the execution of a callback.
*
* @param (callable(string): void) $callback The callback to defer.
* The `$callbackId` will be invalidated before the callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @see EventLoop::defer()
*/
public static function defer(callable $callback): string
{
/** @var non-empty-string */
return EventLoop::defer($callback);
}

Expand All @@ -62,43 +110,46 @@ public static function defer(callable $callback): string
*
* @param float $delay The amount of time, to delay the execution for in seconds.
* @param (callable(string): void) $callback The callback to delay.
* The `$callbackId` will be invalidated before the callback invocation.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @see EventLoop::delay()
*/
public static function delay(float $delay, callable $callback): string
{
/** @var non-empty-string */
return EventLoop::delay($delay, $callback);
}

/**
* Repeatedly execute a callback.
*
* @param int $interval The time interval, to wait between executions in seconds.
* @param float $interval The time interval, to wait between executions in seconds.
* @param callable(string) $callback The callback to repeat.
*
* @return string A unique identifier that can be used to cancel, enable or disable the callback.
* @return non-empty-string A unique identifier that can be used to cancel, enable or disable the callback.
*
* @see EventLoop::repeat()
*/
public static function repeat(float $interval, callable $callback): string
{
/** @var non-empty-string */
return EventLoop::repeat($interval, $callback);
}

/**
* Enable a callback to be active starting in the next tick.
*
* @param non-empty-string $identifier The callback identifier.
*
* @throws Psl\Exception\InvariantViolationException If the callback identifier is invalid.
*
* @see EventLoop::repeat()
*/
public static function enable(string $callbackId): void
public static function enable(string $identifier): void
{
try {
EventLoop::enable($callbackId);
EventLoop::enable($identifier);
} catch (InvalidCallbackError $error) {
Psl\invariant_violation($error->getMessage());
}
Expand All @@ -107,42 +158,40 @@ public static function enable(string $callbackId): void
/**
* Disable a callback immediately.
*
* @param string $identifier The callback identifier.
*
* @see EventLoop::disable()
*/
public static function disable(string $callbackId): void
public static function disable(string $identifier): void
{
EventLoop::disable($callbackId);
EventLoop::disable($identifier);
}

/**
* Cancel a callback.
*
* This will detach the event loop from all resources that are associated to the callback. After this operation the
* callback is permanently invalid. Calling this function MUST NOT fail, even if passed an invalid identifier.
*
* @param string $callbackId The callback identifier.
* @param string $identifier The callback identifier.
*
* @see EventLoop::cancel()
*/
public static function cancel(string $callbackId): void
public static function cancel(string $identifier): void
{
EventLoop::cancel($callbackId);
EventLoop::cancel($identifier);
}

/**
* Reference a callback.
*
* This will keep the event loop alive whilst the event is still being monitored. Callbacks have this state by
* default.
* @param non-empty-string $identifier The callback identifier.
*
* @throws Psl\Exception\InvariantViolationException If the callback identifier is invalid.
*
* @see EventLoop::reference()
*/
public static function reference(string $callbackId): void
public static function reference(string $identifier): void
{
try {
EventLoop::reference($callbackId);
EventLoop::reference($identifier);
} catch (InvalidCallbackError $error) {
Psl\invariant_violation($error->getMessage());
}
Expand All @@ -151,13 +200,12 @@ public static function reference(string $callbackId): void
/**
* Unreference a callback.
*
* The event loop should exit the run method when only unreferenced callbacks are still being monitored. Callbacks
* are all referenced by default.
* @param string $identifier The callback identifier.
*
* @see EventLoop::unreference()
*/
public static function unreference(string $callbackId): void
public static function unreference(string $identifier): void
{
EventLoop::unreference($callbackId);
EventLoop::unreference($identifier);
}
}
3 changes: 0 additions & 3 deletions src/Psl/Exception/ExceptionInterface.php
Expand Up @@ -6,9 +6,6 @@

use Throwable;

/**
* @internal
*/
interface ExceptionInterface extends Throwable
{
}
36 changes: 21 additions & 15 deletions src/Psl/IO/Internal/ResourceHandle.php
Expand Up @@ -9,7 +9,6 @@
use Psl\IO;
use Psl\IO\Exception;
use Psl\Type;
use Revolt\EventLoop;

use function error_get_last;
use function fclose;
Expand Down Expand Up @@ -50,8 +49,15 @@ class ResourceHandle implements IO\Stream\CloseSeekReadWriteHandleInterface

private bool $blocks;

private string $readWatcher = '';
private string $writeWatcher = '';
/**
* @var non-empty-string
*/
private string $readWatcher = 'invalid';

/**
* @var non-empty-string
*/
private string $writeWatcher = 'invalid';

private ?Async\Deferred $readDeferred = null;
private ?Async\Deferred $writeDeferred = null;
Expand Down Expand Up @@ -84,12 +90,12 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)
Psl\invariant($readable, 'Handle is not readable.');

$deferred = &$this->readDeferred;
$this->readWatcher = EventLoop::onReadable($stream, static function () use (&$deferred) {
$this->readWatcher = Async\Scheduler::onReadable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->readWatcher);
Async\Scheduler::disable($this->readWatcher);
}

if ($write) {
Expand All @@ -102,12 +108,12 @@ public function __construct(mixed $stream, bool $read, bool $write, bool $seek)
Psl\invariant($writable, 'Handle is not writeable.');

$deferred = &$this->writeDeferred;
$this->writeWatcher = EventLoop::onWritable($stream, static function () use (&$deferred) {
$this->writeWatcher = Async\Scheduler::onWritable($stream, static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
$deferred?->complete(null);
});

EventLoop::disable($this->writeWatcher);
Async\Scheduler::disable($this->writeWatcher);
}

$this->useSingleRead = $meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO";
Expand All @@ -128,11 +134,11 @@ public function write(string $bytes, ?float $timeout = null): int
$this->writeDeferred = new Async\Deferred();
$deferred = &$this->writeDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->writeWatcher);
Async\Scheduler::enable($this->writeWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$delay_watcher = Async\Scheduler::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
Expand All @@ -148,9 +154,9 @@ static function () use (&$deferred) {
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->writeWatcher);
Async\Scheduler::disable($this->writeWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
Async\Scheduler::cancel($delay_watcher);
}
}

Expand Down Expand Up @@ -236,11 +242,11 @@ public function read(?int $max_bytes = null, ?float $timeout = null): string
$this->readDeferred = new Async\Deferred();
$deferred = &$this->readDeferred;
/** @psalm-suppress MissingThrowsDocblock */
EventLoop::enable($this->readWatcher);
Async\Scheduler::enable($this->readWatcher);
$delay_watcher = null;
if (null !== $timeout) {
$timeout = $timeout < 0.0 ? 0.0 : $timeout;
$delay_watcher = EventLoop::delay(
$delay_watcher = Async\Scheduler::delay(
$timeout,
static function () use (&$deferred) {
/** @var Async\Deferred|null $deferred */
Expand All @@ -256,9 +262,9 @@ static function () use (&$deferred) {
$deferred->getAwaitable()->await();
} finally {
$deferred = null;
EventLoop::disable($this->readWatcher);
Async\Scheduler::disable($this->readWatcher);
if (null !== $delay_watcher) {
EventLoop::cancel($delay_watcher);
Async\Scheduler::cancel($delay_watcher);
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/Psl/Internal/suppress.php
Expand Up @@ -17,13 +17,11 @@
*/
function suppress(callable $fun)
{
/** @psalm-suppress ImpureFunctionCall */
$previous_level = error_reporting(0);

try {
return $fun();
} finally {
/** @psalm-suppress ImpureFunctionCall */
error_reporting($previous_level);
}
}

0 comments on commit e0f454b

Please sign in to comment.