From 0dd0eb85390b3ff057994c8f5936070da18ec8b3 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 1 May 2020 12:29:55 -0500 Subject: [PATCH] Fail only on first write of 0 in on-writable watcher --- .travis.yml | 11 ++++++++-- lib/ResourceOutputStream.php | 35 +++++++++++++++++++------------ test/ResourceOutputStreamTest.php | 32 ++++++++++++++++++++++++++++ 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5bc6d1c..9901bd9 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,4 @@ -sudo: false +os: linux language: php @@ -10,7 +10,14 @@ php: - 7.4 - nightly -matrix: +jobs: + include: + - name: macOS + os: osx + language: generic + before_install: + - curl -s http://getcomposer.org/installer | php + - mv composer.phar /usr/local/bin/composer allow_failures: - php: nightly fast_finish: true diff --git a/lib/ResourceOutputStream.php b/lib/ResourceOutputStream.php index 8bf3fdc..f10f1a5 100644 --- a/lib/ResourceOutputStream.php +++ b/lib/ResourceOutputStream.php @@ -13,7 +13,9 @@ */ final class ResourceOutputStream implements OutputStream { + /** @deprecated No longer used. */ const MAX_CONSECUTIVE_EMPTY_WRITES = 3; + const LARGE_CHUNK_SIZE = 128 * 1024; /** @var resource|null */ @@ -58,7 +60,14 @@ public function __construct($stream, int $chunkSize = null) $resource = &$this->resource; $this->watcher = Loop::onWritable($stream, static function ($watcher, $stream) use ($writes, &$chunkSize, &$writable, &$resource) { - static $emptyWrites = 0; + $firstWrite = true; + + // Using error handler to verify that a write of zero bytes was not due an error. + // @see https://github.com/reactphp/stream/pull/150 + $error = 0; + \set_error_handler(static function (int $errno) use (&$error) { + $error = $errno; + }); try { while (!$writes->isEmpty()) { @@ -97,22 +106,18 @@ public function __construct($stream, int $chunkSize = null) throw new StreamException($message); } + $written = (int) $written; // Cast potential false to 0. + // Broken pipes between processes on macOS/FreeBSD do not detect EOF properly. - if ($written === 0 || $written === false) { - if ($emptyWrites++ > self::MAX_CONSECUTIVE_EMPTY_WRITES) { - $message = "Failed to write to stream after multiple attempts"; - if ($error = \error_get_last()) { - $message .= \sprintf("; %s", $error["message"]); - } - throw new StreamException($message); + // fwrite() may write zero bytes on subsequent calls due to the buffer filling again. + if ($written === 0 && $error !== 0 && $firstWrite) { + $message = "Failed to write to stream"; + if ($error = \error_get_last()) { + $message .= \sprintf("; %s", $error["message"]); } - - $writes->unshift([$data, $previous, $deferred]); - return; + throw new StreamException($message); } - $emptyWrites = 0; - if ($length > $written) { $data = \substr($data, $written); $writes->unshift([$data, $written + $previous, $deferred]); @@ -120,6 +125,8 @@ public function __construct($stream, int $chunkSize = null) } $deferred->resolve($written + $previous); + + $firstWrite = false; } } catch (\Throwable $exception) { $resource = null; @@ -137,6 +144,8 @@ public function __construct($stream, int $chunkSize = null) if ($writes->isEmpty()) { Loop::disable($watcher); } + + \restore_error_handler(); } }); diff --git a/test/ResourceOutputStreamTest.php b/test/ResourceOutputStreamTest.php index a8bf7b1..5b0eb41 100644 --- a/test/ResourceOutputStreamTest.php +++ b/test/ResourceOutputStreamTest.php @@ -2,8 +2,10 @@ namespace Amp\ByteStream\Test; +use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; use Amp\ByteStream\StreamException; +use Amp\Delayed; use Amp\PHPUnit\AsyncTestCase; class ResourceOutputStreamTest extends AsyncTestCase @@ -68,6 +70,36 @@ public function testClosedRemoteSocket() // The first write still succeeds somehow... yield $stream->write("foobar"); + + // A delay seems required for the OS to realize the socket is indeed closed. + yield new Delayed(10); + yield $stream->write("foobar"); } + + /** + * @requires PHPUnit >= 7 + * + * @see https://github.com/reactphp/stream/pull/150 + */ + public function testUploadBiggerBlockSecure() + { + $size = 2 ** 18; // 256kb + + $resource = \stream_socket_client('tls://httpbin.org:443'); + + $output = new ResourceOutputStream($resource); + + $body = \str_repeat('.', $size); + + yield $output->write("POST /post HTTP/1.0\r\nHost: httpbin.org\r\nContent-Length: $size\r\n\r\n" . $body); + + $input = new ResourceInputStream($resource); + $buffer = ''; + while (null !== ($chunk = yield $input->read())) { + $buffer .= $chunk; + } + + $this->assertStringContainsString($body, $buffer); + } }