Skip to content

Commit

Permalink
Fix response control flow
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Dec 2, 2019
1 parent 4e41f65 commit 727126d
Showing 1 changed file with 34 additions and 33 deletions.
67 changes: 34 additions & 33 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Expand Up @@ -439,19 +439,24 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers): voi
$stream->trailers->promise()
);

$requestCompletion = $stream->requestBodyCompletion;
$stream->responsePending = false;
$stream->pendingResponse->resolve(call(static function () use ($response, $requestCompletion) {
yield $requestCompletion->promise();
$stream->pendingResponse->resolve(call(static function () use ($response, $stream) {
yield $stream->requestBodyCompletion->promise();

$stream->pendingResponse = null;

return $response;
}));

if ($this->serverWindow <= $stream->request->getBodySizeLimit() >> 1) {
$increment = $stream->request->getBodySizeLimit() - $this->serverWindow;
$this->serverWindow = $stream->request->getBodySizeLimit();
if ($this->serverWindow <= self::MINIMUM_WINDOW) {
$this->serverWindow += self::MAX_INCREMENT;
$this->writeFrame(self::WINDOW_UPDATE, 0, 0, \pack("N", self::MAX_INCREMENT));
}

if ($stream->serverWindow <= self::MINIMUM_WINDOW) {
$stream->serverWindow += self::MAX_INCREMENT;

$this->writeFrame(self::WINDOW_UPDATE, 0, 0, \pack("N", $increment));
$this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", self::MAX_INCREMENT));
}

if (isset($headers["content-length"])) {
Expand Down Expand Up @@ -610,6 +615,9 @@ static function () {

$this->streams[$streamId] = $stream;

$stream->requestBodyComplete = true;
$stream->requestBodyCompletion->resolve();

if ($parentStream->request->getPushHandler() === null) {
$this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, self::CANCEL));

Expand Down Expand Up @@ -957,6 +965,8 @@ public function request(Request $request, CancellationToken $cancellationToken,
return yield $http2stream->pendingResponse->promise();
}

$responsePromise = $http2stream->pendingResponse->promise();

$http2stream->requestBodyComplete = true;
$http2stream->requestBodyCompletion->resolve();

Expand All @@ -966,10 +976,12 @@ public function request(Request $request, CancellationToken $cancellationToken,
yield $eventListener->completeSendingRequest($request, $stream);
}

return yield $http2stream->pendingResponse->promise();
return yield $responsePromise;
} catch (\Throwable $exception) {
if (isset($this->streams[$streamId])) {
$http2stream->requestBodyCompletion->fail($exception);
if (!$http2stream->requestBodyComplete) {
$http2stream->requestBodyCompletion->fail($exception);
}

$this->releaseStream($streamId, $exception);
}
Expand Down Expand Up @@ -1015,16 +1027,9 @@ private function run(): \Generator
$parser = (new Http2Parser($this))->parse();

while (null !== $chunk = yield $this->socket->read()) {
$promise = $parser->send($chunk);

\assert($promise === null || $promise instanceof Promise);

while ($promise instanceof Promise) {
yield $promise; // Wait for promise to resolve before resuming parser and reading more data.
$promise = $parser->send(null);
$return = $parser->send($chunk);

\assert($promise === null || $promise instanceof Promise);
}
\assert($return === null);
}

$this->shutdown(null);
Expand Down Expand Up @@ -1194,33 +1199,29 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo

$stream = $this->streams[$streamId];

$exception = $exception ?? new Http2StreamException(
"Stream closed unexpectedly",
$streamId,
self::INTERNAL_ERROR
);

if ($stream->responsePending) {
$stream->responsePending = false;
$stream->pendingResponse->fail($exception ?? new Http2StreamException(
"Stream closed unexpectedly",
$streamId,
self::INTERNAL_ERROR
));
$pendingResponse = $stream->pendingResponse;
$stream->pendingResponse = null;
$pendingResponse->fail($exception);
}

if ($stream->body) {
$body = $stream->body;
$stream->body = null;
$body->fail($exception ?? new Http2StreamException(
"Stream closed unexpectedly",
$streamId,
self::INTERNAL_ERROR
));
$body->fail($exception);
}

if ($stream->trailers) {
$trailers = $stream->trailers;
$stream->trailers = null;
$trailers->fail($exception ?? new Http2StreamException(
"Stream closed unexpectedly",
$streamId,
self::INTERNAL_ERROR
));
$trailers->fail($exception);
}

unset($this->streams[$streamId]);
Expand Down

0 comments on commit 727126d

Please sign in to comment.