From bdab581a3e49bed77a3c858181cbb57a10d870df Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 25 Oct 2020 16:21:04 +0100 Subject: [PATCH] Fix HTTP/2 connection shutdown to be graceful GOAWAY doesn't mean we should close the connection right away, but rather that we shouldn't send any new requests on that connection. Additionally, if we experience timeouts, ping before making a new request, to ensure that non-responsive but non-idle connections are sorted out and not reused. --- .../Internal/Http2ConnectionProcessor.php | 165 +++++++++++------- 1 file changed, 105 insertions(+), 60 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 6e495e88..9b28a575 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -54,7 +54,7 @@ final class Http2ConnectionProcessor implements Http2Processor private const WINDOW_INCREMENT = 1024 * 1024; // Milliseconds to wait for pong (PING with ACK) frame before closing the connection. - private const PONG_TIMEOUT = 500; + private const PONG_TIMEOUT = 5000; /** @var string 64-bit for ping. */ private $counter = "aaaaaaaa"; @@ -113,6 +113,15 @@ final class Http2ConnectionProcessor implements Http2Processor /** @var callable[]|null */ private $onClose = []; + /** @var int */ + private $hasTimeout = false; + + /** @var bool */ + private $hasWriteError = false; + + /** @var int|null */ + private $shutdown; + public function __construct(EncryptableSocket $socket) { $this->socket = $socket; @@ -164,6 +173,8 @@ public function onClose(callable $onClose): void public function close(): Promise { + $this->shutdown(new SocketException('Socket to ' . $this->socket->getRemoteAddress() . ' closed')); + $this->socket->close(); if ($this->onClose !== null) { @@ -180,21 +191,25 @@ public function close(): Promise public function handlePong(string $data): void { - $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); + if ($this->pongDeferred === null) { + return; + } + + if ($this->pongWatcher !== null) { + Loop::cancel($this->pongWatcher); + $this->pongWatcher = null; + } + + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(true); } public function handlePing(string $data): void { - if ($this->pongDeferred !== null) { - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); - $this->pongWatcher = null; - } - - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - $deferred->resolve(true); - } + $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); } public function handleShutdown(int $lastId, int $error): void @@ -209,7 +224,7 @@ public function handleShutdown(int $lastId, int $error): void * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error)); + $this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId); } public function handleStreamWindowIncrement(int $streamId, int $windowSize): void @@ -280,6 +295,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream = $this->streams[$streamId]; $stream->resetInactivityWatcher(); + $this->hasTimeout = false; + if ($stream->trailers) { if ($stream->expectedLength && $stream->received !== $stream->expectedLength) { $diff = $stream->expectedLength - $stream->received; @@ -505,6 +522,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool ); if (!$this->streams[$streamId]->originalCancellation->isRequested()) { + $this->hasTimeout = true; + $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); $exception = new TimeoutException( @@ -769,9 +788,10 @@ public function handleConnectionException(Http2ConnectionException $exception): * @noinspection PhpDeprecationInspection */ $this->shutdown( - null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) ); + + $this->close(); } public function handleData(int $streamId, string $data): void @@ -902,6 +922,10 @@ public function handleStreamEnd(int $streamId): void public function reserveStream(): void { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + throw new \Error("Can't reserve stream after shutdown started"); + } + --$this->remainingStreams; } @@ -914,12 +938,31 @@ public function unreserveStream(): void public function getRemainingStreams(): int { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + return 0; + } + return $this->remainingStreams; } public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise { return call(function () use ($request, $cancellationToken, $stream): \Generator { + if ($this->hasTimeout && !yield $this->ping()) { + $exception = new UnprocessedRequestException( + new SocketException(\sprintf( + "Socket to '%s' missed responding to PINGs", + (string) $this->socket->getRemoteAddress() + )) + ); + + foreach ($request->getEventListeners() as $eventListener) { + yield $eventListener->abort($request, $exception); + } + + throw $exception; + } + $this->idlePings = 0; $this->cancelIdleWatcher(); @@ -1000,6 +1043,8 @@ public function request(Request $request, CancellationToken $cancellationToken, ); if (!$originalCancellation->isRequested()) { + $this->hasTimeout = true; + $exception = new TimeoutException( 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms', 0, @@ -1116,7 +1161,7 @@ public function request(Request $request, CancellationToken $cancellationToken, } if ($exception instanceof StreamException) { - $exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage()); + $exception = new SocketException('Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(), 0, $exception); } throw $exception; @@ -1161,17 +1206,24 @@ private function run(): \Generator \assert($return === null); } - $this->shutdown(); + $this->shutdown(new ClientHttp2ConnectionException( + "The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''), + $this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN, + )); + + $this->close(); } catch (\Throwable $exception) { /** * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown(null, new ClientHttp2ConnectionException( + $this->shutdown(new ClientHttp2ConnectionException( "The HTTP/2 connection closed unexpectedly", Http2Parser::INTERNAL_ERROR, $exception )); + + $this->close(); } } @@ -1184,7 +1236,14 @@ private function writeFrame( \assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data))); /** @noinspection PhpUnhandledExceptionInspection */ - return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise = $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise->onResolve(function ($error) { + if ($error) { + $this->hasWriteError = true; + } + }); + + return $promise; } private function applySetting(int $setting, int $value): void @@ -1427,6 +1486,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo if (!$this->streams && !$this->socket->isClosed()) { $this->socket->unreference(); } + + if (!$this->streams && $this->shutdown !== null) { + $this->close(); + } } private function setupPingIfIdle(): void @@ -1451,7 +1514,8 @@ private function setupPingIfIdle(): void // Connection idle for 10 minutes if ($this->idlePings >= 1) { - $this->shutdown(); + $this->shutdown(new HttpException('Too many pending pings')); + $this->close(); return; } @@ -1490,11 +1554,21 @@ private function ping(): Promise $this->pongDeferred = new Deferred; $this->idlePings++; - $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); + $promise = $this->pongDeferred->promise(); + $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, function () { + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(false); + + $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached')); + $this->close(); + }); - $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']); + $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); - return $this->pongDeferred->promise(); + return $promise; } /** @@ -1504,30 +1578,11 @@ private function ping(): Promise * * @return Promise */ - private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise + private function shutdown(HttpException $reason, ?int $lastId = null): Promise { - if ($this->onClose === null) { - return new Success; - } - return call(function () use ($lastId, $reason) { - $code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN; - $lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0); - $goawayPromise = $this->writeFrame( - Http2Parser::GOAWAY, - Http2Parser::NO_FLAG, - 0, - \pack("NN", $lastId, $code) - ); - - if ($this->onClose !== null) { - $onClose = $this->onClose; - $this->onClose = null; - - foreach ($onClose as $callback) { - asyncCall($callback, $this); - } - } + $code = $reason ? $reason->getCode() : null; + $this->shutdown = $code ?? -1; if ($this->settings !== null) { $settings = $this->settings; @@ -1538,25 +1593,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P } if ($this->streams) { - $reason = $reason ?? new SocketException("Connection closed"); + $reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason; foreach ($this->streams as $id => $stream) { - $this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason); - } - } - - if ($this->pongDeferred !== null) { - $this->pongDeferred->resolve(false); - } + if ($lastId !== null && $id <= $lastId) { + continue; + } - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); + $this->releaseStream($id, $reason); + } } - - $this->cancelIdleWatcher(); - - yield $goawayPromise; - - $this->socket->close(); }); }