Skip to content

Commit

Permalink
Await request to be fully written before returning response
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 30, 2019
1 parent 37a5985 commit 4e41f65
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 46 deletions.
82 changes: 39 additions & 43 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Expand Up @@ -288,7 +288,7 @@ public function handleConnectionWindowIncrement($windowSize): void
return;
}

if ($stream->buffer === '' || $stream->clientWindow <= 0) {
if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) {
continue;
}

Expand Down Expand Up @@ -439,9 +439,13 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers): voi
$stream->trailers->promise()
);

$pendingResponse = $stream->pendingResponse;
$stream->pendingResponse = null;
$pendingResponse->resolve($response);
$requestCompletion = $stream->requestBodyCompletion;
$stream->responsePending = false;
$stream->pendingResponse->resolve(call(static function () use ($response, $requestCompletion) {
yield $requestCompletion->promise();

return $response;
}));

if ($this->serverWindow <= $stream->request->getBodySizeLimit() >> 1) {
$increment = $stream->request->getBodySizeLimit() - $this->serverWindow;
Expand Down Expand Up @@ -734,32 +738,19 @@ public function handleData(int $streamId, string $data): void

$promise = $stream->body->emit($data);

if ($stream->serverWindow <= self::MINIMUM_WINDOW) {
$promise->onResolve(function (?\Throwable $exception) use ($streamId): void {
if ($exception || !isset($this->streams[$streamId])) {
return;
}

$stream = $this->streams[$streamId];

if ($stream->serverWindow > self::MINIMUM_WINDOW) {
return;
}

$increment = \min(
$stream->request->getBodySizeLimit() - $stream->received - $stream->serverWindow,
self::MAX_INCREMENT
);
$promise->onResolve(function (?\Throwable $exception) use ($streamId): void {
if ($exception || !isset($this->streams[$streamId])) {
return;
}

if ($increment <= 0) {
return;
}
$stream = $this->streams[$streamId];

$stream->serverWindow += $increment;
if ($stream->serverWindow <= self::MINIMUM_WINDOW) {
$stream->serverWindow += self::MAX_INCREMENT;

$this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", $increment));
});
}
$this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", self::MAX_INCREMENT));
}
});
}

public function handleSettings(array $settings): void
Expand Down Expand Up @@ -937,6 +928,9 @@ public function request(Request $request, CancellationToken $cancellationToken,
yield $eventListener->completeSendingRequest($request, $stream);
}

$http2stream->requestBodyComplete = true;
$http2stream->requestBodyCompletion->resolve();

return yield $http2stream->pendingResponse->promise();
}

Expand All @@ -963,7 +957,8 @@ public function request(Request $request, CancellationToken $cancellationToken,
return yield $http2stream->pendingResponse->promise();
}

$http2stream->bufferComplete = true;
$http2stream->requestBodyComplete = true;
$http2stream->requestBodyCompletion->resolve();

yield $this->writeData($http2stream, $buffer);

Expand All @@ -974,6 +969,8 @@ public function request(Request $request, CancellationToken $cancellationToken,
return yield $http2stream->pendingResponse->promise();
} catch (\Throwable $exception) {
if (isset($this->streams[$streamId])) {
$http2stream->requestBodyCompletion->fail($exception);

$this->releaseStream($streamId, $exception);
}

Expand Down Expand Up @@ -1071,7 +1068,7 @@ private function applySetting(int $setting, int $value): void
return;
}

if ($stream->buffer === '' || $stream->clientWindow <= 0) {
if ($stream->requestBodyBuffer === '' || $stream->clientWindow <= 0) {
continue;
}

Expand Down Expand Up @@ -1122,7 +1119,7 @@ private function applySetting(int $setting, int $value): void
private function writeBufferedData(Http2Stream $stream): Promise
{
$windowSize = \min($this->clientWindow, $stream->clientWindow);
$length = \strlen($stream->buffer);
$length = \strlen($stream->requestBodyBuffer);

if ($length <= $windowSize) {
if ($stream->windowSizeIncrease) {
Expand All @@ -1135,21 +1132,21 @@ private function writeBufferedData(Http2Stream $stream): Promise
$stream->clientWindow -= $length;

if ($length > $this->frameSizeLimit) {
$chunks = \str_split($stream->buffer, $this->frameSizeLimit);
$stream->buffer = \array_pop($chunks);
$chunks = \str_split($stream->requestBodyBuffer, $this->frameSizeLimit);
$stream->requestBodyBuffer = \array_pop($chunks);

foreach ($chunks as $chunk) {
$this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $chunk);
}
}

if ($stream->bufferComplete) {
$promise = $this->writeFrame(self::DATA, self::END_STREAM, $stream->id, $stream->buffer);
if ($stream->requestBodyComplete) {
$promise = $this->writeFrame(self::DATA, self::END_STREAM, $stream->id, $stream->requestBodyBuffer);
} else {
$promise = $this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $stream->buffer);
$promise = $this->writeFrame(self::DATA, self::NO_FLAG, $stream->id, $stream->requestBodyBuffer);
}

$stream->buffer = "";
$stream->requestBodyBuffer = "";

return $promise;
}
Expand All @@ -1162,7 +1159,7 @@ private function writeBufferedData(Http2Stream $stream): Promise
$deferred->resolve();
}

$data = $stream->buffer;
$data = $stream->requestBodyBuffer;
$end = $windowSize - $this->frameSizeLimit;

$stream->clientWindow -= $windowSize;
Expand All @@ -1179,7 +1176,7 @@ private function writeBufferedData(Http2Stream $stream): Promise
\substr($data, $off, $windowSize - $off)
);

