From 12ffe9f24cd254d91ed87c62356c9a2fbea616de Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 24 Oct 2020 15:19:09 +0200 Subject: [PATCH 1/5] Invoke closure callbacks before failing streams Without this, the connection doesn't get dropped early enough, so the failed streams are retried on the same connection. --- .../Internal/Http2ConnectionProcessor.php | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 312306d5..6e495e88 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -1520,6 +1520,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P \pack("NN", $lastId, $code) ); + if ($this->onClose !== null) { + $onClose = $this->onClose; + $this->onClose = null; + + foreach ($onClose as $callback) { + asyncCall($callback, $this); + } + } + if ($this->settings !== null) { $settings = $this->settings; $this->settings = null; @@ -1545,15 +1554,6 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P $this->cancelIdleWatcher(); - if ($this->onClose !== null) { - $onClose = $this->onClose; - $this->onClose = null; - - foreach ($onClose as $callback) { - asyncCall($callback, $this); - } - } - yield $goawayPromise; $this->socket->close(); From 4cb01fda0d16f1d910e9e878ee9d9beead479955 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 24 Oct 2020 15:20:15 +0200 Subject: [PATCH 2/5] Retry idempotent requests on Http2ConnectionException --- src/Interceptor/RetryRequests.php | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Interceptor/RetryRequests.php b/src/Interceptor/RetryRequests.php index 9b87ea24..3de28420 100644 --- a/src/Interceptor/RetryRequests.php +++ b/src/Interceptor/RetryRequests.php @@ -4,6 +4,7 @@ use Amp\CancellationToken; use Amp\Http\Client\ApplicationInterceptor; +use Amp\Http\Client\Connection\Http2ConnectionException; use Amp\Http\Client\Connection\UnprocessedRequestException; use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\Internal\ForbidCloning; @@ -39,7 +40,7 @@ public function request( return yield $httpClient->request(clone $request, $cancellation); } catch (UnprocessedRequestException $exception) { // Request was deemed retryable by connection, so carry on. - } catch (SocketException $exception) { + } catch (SocketException | Http2ConnectionException $exception) { if (!$request->isIdempotent()) { throw $exception; } From bdab581a3e49bed77a3c858181cbb57a10d870df Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 25 Oct 2020 16:21:04 +0100 Subject: [PATCH 3/5] Fix HTTP/2 connection shutdown to be graceful GOAWAY doesn't mean we should close the connection right away, but rather that we shouldn't send any new requests on that connection. Additionally, if we experience timeouts, ping before making a new request, to ensure that non-responsive but non-idle connections are sorted out and not reused. --- .../Internal/Http2ConnectionProcessor.php | 165 +++++++++++------- 1 file changed, 105 insertions(+), 60 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 6e495e88..9b28a575 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -54,7 +54,7 @@ final class Http2ConnectionProcessor implements Http2Processor private const WINDOW_INCREMENT = 1024 * 1024; // Milliseconds to wait for pong (PING with ACK) frame before closing the connection. - private const PONG_TIMEOUT = 500; + private const PONG_TIMEOUT = 5000; /** @var string 64-bit for ping. */ private $counter = "aaaaaaaa"; @@ -113,6 +113,15 @@ final class Http2ConnectionProcessor implements Http2Processor /** @var callable[]|null */ private $onClose = []; + /** @var int */ + private $hasTimeout = false; + + /** @var bool */ + private $hasWriteError = false; + + /** @var int|null */ + private $shutdown; + public function __construct(EncryptableSocket $socket) { $this->socket = $socket; @@ -164,6 +173,8 @@ public function onClose(callable $onClose): void public function close(): Promise { + $this->shutdown(new SocketException('Socket to ' . $this->socket->getRemoteAddress() . ' closed')); + $this->socket->close(); if ($this->onClose !== null) { @@ -180,21 +191,25 @@ public function close(): Promise public function handlePong(string $data): void { - $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); + if ($this->pongDeferred === null) { + return; + } + + if ($this->pongWatcher !== null) { + Loop::cancel($this->pongWatcher); + $this->pongWatcher = null; + } + + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(true); } public function handlePing(string $data): void { - if ($this->pongDeferred !== null) { - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); - $this->pongWatcher = null; - } - - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - $deferred->resolve(true); - } + $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); } public function handleShutdown(int $lastId, int $error): void @@ -209,7 +224,7 @@ public function handleShutdown(int $lastId, int $error): void * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error)); + $this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId); } public function handleStreamWindowIncrement(int $streamId, int $windowSize): void @@ -280,6 +295,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream = $this->streams[$streamId]; $stream->resetInactivityWatcher(); + $this->hasTimeout = false; + if ($stream->trailers) { if ($stream->expectedLength && $stream->received !== $stream->expectedLength) { $diff = $stream->expectedLength - $stream->received; @@ -505,6 +522,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool ); if (!$this->streams[$streamId]->originalCancellation->isRequested()) { + $this->hasTimeout = true; + $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); $exception = new TimeoutException( @@ -769,9 +788,10 @@ public function handleConnectionException(Http2ConnectionException $exception): * @noinspection PhpDeprecationInspection */ $this->shutdown( - null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) ); + + $this->close(); } public function handleData(int $streamId, string $data): void @@ -902,6 +922,10 @@ public function handleStreamEnd(int $streamId): void public function reserveStream(): void { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + throw new \Error("Can't reserve stream after shutdown started"); + } + --$this->remainingStreams; } @@ -914,12 +938,31 @@ public function unreserveStream(): void public function getRemainingStreams(): int { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + return 0; + } + return $this->remainingStreams; } public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise { return call(function () use ($request, $cancellationToken, $stream): \Generator { + if ($this->hasTimeout && !yield $this->ping()) { + $exception = new UnprocessedRequestException( + new SocketException(\sprintf( + "Socket to '%s' missed responding to PINGs", + (string) $this->socket->getRemoteAddress() + )) + ); + + foreach ($request->getEventListeners() as $eventListener) { + yield $eventListener->abort($request, $exception); + } + + throw $exception; + } + $this->idlePings = 0; $this->cancelIdleWatcher(); @@ -1000,6 +1043,8 @@ public function request(Request $request, CancellationToken $cancellationToken, ); if (!$originalCancellation->isRequested()) { + $this->hasTimeout = true; + $exception = new TimeoutException( 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms', 0, @@ -1116,7 +1161,7 @@ public function request(Request $request, CancellationToken $cancellationToken, } if ($exception instanceof StreamException) { - $exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage()); + $exception = new SocketException('Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(), 0, $exception); } throw $exception; @@ -1161,17 +1206,24 @@ private function run(): \Generator \assert($return === null); } - $this->shutdown(); + $this->shutdown(new ClientHttp2ConnectionException( + "The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''), + $this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN, + )); + + $this->close(); } catch (\Throwable $exception) { /** * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown(null, new ClientHttp2ConnectionException( + $this->shutdown(new ClientHttp2ConnectionException( "The HTTP/2 connection closed unexpectedly", Http2Parser::INTERNAL_ERROR, $exception )); + + $this->close(); } } @@ -1184,7 +1236,14 @@ private function writeFrame( \assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data))); /** @noinspection PhpUnhandledExceptionInspection */ - return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise = $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise->onResolve(function ($error) { + if ($error) { + $this->hasWriteError = true; + } + }); + + return $promise; } private function applySetting(int $setting, int $value): void @@ -1427,6 +1486,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo if (!$this->streams && !$this->socket->isClosed()) { $this->socket->unreference(); } + + if (!$this->streams && $this->shutdown !== null) { + $this->close(); + } } private function setupPingIfIdle(): void @@ -1451,7 +1514,8 @@ private function setupPingIfIdle(): void // Connection idle for 10 minutes if ($this->idlePings >= 1) { - $this->shutdown(); + $this->shutdown(new HttpException('Too many pending pings')); + $this->close(); return; } @@ -1490,11 +1554,21 @@ private function ping(): Promise $this->pongDeferred = new Deferred; $this->idlePings++; - $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); + $promise = $this->pongDeferred->promise(); + $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, function () { + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(false); + + $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached')); + $this->close(); + }); - $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']); + $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); - return $this->pongDeferred->promise(); + return $promise; } /** @@ -1504,30 +1578,11 @@ private function ping(): Promise * * @return Promise */ - private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise + private function shutdown(HttpException $reason, ?int $lastId = null): Promise { - if ($this->onClose === null) { - return new Success; - } - return call(function () use ($lastId, $reason) { - $code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN; - $lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0); - $goawayPromise = $this->writeFrame( - Http2Parser::GOAWAY, - Http2Parser::NO_FLAG, - 0, - \pack("NN", $lastId, $code) - ); - - if ($this->onClose !== null) { - $onClose = $this->onClose; - $this->onClose = null; - - foreach ($onClose as $callback) { - asyncCall($callback, $this); - } - } + $code = $reason ? $reason->getCode() : null; + $this->shutdown = $code ?? -1; if ($this->settings !== null) { $settings = $this->settings; @@ -1538,25 +1593,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P } if ($this->streams) { - $reason = $reason ?? new SocketException("Connection closed"); + $reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason; foreach ($this->streams as $id => $stream) { - $this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason); - } - } - - if ($this->pongDeferred !== null) { - $this->pongDeferred->resolve(false); - } + if ($lastId !== null && $id <= $lastId) { + continue; + } - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); + $this->releaseStream($id, $reason); + } } - - $this->cancelIdleWatcher(); - - yield $goawayPromise; - - $this->socket->close(); }); } From cf9c02de676d321a0ee16c7951dbf28de6e43f64 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 25 Oct 2020 16:47:43 +0100 Subject: [PATCH 4/5] Start ping on timeout instead of on the next request --- src/Connection/Internal/Http2ConnectionProcessor.php | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 9b28a575..97e75f9e 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -523,6 +523,7 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool if (!$this->streams[$streamId]->originalCancellation->isRequested()) { $this->hasTimeout = true; + $this->ping(); // async ping, if other requests occur, they wait for it $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); @@ -1044,6 +1045,7 @@ public function request(Request $request, CancellationToken $cancellationToken, if (!$originalCancellation->isRequested()) { $this->hasTimeout = true; + $this->ping(); // async ping, if other requests occur, they wait for it $exception = new TimeoutException( 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms', @@ -1218,7 +1220,7 @@ private function run(): \Generator * @noinspection PhpDeprecationInspection */ $this->shutdown(new ClientHttp2ConnectionException( - "The HTTP/2 connection closed unexpectedly", + "The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, $exception )); @@ -1562,8 +1564,8 @@ private function ping(): Promise $this->pongDeferred = null; $deferred->resolve(false); - $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached')); - $this->close(); + // Shutdown connection to stop new requests, but keep it open, as other responses might still arrive + $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'), \max(0, $this->streamId)); }); $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); From 3385642d68f4166d275b394fbfd7f95e0b6928ca Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sun, 25 Oct 2020 17:38:32 +0100 Subject: [PATCH 5/5] Fix issues if response is cancelled just after body completes --- src/Connection/Internal/Http2ConnectionProcessor.php | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 97e75f9e..a6e19c98 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -716,6 +716,7 @@ static function () { $streamId, \pack("N", Http2Parser::CANCEL) ); + $this->releaseStream($streamId, $exception); }); @@ -891,13 +892,14 @@ public function handleStreamEnd(int $streamId): void \assert($body !== null); - $body->complete(); $trailers = $stream->trailers; $stream->trailers = null; \assert($trailers !== null); + $body->complete(); + $trailers->resolve(call(function () use ($stream, $streamId) { try { foreach ($stream->request->getEventListeners() as $eventListener) { @@ -918,7 +920,10 @@ public function handleStreamEnd(int $streamId): void $this->setupPingIfIdle(); - $this->releaseStream($streamId); + // Stream might be cancelled right after body completion + if (isset($this->streamId[$streamId])) { + $this->releaseStream($streamId); + } } public function reserveStream(): void @@ -1700,8 +1705,7 @@ private function createStreamInactivityWatcher(int $streamId, int $timeout): ?st $this->releaseStream( $streamId, - new TimeoutException('Inactivity timeout exceeded, more than ' - . $timeout . ' ms elapsed from last data received') + new TimeoutException("Inactivity timeout exceeded, more than {$timeout} ms elapsed from last data received") ); });