From 4e41f6583180635064fa8bb2fbe519b722b8828c Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 30 Nov 2019 23:49:34 +0100 Subject: [PATCH] Await request to be fully written before returning response --- .../Internal/Http2ConnectionProcessor.php | 82 +++++++++---------- src/Connection/Internal/Http2Stream.php | 13 ++- test/ClientHttpBinIntegrationTest.php | 29 +++++++ 3 files changed, 78 insertions(+), 46 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 88e85ee2..e76b6631 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -288,7 +288,7 @@ public function handleConnectionWindowIncrement($windowSize): void return; } - if ($stream->buffer === '' || $stream->clientWindow <= 0) { + if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) { continue; } @@ -439,9 +439,13 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers): voi $stream->trailers->promise() ); - $pendingResponse = $stream->pendingResponse; - $stream->pendingResponse = null; - $pendingResponse->resolve($response); + $requestCompletion = $stream->requestBodyCompletion; + $stream->responsePending = false; + $stream->pendingResponse->resolve(call(static function () use ($response, $requestCompletion) { + yield $requestCompletion->promise(); + + return $response; + })); if ($this->serverWindow <= $stream->request->getBodySizeLimit() >> 1) { $increment = $stream->request->getBodySizeLimit() - $this->serverWindow; @@ -734,32 +738,19 @@ public function handleData(int $streamId, string $data): void $promise = $stream->body->emit($data); - if ($stream->serverWindow <= self::MINIMUM_WINDOW) { - $promise->onResolve(function (?\Throwable $exception) use ($streamId): void { - if ($exception || !isset($this->streams[$streamId])) { - return; - } - - $stream = $this->streams[$streamId]; - - if ($stream->serverWindow > self::MINIMUM_WINDOW) { - return; - } - - $increment = \min( - $stream->request->getBodySizeLimit() - $stream->received - $stream->serverWindow, - self::MAX_INCREMENT - ); + $promise->onResolve(function (?\Throwable $exception) use ($streamId): void { + if ($exception || !isset($this->streams[$streamId])) { + return; + } - if ($increment <= 0) { - return; - } + $stream = $this->streams[$streamId]; - $stream->serverWindow += $increment; + if ($stream->serverWindow <= self::MINIMUM_WINDOW) { + $stream->serverWindow += self::MAX_INCREMENT; - $this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", $increment)); - }); - } + $this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", self::MAX_INCREMENT)); + } + }); } public function handleSettings(array $settings): void @@ -937,6 +928,9 @@ public function request(Request $request, CancellationToken $cancellationToken, yield $eventListener->completeSendingRequest($request, $stream); } + $http2stream->requestBodyComplete = true; + $http2stream->requestBodyCompletion->resolve(); + return yield $http2stream->pendingResponse->promise(); } @@ -963,7 +957,8 @@ public function request(Request $request, CancellationToken $cancellationToken, return yield $http2stream->pendingResponse->promise(); } - $http2stream->bufferComplete = true; + $http2stream->requestBodyComplete = true; + $http2stream->requestBodyCompletion->resolve(); yield $this->writeData($http2stream, $buffer); @@ -974,6 +969,8 @@ public function request(Request $request, CancellationToken $cancellationToken, return yield $http2stream->pendingResponse->promise(); } catch (\Throwable $exception) { if (isset($this->streams[$streamId])) { + $http2stream->requestBodyCompletion->fail($exception); + $this->releaseStream($streamId, $exception); } @@ -1071,7 +1068,7 @@ private function applySetting(int $setting, int $value): void return; } - if ($stream->buffer === '' || $stream->clientWindow <= 0) { + if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) { continue; } @@ -1122,7 +1119,7 @@ private function applySetting(int $setting, int $value): void private function writeBufferedData(Http2Stream $stream): Promise { $windowSize = \min($this->clientWindow, $stream->clientWindow); - $length = \strlen($stream->buffer); + $length = \strlen($stream->requestBodyBuffer); if ($length <= $windowSize) { if ($stream->windowSizeIncrease) { @@ -1135,21 +1132,21 @@ private function writeBufferedData(Http2Stream $stream): Promise $stream->clientWindow -= $length; if ($length > $this->frameSizeLimit) { - $chunks = \str_split($stream->buffer, $this->frameSizeLimit); - $stream->buffer = \array_pop($chunks); + $chunks = \str_split($stream->requestBodyBuffer, $this->frameSizeLimit); + $stream->requestBodyBuffer = \array_pop($chunks); foreach ($chunks as $chunk) { $this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $chunk); } } - if ($stream->bufferComplete) { - $promise = $this->writeFrame(self::DATA, self::END_STREAM, $stream->id, $stream->buffer); + if ($stream->requestBodyComplete) { + $promise = $this->writeFrame(self::DATA, self::END_STREAM, $stream->id, $stream->requestBodyBuffer); } else { - $promise = $this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $stream->buffer); + $promise = $this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $stream->requestBodyBuffer); } - $stream->buffer = ""; + $stream->requestBodyBuffer = ""; return $promise; } @@ -1162,7 +1159,7 @@ private function writeBufferedData(Http2Stream $stream): Promise $deferred->resolve(); } - $data = $stream->buffer; + $data = $stream->requestBodyBuffer; $end = $windowSize - $this->frameSizeLimit; $stream->clientWindow -= $windowSize; @@ -1179,7 +1176,7 @@ private function writeBufferedData(Http2Stream $stream): Promise \substr($data, $off, $windowSize - $off) ); - $stream->buffer = \substr($data, $windowSize); + $stream->requestBodyBuffer = \substr($data, $windowSize); return $promise; } @@ -1197,10 +1194,9 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo $stream = $this->streams[$streamId]; - if ($stream->pendingResponse) { - $pendingResponse = $stream->pendingResponse; - $stream->pendingResponse = null; - $pendingResponse->fail($exception ?? new Http2StreamException( + if ($stream->responsePending) { + $stream->responsePending = false; + $stream->pendingResponse->fail($exception ?? new Http2StreamException( "Stream closed unexpectedly", $streamId, self::INTERNAL_ERROR @@ -1400,7 +1396,7 @@ private function generateHeaders(Request $request): \Generator private function writeData(Http2Stream $stream, string $data): Promise { - $stream->buffer .= $data; + $stream->requestBodyBuffer .= $data; return $this->writeBufferedData($stream); } diff --git a/src/Connection/Internal/Http2Stream.php b/src/Connection/Internal/Http2Stream.php index 6b027aac..f3021be1 100644 --- a/src/Connection/Internal/Http2Stream.php +++ b/src/Connection/Internal/Http2Stream.php @@ -32,9 +32,12 @@ final class Http2Stream /** @var Response|null */ public $response; - /** @var Deferred|null */ + /** @var Deferred */ public $pendingResponse; + /** @var bool */ + public $responsePending = true; + /** @var Emitter|null */ public $body; @@ -54,10 +57,13 @@ final class Http2Stream public $clientWindow; /** @var string */ - public $buffer = ''; + public $requestBodyBuffer = ''; /** @var bool */ - public $bufferComplete = false; + public $requestBodyComplete = false; + + /** @var Deferred */ + public $requestBodyCompletion; /** @var int Integer between 1 and 256 */ public $weight = 16; @@ -89,5 +95,6 @@ public function __construct( $this->serverWindow = $serverSize; $this->clientWindow = $clientSize; $this->pendingResponse = new Deferred; + $this->requestBodyCompletion = new Deferred; } } diff --git a/test/ClientHttpBinIntegrationTest.php b/test/ClientHttpBinIntegrationTest.php index 957fb680..d1372ab1 100644 --- a/test/ClientHttpBinIntegrationTest.php +++ b/test/ClientHttpBinIntegrationTest.php @@ -608,6 +608,35 @@ public function testHttp2SupportBody(): \Generator $this->assertSame('2', $response->getProtocolVersion()); } + public function testHttp2SupportLargeBody(): \Generator + { + $request = new Request('https://http2.pro/api/v1', 'POST'); + $request->setBody(\str_repeat(',', 256 * 1024)); // larger than initial stream window + + /** @var Response $response */ + $response = yield $this->client->request($request); + $body = yield $response->getBody()->buffer(); + $json = \json_decode($body, true); + + $this->assertSame(1, $json['http2']); + $this->assertSame('HTTP/2.0', $json['protocol']); + $this->assertSame(1, $json['push']); + $this->assertSame('2', $response->getProtocolVersion()); + } + + public function testHttp2SupportLargeResponseBody(): \Generator + { + $request = new Request('https://1906714720.rsc.cdn77.org/img/cdn77-test-3mb.jpg', 'GET'); + $request->setTransferTimeout(100000); + $request->setBodySizeLimit(10000000000); + + /** @var Response $response */ + $response = yield $this->client->request($request); + yield $response->getBody()->buffer(); + + $this->assertSame(200, $response->getStatus()); + } + public function testConcurrentSlowNetworkInterceptor(): \Generator { $this->givenNetworkInterceptor(new ModifyRequest(static function (Request $request) {