$stream->buffer = \substr($data, $windowSize);
$stream->requestBodyBuffer = \substr($data, $windowSize);

return $promise;
}
Expand All @@ -1197,10 +1194,9 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo

$stream = $this->streams[$streamId];

if ($stream->pendingResponse) {
$pendingResponse = $stream->pendingResponse;
$stream->pendingResponse = null;
$pendingResponse->fail($exception ?? new Http2StreamException(
if ($stream->responsePending) {
$stream->responsePending = false;
$stream->pendingResponse->fail($exception ?? new Http2StreamException(
"Stream closed unexpectedly",
$streamId,
self::INTERNAL_ERROR
Expand Down Expand Up @@ -1400,7 +1396,7 @@ private function generateHeaders(Request $request): \Generator

private function writeData(Http2Stream $stream, string $data): Promise
{
$stream->buffer .= $data;
$stream->requestBodyBuffer .= $data;

return $this->writeBufferedData($stream);
}
Expand Down
13 changes: 10 additions & 3 deletions src/Connection/Internal/Http2Stream.php
Expand Up @@ -32,9 +32,12 @@ final class Http2Stream
/** @var Response|null */
public $response;

/** @var Deferred|null */
/** @var Deferred */
public $pendingResponse;

/** @var bool */
public $responsePending = true;

/** @var Emitter|null */
public $body;

Expand All @@ -54,10 +57,13 @@ final class Http2Stream
public $clientWindow;

/** @var string */
public $buffer = '';
public $requestBodyBuffer = '';

/** @var bool */
public $bufferComplete = false;
public $requestBodyComplete = false;

/** @var Deferred */
public $requestBodyCompletion;

/** @var int Integer between 1 and 256 */
public $weight = 16;
Expand Down Expand Up @@ -89,5 +95,6 @@ public function __construct(
$this->serverWindow = $serverSize;
$this->clientWindow = $clientSize;
$this->pendingResponse = new Deferred;
$this->requestBodyCompletion = new Deferred;
}
}
29 changes: 29 additions & 0 deletions test/ClientHttpBinIntegrationTest.php
Expand Up @@ -608,6 +608,35 @@ public function testHttp2SupportBody(): \Generator
$this->assertSame('2', $response->getProtocolVersion());
}

public function testHttp2SupportLargeBody(): \Generator
{
$request = new Request('https://http2.pro/api/v1', 'POST');
$request->setBody(\str_repeat(',', 256 * 1024)); // larger than initial stream window

/** @var Response $response */
$response = yield $this->client->request($request);
$body = yield $response->getBody()->buffer();
$json = \json_decode($body, true);

$this->assertSame(1, $json['http2']);
$this->assertSame('HTTP/2.0', $json['protocol']);
$this->assertSame(1, $json['push']);
$this->assertSame('2', $response->getProtocolVersion());
}

public function testHttp2SupportLargeResponseBody(): \Generator
{
$request = new Request('https://1906714720.rsc.cdn77.org/img/cdn77-test-3mb.jpg', 'GET');
$request->setTransferTimeout(100000);
$request->setBodySizeLimit(10000000000);

/** @var Response $response */
$response = yield $this->client->request($request);
yield $response->getBody()->buffer();

$this->assertSame(200, $response->getStatus());
}

public function testConcurrentSlowNetworkInterceptor(): \Generator
{
$this->givenNetworkInterceptor(new ModifyRequest(static function (Request $request) {
Expand Down

0 comments on commit 4e41f65

Please sign in to comment.