Skip to content

Commit

Permalink
Remove close() from interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed May 11, 2017
1 parent 98bbffb commit cfbf25e
Show file tree
Hide file tree
Showing 10 changed files with 42 additions and 117 deletions.
4 changes: 1 addition & 3 deletions lib/GzipInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,8 @@ public function read(): Promise {
});
}

public function close() {
protected function close() {
$this->resource = null;

$this->source->close();
$this->source = null;
}
}
4 changes: 1 addition & 3 deletions lib/GzipOutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,8 @@ public function end(string $finalData = ""): Promise {
return $promise;
}

public function close() {
protected function close() {
$this->resource = null;

$this->destination->close();
$this->destination = null;
}
}
9 changes: 0 additions & 9 deletions lib/InMemoryStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,4 @@ public function read(): Promise {

return $promise;
}

/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* @return void
*/
public function close() {
$this->contents = null;
}
}
10 changes: 0 additions & 10 deletions lib/InputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,4 @@ interface InputStream {
* @throws PendingReadException Thrown if another read operation is still pending.
*/
public function read(): Promise;

/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close();
}
23 changes: 5 additions & 18 deletions lib/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ class Message implements InputStream, Promise {
/** @var \Amp\Deferred|null */
private $backpressure;

/** @var bool True if close() is called or the iterator has completed. */
private $closed = false;
/** @var bool True if the iterator has completed. */
private $complete = false;

/**
* @param \Amp\Iterator $iterator An iterator that only emits strings.
Expand Down Expand Up @@ -71,7 +71,7 @@ private function iterate(Iterator $iterator): \Generator {
$buffer = ""; // Destroy last emitted chunk to free memory.
}

$this->closed = true;
$this->complete = true;

if ($this->pendingRead) {
$deferred = $this->pendingRead;
Expand Down Expand Up @@ -100,8 +100,8 @@ public function read(): Promise {

return new Success($buffer);
}
if ($this->closed) {

if ($this->complete) {
return new Success;
}

Expand All @@ -123,17 +123,4 @@ public function onResolve(callable $onResolved) {

$this->coroutine->onResolve($onResolved);
}

public function close() {
$this->buffering = true;
$this->closed = true;

if ($this->pendingRead) {
$deferred = $this->pendingRead;
$this->pendingRead = null;
$deferred->resolve($this->buffer === "" ? $this->buffer : null);
}

$this->buffer = "";
}
}
15 changes: 3 additions & 12 deletions lib/OutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ interface OutputStream {
public function write(string $data): Promise;

/**
* Closes the stream after all pending writes have been completed. Optionally writes a final data chunk before.
* Marks the stream as no longer writable. Optionally writes a final data chunk before. Note that this is not the
* same as forcefully closing the stream. This method waits for all pending writes to complete before closing the
* stream. Socket streams implementing this interface should only close the writable side of the stream.
*
* @param string $finalData Bytes to write.
*
Expand All @@ -29,15 +31,4 @@ public function write(string $data): Promise;
* @throws ClosedException If the stream has already been closed.
*/
public function end(string $finalData = ""): Promise;

/**
* Closes the stream forcefully. Multiple `close()` calls are ignored. Successful streams should always be closed
* via `end()`.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close();
}
9 changes: 1 addition & 8 deletions lib/Parser.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public function __construct(\Generator $generator) {
* @return string
*/
public function cancel(): string {
$this->close();
$this->generator = null;
return $this->buffer;
}

Expand Down Expand Up @@ -136,11 +136,4 @@ private function send(string $data, bool $end = false): Promise {
}
}
}

/**
* @inheritdoc
*/
public function close() {
$this->generator = null;
}
}
34 changes: 11 additions & 23 deletions lib/ResourceInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,16 @@ class ResourceInputStream implements InputStream {
/** @var bool */
private $readable = true;

/** @var bool */
private $autoClose = true;

public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE, $autoClose = true) {
if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) {
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}

$meta = \stream_get_meta_data($stream);

if (isset($meta["mode"]) && $meta["mode"] !== ""
&& strpos($meta["mode"], "r") === false
&& strpos($meta["mode"], "+") === false
&& \strpos($meta["mode"], "r") === false
&& \strpos($meta["mode"], "+") === false
) {
throw new \Error("Expected a readable stream");
}
Expand All @@ -43,13 +40,13 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE,
\stream_set_read_buffer($stream, 0);

$this->resource = $stream;
$this->autoClose = $autoClose;

