diff --git a/src/Connection/Internal/Http2ConnectionProcessor.php b/src/Connection/Internal/Http2ConnectionProcessor.php index 312306d5..a6e19c98 100644 --- a/src/Connection/Internal/Http2ConnectionProcessor.php +++ b/src/Connection/Internal/Http2ConnectionProcessor.php @@ -54,7 +54,7 @@ final class Http2ConnectionProcessor implements Http2Processor private const WINDOW_INCREMENT = 1024 * 1024; // Milliseconds to wait for pong (PING with ACK) frame before closing the connection. - private const PONG_TIMEOUT = 500; + private const PONG_TIMEOUT = 5000; /** @var string 64-bit for ping. */ private $counter = "aaaaaaaa"; @@ -113,6 +113,15 @@ final class Http2ConnectionProcessor implements Http2Processor /** @var callable[]|null */ private $onClose = []; + /** @var int */ + private $hasTimeout = false; + + /** @var bool */ + private $hasWriteError = false; + + /** @var int|null */ + private $shutdown; + public function __construct(EncryptableSocket $socket) { $this->socket = $socket; @@ -164,6 +173,8 @@ public function onClose(callable $onClose): void public function close(): Promise { + $this->shutdown(new SocketException('Socket to ' . $this->socket->getRemoteAddress() . ' closed')); + $this->socket->close(); if ($this->onClose !== null) { @@ -180,21 +191,25 @@ public function close(): Promise public function handlePong(string $data): void { - $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); + if ($this->pongDeferred === null) { + return; + } + + if ($this->pongWatcher !== null) { + Loop::cancel($this->pongWatcher); + $this->pongWatcher = null; + } + + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(true); } public function handlePing(string $data): void { - if ($this->pongDeferred !== null) { - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); - $this->pongWatcher = null; - } - - $deferred = $this->pongDeferred; - $this->pongDeferred = null; - $deferred->resolve(true); - } + $this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data); } public function handleShutdown(int $lastId, int $error): void @@ -209,7 +224,7 @@ public function handleShutdown(int $lastId, int $error): void * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error)); + $this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId); } public function handleStreamWindowIncrement(int $streamId, int $windowSize): void @@ -280,6 +295,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool $stream = $this->streams[$streamId]; $stream->resetInactivityWatcher(); + $this->hasTimeout = false; + if ($stream->trailers) { if ($stream->expectedLength && $stream->received !== $stream->expectedLength) { $diff = $stream->expectedLength - $stream->received; @@ -505,6 +522,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool ); if (!$this->streams[$streamId]->originalCancellation->isRequested()) { + $this->hasTimeout = true; + $this->ping(); // async ping, if other requests occur, they wait for it + $transferTimeout = $this->streams[$streamId]->request->getTransferTimeout(); $exception = new TimeoutException( @@ -696,6 +716,7 @@ static function () { $streamId, \pack("N", Http2Parser::CANCEL) ); + $this->releaseStream($streamId, $exception); }); @@ -769,9 +790,10 @@ public function handleConnectionException(Http2ConnectionException $exception): * @noinspection PhpDeprecationInspection */ $this->shutdown( - null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception) ); + + $this->close(); } public function handleData(int $streamId, string $data): void @@ -870,13 +892,14 @@ public function handleStreamEnd(int $streamId): void \assert($body !== null); - $body->complete(); $trailers = $stream->trailers; $stream->trailers = null; \assert($trailers !== null); + $body->complete(); + $trailers->resolve(call(function () use ($stream, $streamId) { try { foreach ($stream->request->getEventListeners() as $eventListener) { @@ -897,11 +920,18 @@ public function handleStreamEnd(int $streamId): void $this->setupPingIfIdle(); - $this->releaseStream($streamId); + // Stream might be cancelled right after body completion + if (isset($this->streamId[$streamId])) { + $this->releaseStream($streamId); + } } public function reserveStream(): void { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + throw new \Error("Can't reserve stream after shutdown started"); + } + --$this->remainingStreams; } @@ -914,12 +944,31 @@ public function unreserveStream(): void public function getRemainingStreams(): int { + if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) { + return 0; + } + return $this->remainingStreams; } public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise { return call(function () use ($request, $cancellationToken, $stream): \Generator { + if ($this->hasTimeout && !yield $this->ping()) { + $exception = new UnprocessedRequestException( + new SocketException(\sprintf( + "Socket to '%s' missed responding to PINGs", + (string) $this->socket->getRemoteAddress() + )) + ); + + foreach ($request->getEventListeners() as $eventListener) { + yield $eventListener->abort($request, $exception); + } + + throw $exception; + } + $this->idlePings = 0; $this->cancelIdleWatcher(); @@ -1000,6 +1049,9 @@ public function request(Request $request, CancellationToken $cancellationToken, ); if (!$originalCancellation->isRequested()) { + $this->hasTimeout = true; + $this->ping(); // async ping, if other requests occur, they wait for it + $exception = new TimeoutException( 'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms', 0, @@ -1116,7 +1168,7 @@ public function request(Request $request, CancellationToken $cancellationToken, } if ($exception instanceof StreamException) { - $exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage()); + $exception = new SocketException('Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(), 0, $exception); } throw $exception; @@ -1161,17 +1213,24 @@ private function run(): \Generator \assert($return === null); } - $this->shutdown(); + $this->shutdown(new ClientHttp2ConnectionException( + "The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''), + $this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN, + )); + + $this->close(); } catch (\Throwable $exception) { /** * @psalm-suppress DeprecatedClass * @noinspection PhpDeprecationInspection */ - $this->shutdown(null, new ClientHttp2ConnectionException( - "The HTTP/2 connection closed unexpectedly", + $this->shutdown(new ClientHttp2ConnectionException( + "The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(), Http2Parser::INTERNAL_ERROR, $exception )); + + $this->close(); } } @@ -1184,7 +1243,14 @@ private function writeFrame( \assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data))); /** @noinspection PhpUnhandledExceptionInspection */ - return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise = $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data); + $promise->onResolve(function ($error) { + if ($error) { + $this->hasWriteError = true; + } + }); + + return $promise; } private function applySetting(int $setting, int $value): void @@ -1427,6 +1493,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo if (!$this->streams && !$this->socket->isClosed()) { $this->socket->unreference(); } + + if (!$this->streams && $this->shutdown !== null) { + $this->close(); + } } private function setupPingIfIdle(): void @@ -1451,7 +1521,8 @@ private function setupPingIfIdle(): void // Connection idle for 10 minutes if ($this->idlePings >= 1) { - $this->shutdown(); + $this->shutdown(new HttpException('Too many pending pings')); + $this->close(); return; } @@ -1490,11 +1561,21 @@ private function ping(): Promise $this->pongDeferred = new Deferred; $this->idlePings++; - $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); + $promise = $this->pongDeferred->promise(); + $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, function () { + $this->hasTimeout = false; + + $deferred = $this->pongDeferred; + $this->pongDeferred = null; + $deferred->resolve(false); - $this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']); + // Shutdown connection to stop new requests, but keep it open, as other responses might still arrive + $this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'), \max(0, $this->streamId)); + }); + + $this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++); - return $this->pongDeferred->promise(); + return $promise; } /** @@ -1504,21 +1585,11 @@ private function ping(): Promise * * @return Promise */ - private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise + private function shutdown(HttpException $reason, ?int $lastId = null): Promise { - if ($this->onClose === null) { - return new Success; - } - return call(function () use ($lastId, $reason) { - $code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN; - $lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0); - $goawayPromise = $this->writeFrame( - Http2Parser::GOAWAY, - Http2Parser::NO_FLAG, - 0, - \pack("NN", $lastId, $code) - ); + $code = $reason ? $reason->getCode() : null; + $this->shutdown = $code ?? -1; if ($this->settings !== null) { $settings = $this->settings; @@ -1529,34 +1600,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P } if ($this->streams) { - $reason = $reason ?? new SocketException("Connection closed"); + $reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason; foreach ($this->streams as $id => $stream) { - $this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason); - } - } - - if ($this->pongDeferred !== null) { - $this->pongDeferred->resolve(false); - } - - if ($this->pongWatcher !== null) { - Loop::cancel($this->pongWatcher); - } - - $this->cancelIdleWatcher(); - - if ($this->onClose !== null) { - $onClose = $this->onClose; - $this->onClose = null; + if ($lastId !== null && $id <= $lastId) { + continue; + } - foreach ($onClose as $callback) { - asyncCall($callback, $this); + $this->releaseStream($id, $reason); } } - - yield $goawayPromise; - - $this->socket->close(); }); } @@ -1653,8 +1705,7 @@ private function createStreamInactivityWatcher(int $streamId, int $timeout): ?st $this->releaseStream( $streamId, - new TimeoutException('Inactivity timeout exceeded, more than ' - . $timeout . ' ms elapsed from last data received') + new TimeoutException("Inactivity timeout exceeded, more than {$timeout} ms elapsed from last data received") ); }); diff --git a/src/Interceptor/RetryRequests.php b/src/Interceptor/RetryRequests.php index 9b87ea24..3de28420 100644 --- a/src/Interceptor/RetryRequests.php +++ b/src/Interceptor/RetryRequests.php @@ -4,6 +4,7 @@ use Amp\CancellationToken; use Amp\Http\Client\ApplicationInterceptor; +use Amp\Http\Client\Connection\Http2ConnectionException; use Amp\Http\Client\Connection\UnprocessedRequestException; use Amp\Http\Client\DelegateHttpClient; use Amp\Http\Client\Internal\ForbidCloning; @@ -39,7 +40,7 @@ public function request( return yield $httpClient->request(clone $request, $cancellation); } catch (UnprocessedRequestException $exception) { // Request was deemed retryable by connection, so carry on. - } catch (SocketException $exception) { + } catch (SocketException | Http2ConnectionException $exception) { if (!$request->isIdempotent()) { throw $exception; }