Skip to content

Commit

Permalink
Merge b30e69b into 0dff1fe
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Oct 6, 2017
2 parents 0dff1fe + b30e69b commit 70fc8f9
Showing 1 changed file with 25 additions and 3 deletions.
28 changes: 25 additions & 3 deletions lib/ResourceInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ final class ResourceInputStream implements InputStream {
/** @var bool */
private $readable = true;

/** @var int */
private $chunkSize = self::DEFAULT_CHUNK_SIZE;

/**
* @param resource $stream Stream resource.
* @param int $chunkSize Chunk size per `fread()` operation.
Expand All @@ -44,6 +47,7 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
\stream_set_read_buffer($stream, 0);

$this->resource = $stream;
$this->chunkSize = $chunkSize;

$deferred = &$this->deferred;
$readable = &$this->readable;
Expand All @@ -53,6 +57,7 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)
$data = @\fread($stream, $chunkSize);

\assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

if ($data === '' && \feof($stream)) {
$readable = false;
Loop::cancel($watcher);
Expand Down Expand Up @@ -81,10 +86,27 @@ public function read(): Promise {
return new Success; // Resolve with null on closed stream.
}

$this->deferred = new Deferred;
Loop::enable($this->watcher);
// Attempt a direct read, because the read buffer might be filled due to stream wrappers such as OpenSSL.

// Error reporting suppressed since fread() produces a warning if the stream has been shutdown
$data = @\fread($this->resource, $this->chunkSize);

\assert($data !== false, "Trying to read from a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

if ($data === '') {
if (\feof($this->resource)) {
$this->readable = false;
Loop::cancel($this->watcher);
$data = null; // Stream closed, resolve read with null.
} else {
$this->deferred = new Deferred;
Loop::enable($this->watcher);

return $this->deferred->promise();
}
}

return $this->deferred->promise();
return new Success($data);
}

/**
Expand Down

0 comments on commit 70fc8f9

Please sign in to comment.