$deferred = &$this->deferred;
$readable = &$this->readable;
$resource = &$this->resource;

$this->watcher = Loop::onReadable($this->resource, static function ($watcher, $stream) use (
&$deferred, &$readable, $chunkSize
&$deferred, &$readable, &$resource, $chunkSize
) {
if ($deferred === null) {
return;
Expand All @@ -60,6 +57,7 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE,

if ($data === false || ($data === '' && (\feof($stream) || !\is_resource($stream)))) {
$readable = false;
$resource = null;
Loop::cancel($watcher);
$data = null; // Stream closed, resolve read with null.
}
Expand Down Expand Up @@ -101,20 +99,13 @@ public function read(): Promise {
/**
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* Note: If a class implements `InputStream` and `OutputStream`, `close()` will close both streams at once. If you
* want to allow half-closed duplex streams, you must use different objects for input and output.
*
* @return void
*/
public function close() {
protected function close() {
if ($this->resource === null) {
return;
}

if ($this->autoClose && \is_resource($this->resource)) {
@\fclose($this->resource);
}

$this->resource = null;
$this->readable = false;

Expand All @@ -127,13 +118,10 @@ public function close() {
Loop::cancel($this->watcher);
}

/**
* @return resource|null The stream resource or null if the stream has closed.
*/
public function getResource() {
return $this->resource;
}

public function __destruct() {
if ($this->autoClose) {
$this->close();
}
}
}
43 changes: 20 additions & 23 deletions lib/ResourceOutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@ class ResourceOutputStream implements OutputStream {
/** @var bool */
private $writable = true;

/** @var bool */
private $autoClose = true;

public function __construct($stream, int $chunkSize = 8192, bool $autoClose = true) {
if (!is_resource($stream) || get_resource_type($stream) !== 'stream') {
public function __construct($stream, int $chunkSize = 8192) {
if (!\is_resource($stream) || \get_resource_type($stream) !== 'stream') {
throw new \Error("Expected a valid stream");
}

Expand All @@ -39,12 +36,12 @@ public function __construct($stream, int $chunkSize = 8192, bool $autoClose = tr
\stream_set_write_buffer($stream, 0);

$this->resource = $stream;
$this->autoClose = $autoClose;

$writes = $this->writes = new \SplQueue;
$writable = &$this->writable;
$resource = &$this->resource;

$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$writable) {
$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$writable, &$resource) {
try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
Expand All @@ -61,17 +58,20 @@ public function __construct($stream, int $chunkSize = 8192, bool $autoClose = tr

if ($written === false || $written === 0) {
$writable = false;
$resource = null;

$message = "Failed to write to socket";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
$exception = new \Exception($message);
$exception = new StreamException($message);
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
$deferred->fail($exception);
}

Loop::cancel($watcher);
return;
}

Expand Down Expand Up @@ -127,8 +127,8 @@ public function end(string $finalData = ""): Promise {
* @return Promise
*/
private function send(string $data, bool $end = false): Promise {
if (!$this->writable) {
return new Failure(new \Exception("The stream is not writable"));
if ($this->resource === null) {
return new Failure(new StreamException("The stream is not writable"));
}

$length = \strlen($data);
Expand Down Expand Up @@ -173,24 +173,24 @@ private function send(string $data, bool $end = false): Promise {
$promise = $deferred->promise();

if ($end) {
$promise->onResolve([$this, 'close']);
$promise->onResolve(function () {
$this->close();
});
}

return $promise;
}

/**
* @inheritdoc
* Closes the stream forcefully. Multiple `close()` calls are ignored.
*
* @return void
*/
public function close() {
protected function close() {
if ($this->resource !== null) {
return;
}

if (\is_resource($this->resource)) {
@\fclose($this->resource);
}

$this->resource = null;
$this->writable = false;

Expand All @@ -206,13 +206,10 @@ public function close() {
Loop::cancel($this->watcher);
}

/**
* @return resource|null Stream resource or null if end() has been called or the stream closed.
*/
public function getResource() {
return $this->resource;
}

public function __destruct() {
if ($this->autoClose) {
$this->close();
}
}
}
8 changes: 0 additions & 8 deletions test/InMemoryStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,4 @@ public function testSingleReadConsumesEverything() {
$this->assertNull(yield $stream->read());
});
}

public function testCloseClearsContents() {
Loop::run(function () {
$stream = new InMemoryStream("foobar");
$stream->close();
$this->assertNull(yield $stream->read());
});
}
}

0 comments on commit cfbf25e

Please sign in to comment.