Skip to content

Commit

Permalink
Proper fix for remote closed streams
Browse files Browse the repository at this point in the history
Fixes #40. Streams on MacOS (and possibly FreeBSD) that are closed by the remote still allow writing, returning a non-zero from fwrite(). EOF then is false, since data was written to the buffer. EOF needed to be checked before calling fwrite().
  • Loading branch information
trowski committed Apr 3, 2018
1 parent 5403767 commit 5aeb553
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 27 deletions.
39 changes: 15 additions & 24 deletions lib/ResourceOutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,6 @@ public function __construct($stream, int $chunkSize = null) {
$resource = &$this->resource;

$this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, $chunkSize, &$writable, &$resource) {
static $emptyWrites = 0;

try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
Expand All @@ -68,30 +66,11 @@ public function __construct($stream, int $chunkSize = null) {
continue;
}

// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
// Use conditional, because PHP doesn't like getting null passed
if ($chunkSize) {
$written = @\fwrite($stream, $data, $chunkSize);
} else {
$written = @\fwrite($stream, $data);
}

\assert($written !== false, "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");
if ($written === 0) {
// fwrite will also return 0 if the buffer is already full.
if ($emptyWrites++ < self::MAX_CONSECUTIVE_EMPTY_WRITES && \is_resource($stream) && !@\feof($stream)) {
$writes->unshift([$data, $previous, $deferred]);
return;
}

if (!\is_resource($stream) || @\feof($stream)) {
$resource = null;
$writable = false;

$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf("; %s", $error["message"]);
}
$exception = new StreamException($message);
$exception = new StreamException("The stream was closed by the peer");
$deferred->fail($exception);
while (!$writes->isEmpty()) {
list(, , $deferred) = $writes->shift();
Expand All @@ -102,7 +81,15 @@ public function __construct($stream, int $chunkSize = null) {
return;
}

$emptyWrites = 0;
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
// Use conditional, because PHP doesn't like getting null passed
if ($chunkSize) {
$written = @\fwrite($stream, $data, $chunkSize);
} else {
$written = @\fwrite($stream, $data);
}

\assert($written !== false, "Trying to write on a previously fclose()'d resource. Do NOT manually fclose() resources the loop still has a reference to.");

if ($length > $written) {
$data = \substr($data, $written);
Expand Down Expand Up @@ -168,6 +155,10 @@ private function send(string $data, bool $end = false): Promise {
return new Success(0);
}

if (!\is_resource($this->resource) || @\feof($this->resource)) {
return new Failure(new StreamException("The stream was closed by the peer"));
}

// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
// Use conditional, because PHP doesn't like getting null passed.
if ($this->chunkSize) {
Expand Down
6 changes: 3 additions & 3 deletions test/ResourceOutputStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function testBrokenPipe() {
\fclose($b);

$this->expectException(StreamException::class);
$this->expectExceptionMessage("Failed to write to stream; fwrite():");
$this->expectExceptionMessage("The stream was closed by the peer");
wait($stream->write("foobar"));
}

Expand All @@ -54,7 +54,7 @@ public function testClosedRemoteSocket() {
\fclose($b);

$this->expectException(StreamException::class);
$this->expectExceptionMessage("Failed to write to stream; fwrite():");
$this->expectExceptionMessage("The stream was closed by the peer");

// The first write still succeeds somehow...
wait($stream->write("foobar"));
Expand All @@ -81,7 +81,7 @@ public function testClosedRemoteSocketWithFork() {
\fclose($b);

$this->expectException(StreamException::class);
$this->expectExceptionMessage("Failed to write to stream; fwrite():");
$this->expectExceptionMessage("The stream was closed by the peer");

try {
// The first write still succeeds somehow...
Expand Down

0 comments on commit 5aeb553

Please sign in to comment.