Skip to content

Commit

Permalink
feat(io): introduce ReadHandle::reachedEndOfDataSource
Browse files Browse the repository at this point in the history
Signed-off-by: azjezz <azjezz@protonmail.com>
  • Loading branch information
azjezz committed Mar 27, 2024
1 parent b66e921 commit 25eeda7
Show file tree
Hide file tree
Showing 19 changed files with 258 additions and 118 deletions.
7 changes: 3 additions & 4 deletions examples/channel/bounded.php
Expand Up @@ -30,12 +30,11 @@

for ($i = 0; $i < 10; $i++) {
$file = File\open_read_only(__FILE__);
$reader = new IO\Reader($file);
while (!$reader->isEndOfFile()) {
$byte = $reader->readByte();

while ($byte = $file->readAll(1)) {
$sender->send($byte);
}

$file->close();
}

IO\write_error_line("[ sender ]: completed.");
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/File/ReadHandle.php
Expand Up @@ -39,6 +39,14 @@ public function __construct(string $file)
parent::__construct($this->readHandle);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->readHandle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/File/ReadWriteHandle.php
Expand Up @@ -67,6 +67,14 @@ public function __construct(string $file, WriteMode $write_mode = WriteMode::Ope
parent::__construct($this->readWriteHandle);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->readWriteHandle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/IO/CloseReadStreamHandle.php
Expand Up @@ -23,6 +23,14 @@ public function __construct(mixed $stream)
$this->handle = new Internal\ResourceHandle($stream, read: true, write: false, seek: false, close: true);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->handle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/IO/CloseReadWriteStreamHandle.php
Expand Up @@ -24,6 +24,14 @@ public function __construct(mixed $stream)
$this->handle = new Internal\ResourceHandle($stream, read: true, write: true, seek: false, close: true);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->handle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/IO/CloseSeekReadStreamHandle.php
Expand Up @@ -23,6 +23,14 @@ public function __construct(mixed $stream)
$this->handle = new Internal\ResourceHandle($stream, read: true, write: false, seek: true, close: true);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->handle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
8 changes: 8 additions & 0 deletions src/Psl/IO/CloseSeekReadWriteStreamHandle.php
Expand Up @@ -24,6 +24,14 @@ public function __construct(mixed $stream)
$this->handle = new Internal\ResourceHandle($stream, read: true, write: true, seek: true, close: true);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
return $this->handle->reachedEndOfDataSource();
}

/**
* {@inheritDoc}
*/
Expand Down
85 changes: 60 additions & 25 deletions src/Psl/IO/Internal/ResourceHandle.php
Expand Up @@ -14,15 +14,19 @@

use function error_get_last;
use function fclose;
use function feof;
use function fread;
use function fseek;
use function ftell;
use function fwrite;
use function is_resource;
use function max;
use function str_contains;
use function stream_get_contents;
use function stream_get_meta_data;
use function stream_set_blocking;
use function stream_set_read_buffer;
use function stream_set_write_buffer;
use function substr;

/**
Expand All @@ -46,54 +50,57 @@ class ResourceHandle implements IO\CloseSeekReadWriteStreamHandleInterface
*/
protected mixed $stream;

/**
* @var null|Async\Sequence<array{null|int<1, max>, null|float}, string>
*/
private ?Async\Sequence $readSequence = null;

private ?Suspension $readSuspension = null;

/**
* @var string
*/
private string $readWatcher = 'invalid';

/**
* @var null|Async\Sequence<array{string, null|float}, int<0, max>>
*/
private ?Async\Sequence $writeSequence = null;

private ?Suspension $writeSuspension = null;
private string $writeWatcher = 'invalid';

/**
* @var string
* @var null|Async\Sequence<array{null|int<1, max>, null|float}, string>
*/
private string $writeWatcher = 'invalid';
private ?Async\Sequence $readSequence = null;
private ?Suspension $readSuspension = null;
private string $readWatcher = 'invalid';

private bool $useSingleRead = false;
private bool $reachedEof = false;

/**
* @param resource $stream
*/
public function __construct(mixed $stream, bool $read, bool $write, bool $seek, private bool $close)
public function __construct(mixed $stream, bool $read, bool $write, bool $seek, private readonly bool $close)
{
/** @psalm-suppress RedundantConditionGivenDocblockType - The stream is always a resource, but we want to make sure it is a stream resource. */
$this->stream = Type\resource('stream')->assert($stream);

/** @psalm-suppress UnusedFunctionCall */
stream_set_read_buffer($stream, 0);
stream_set_blocking($stream, false);

$meta = stream_get_meta_data($stream);
if ($read) {
$this->useSingleRead = ($meta["stream_type"] === "udp_socket" || $meta["stream_type"] === "STDIO");
}

$blocks = $meta['blocked'] || ($meta['wrapper_type'] ?? '') === 'plainfile';
if ($seek) {
Psl\invariant($meta['seekable'], 'Handle is not seekable.');
$seekable = $meta['seekable'];

Psl\invariant($seekable, 'Handle is not seekable.');
}

if ($read) {
Psl\invariant(str_contains($meta['mode'], 'r') || str_contains($meta['mode'], '+'), 'Handle is not readable.');
$readable = str_contains($meta['mode'], 'r') || str_contains($meta['mode'], '+');

Psl\invariant($readable, 'Handle is not readable.');

/** @psalm-suppress UnusedFunctionCall */
stream_set_read_buffer($stream, 0);

$this->readWatcher = EventLoop::onReadable($this->stream, function () {
$this->readSuspension?->resume(null);
$this->readSuspension?->resume();
});

$this->readSequence = new Async\Sequence(
/**
* @param array{null|int<1, max>, null|float} $input
Expand Down Expand Up @@ -142,11 +149,14 @@ function (array $input) use ($blocks): string {
|| str_contains($meta['mode'], 'a')
|| str_contains($meta['mode'], '+');

Psl\invariant($writable, 'Handle is not writeable.');
Psl\invariant($writable, 'Handle is not writeable.');

stream_set_write_buffer($stream, 0);

$this->writeWatcher = EventLoop::onReadable($this->stream, function () {
$this->writeSuspension?->resume(null);
$this->writeWatcher = EventLoop::onWritable($this->stream, function () {
$this->writeSuspension?->resume();
});

$this->writeSequence = new Async\Sequence(
/**
* @param array{string, null|float} $input
Expand Down Expand Up @@ -254,6 +264,22 @@ public function tell(): int
return max($result, 0);
}

/**
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
if (!is_resource($this->stream)) {
throw new Exception\AlreadyClosedException('Handle has already been closed.');
}

if ($this->reachedEof) {
return true;
}

return $this->reachedEof = feof($this->stream);
}

/**
* {@inheritDoc}
*/
Expand All @@ -279,14 +305,23 @@ public function tryRead(?int $max_bytes = null): string
$max_bytes = self::MAXIMUM_READ_BUFFER_SIZE;
}

$result = fread($this->stream, $max_bytes);
if ($this->useSingleRead) {
$result = fread($this->stream, $max_bytes);
} else {
$result = stream_get_contents($this->stream, $max_bytes);
}

if ($result === false) {
/** @var array{message: string} $error */
$error = error_get_last();

throw new Exception\RuntimeException($error['message'] ?? 'unknown error.');
}

if ($result === '' && feof($this->stream)) {
$this->reachedEof = true;
}

return $result;
}

Expand Down
29 changes: 15 additions & 14 deletions src/Psl/IO/MemoryHandle.php
Expand Up @@ -21,20 +21,25 @@ final class MemoryHandle implements CloseSeekReadWriteHandleInterface
private int $offset = 0;
private string $buffer;
private bool $closed = false;
private bool $reachedEof = false;

public function __construct(string $buffer = '')
{
$this->buffer = $buffer;
}

/**
* Read from the handle.
*
* @param positive-int|null $max_bytes the maximum number of bytes to read.
*
* @throws Exception\AlreadyClosedException If the handle has been already closed.
*
* @return string the read data on success, or an empty string if the end of file is reached.
* {@inheritDoc}
*/
public function reachedEndOfDataSource(): bool
{
$this->assertHandleIsOpen();

return $this->reachedEof;
}

/**
* {@inheritDoc}
*/
public function tryRead(?int $max_bytes = null): string
{
Expand All @@ -46,6 +51,8 @@ public function tryRead(?int $max_bytes = null): string

$length = strlen($this->buffer);
if ($this->offset >= $length) {
$this->reachedEof = true;

return '';
}

Expand All @@ -58,13 +65,7 @@ public function tryRead(?int $max_bytes = null): string
}

/**
* Read from the handle.
*
* @param positive-int|null $max_bytes the maximum number of bytes to read.
*
* @throws Exception\AlreadyClosedException If the handle has been already closed.
*
* @return string the read data on success, or an empty string if the end of file is reached.
* {@inheritDoc}
*/
public function read(?int $max_bytes = null, ?float $timeout = null): string
{
Expand Down
6 changes: 3 additions & 3 deletions src/Psl/IO/ReadHandleConvenienceMethodsTrait.php
Expand Up @@ -17,7 +17,7 @@ trait ReadHandleConvenienceMethodsTrait
/**
* Read until there is no more data to read.
*
* It is possible for this to never return, e.g. if called on a pipe or
* It is possible for this to never return, e.g. if called on a pipe
* or socket which the other end keeps open forever. Set a timeout if you
* do not want this to happen.
*
Expand Down Expand Up @@ -60,15 +60,15 @@ static function () use ($data): void {
if ($to_read !== null) {
$to_read -= strlen($chunk);
}
} while (($to_read === null || $to_read > 0) && $chunk !== '');
} while (($to_read === null || $to_read > 0) && !$this->reachedEndOfDataSource());

return $data->value;
}

/**
* Read a fixed amount of data.
*
* It is possible for this to never return, e.g. if called on a pipe or
* It is possible for this to never return, e.g. if called on a pipe
* or socket which the other end keeps open forever. Set a timeout if you
* do not want this to happen.
*
Expand Down

0 comments on commit 25eeda7

Please sign in to comment.