Skip to content

Commit

Permalink
added GeneratorReadStream wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
alextartan committed Jul 25, 2019
1 parent 590e7fd commit 6c6f28b
Show file tree
Hide file tree
Showing 7 changed files with 312 additions and 62 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"require": {
"php": "^7.2",
"ext-readline": "*",
"psr/log": "^1.1.0",
"zendframework/zend-validator": "^2.10"
},
"require-dev": {
Expand Down
124 changes: 62 additions & 62 deletions composer.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions phpcs.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,9 @@
<!-- Paths to check -->
<file>src</file>
<file>test</file>

<!-- exclude these files because they implement internal snake_case methods-->
<rule ref="PSR1.Methods.CamelCapsMethodName.NotCamelCaps">
<exclude-pattern>src/Helpers/Stream/</exclude-pattern>
</rule>
</ruleset>
131 changes: 131 additions & 0 deletions src/Helpers/Stream/GeneratorReadStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
<?php
declare(strict_types=1);

namespace AlexTartan\Helpers\Stream;

use Generator;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use function in_array;
use function stream_context_get_default;
use function stream_context_get_options;
use function stream_context_set_default;
use function strlen;
use function trigger_error;
use const E_USER_ERROR;

class GeneratorReadStream implements Stream
{
public const PROTOCOL = 'generator';

/** @var string */
private $path;

/** @var string */
private $buffer;

/** @var Generator */
private $generator;

/** @var LoggerInterface */
private $logger;

public static function createResourceUrl(
Generator $generator,
LoggerInterface $logger = null
): string {
if (in_array(self::PROTOCOL, stream_get_wrappers(), true)) {
stream_wrapper_unregister(self::PROTOCOL);
}
stream_wrapper_register(self::PROTOCOL, static::class);

// set defaults
$default = stream_context_get_options(stream_context_get_default());
$default[self::PROTOCOL]['generator'] = $generator;
$default[self::PROTOCOL]['id'] = uniqid('', true);
$default[self::PROTOCOL]['buffer'] = '';
$default[self::PROTOCOL]['logger'] = $logger ?? new NullLogger();

stream_context_set_default($default);

return self::PROTOCOL . '://' . $default[self::PROTOCOL]['id'];
}

public function stream_open(string $path, string $mode, int $options = STREAM_REPORT_ERRORS, string &$opened_path = null): bool
{
if (!in_array($mode, ['r', 'rb'], true)) {
return $this->triggerError('This stream is readonly');
}

$this->initProtocol($path);

return true;
}

public function stream_read(int $count): string
{
$this->logger->info("stream_read called asking for $count bytes");

$out = $this->buffer;
$currentLength = strlen($this->buffer);

while ($currentLength < $count && $this->generator->valid()) {
$currentValue = $this->generator->current();
$out .= $currentValue;
$this->generator->next();
$currentLength += strlen($currentValue);
$this->logger->info('loop read ' . strlen($currentValue) . ' bytes');
}
$this->logger->info("read $currentLength bytes");

// grabbing the requested size from what has been read
$returnValue = substr($out, 0, $count);

// storing the rest of the read content into the buffer, so it gets picked up on the next iteration
if (strlen($out) > $count) {
$this->buffer = substr($out, $count);
} else {
$this->buffer = '';
}

$this->logger->info('buffer now contains ' . strlen($this->buffer));
$this->logger->info('EOF?: ' . ($this->stream_eof() ? 'Y' : 'N'));

return $returnValue;
}

public function stream_eof(): bool
{
return !$this->generator->valid() && $this->buffer === '';
}

public function stream_close(): void
{
$this->logger->info("stream closed. buffer is: $this->buffer");
}

/**
* Parse the protocol out of the given path.
*/
private function initProtocol(string $path): void
{
$parts = explode('://', $path, 2);
$this->path = $parts[1];
$options = $this->getOptions()[self::PROTOCOL];
$this->buffer = $options['buffer'];
$this->generator = $options['generator'];
$this->logger = $options['logger'];
}

private function getOptions(): array
{
return stream_context_get_options(stream_context_get_default());
}

private function triggerError(string $error): bool
{
trigger_error($error, E_USER_ERROR);

return false;
}
}
16 changes: 16 additions & 0 deletions src/Helpers/Stream/Stream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<?php

namespace AlexTartan\Helpers\Stream;

use const STREAM_REPORT_ERRORS;

interface Stream
{
public function stream_open(string $path, string $mode, int $options = STREAM_REPORT_ERRORS, string &$opened_path = null): bool;

public function stream_read(int $count): string;

public function stream_eof(): bool;

public function stream_close(): void;
}
Loading

0 comments on commit 6c6f28b

Please sign in to comment.