diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index e76b6631..8ab79629 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -439,19 +439,24 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers): voi $stream->trailers->promise() ); - $requestCompletion = $stream->requestBodyCompletion; $stream->responsePending = false; - $stream->pendingResponse->resolve(call(static function () use ($response, $requestCompletion) { - yield $requestCompletion->promise(); + $stream->pendingResponse->resolve(call(static function () use ($response, $stream) { + yield $stream->requestBodyCompletion->promise(); + + $stream->pendingResponse = null; return $response; })); - if ($this->serverWindow <= $stream->request->getBodySizeLimit() >> 1) { - $increment = $stream->request->getBodySizeLimit() - $this->serverWindow; - $this->serverWindow = $stream->request->getBodySizeLimit(); + if ($this->serverWindow <= self::MINIMUM_WINDOW) { + $this->serverWindow += self::MAX_INCREMENT; + $this->writeFrame(self::WINDOW_UPDATE, 0, 0, \pack("N", self::MAX_INCREMENT)); + } + + if ($stream->serverWindow <= self::MINIMUM_WINDOW) { + $stream->serverWindow += self::MAX_INCREMENT; - $this->writeFrame(self::WINDOW_UPDATE, 0, 0, \pack("N", $increment)); + $this->writeFrame(self::WINDOW_UPDATE, self::NO_FLAG, $streamId, \pack("N", self::MAX_INCREMENT)); } if (isset($headers["content-length"])) { @@ -610,6 +615,9 @@ static function () { $this->streams[$streamId] = $stream; + $stream->requestBodyComplete = true; + $stream->requestBodyCompletion->resolve(); + if ($parentStream->request->getPushHandler() === null) { $this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, self::CANCEL)); @@ -957,6 +965,8 @@ public function request(Request $request, CancellationToken $cancellationToken, return yield $http2stream->pendingResponse->promise(); } + $responsePromise = $http2stream->pendingResponse->promise(); + $http2stream->requestBodyComplete = true; $http2stream->requestBodyCompletion->resolve(); @@ -966,10 +976,12 @@ public function request(Request $request, CancellationToken $cancellationToken, yield $eventListener->completeSendingRequest($request, $stream); } - return yield $http2stream->pendingResponse->promise(); + return yield $responsePromise; } catch (\Throwable $exception) { if (isset($this->streams[$streamId])) { - $http2stream->requestBodyCompletion->fail($exception); + if (!$http2stream->requestBodyComplete) { + $http2stream->requestBodyCompletion->fail($exception); + } $this->releaseStream($streamId, $exception); } @@ -1015,16 +1027,9 @@ private function run(): \Generator $parser = (new Http2Parser($this))->parse(); while (null !== $chunk = yield $this->socket->read()) { - $promise = $parser->send($chunk); - - \assert($promise === null || $promise instanceof Promise); - - while ($promise instanceof Promise) { - yield $promise; // Wait for promise to resolve before resuming parser and reading more data. - $promise = $parser->send(null); + $return = $parser->send($chunk); - \assert($promise === null || $promise instanceof Promise); - } + \assert($return === null); } $this->shutdown(null); @@ -1194,33 +1199,29 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo $stream = $this->streams[$streamId]; + $exception = $exception ?? new Http2StreamException( + "Stream closed unexpectedly", + $streamId, + self::INTERNAL_ERROR + ); + if ($stream->responsePending) { $stream->responsePending = false; - $stream->pendingResponse->fail($exception ?? new Http2StreamException( - "Stream closed unexpectedly", - $streamId, - self::INTERNAL_ERROR - )); + $pendingResponse = $stream->pendingResponse; + $stream->pendingResponse = null; + $pendingResponse->fail($exception); } if ($stream->body) { $body = $stream->body; $stream->body = null; - $body->fail($exception ?? new Http2StreamException( - "Stream closed unexpectedly", - $streamId, - self::INTERNAL_ERROR - )); + $body->fail($exception); } if ($stream->trailers) { $trailers = $stream->trailers; $stream->trailers = null; - $trailers->fail($exception ?? new Http2StreamException( - "Stream closed unexpectedly", - $streamId, - self::INTERNAL_ERROR - )); + $trailers->fail($exception); } unset($this->streams[$streamId]);