Skip to content

Commit

Permalink
Fix HTTP/2 connection shutdown to be graceful
Browse files Browse the repository at this point in the history
GOAWAY doesn't mean we should close the connection right away, but rather that we shouldn't send any new requests on that connection.

Additionally, if we experience timeouts, ping before making a new request, to ensure that non-responsive but non-idle connections are sorted out and not reused.
  • Loading branch information
kelunik committed Oct 25, 2020
1 parent 4cb01fd commit bdab581
Showing 1 changed file with 105 additions and 60 deletions.
165 changes: 105 additions & 60 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ final class Http2ConnectionProcessor implements Http2Processor
private const WINDOW_INCREMENT = 1024 * 1024;

// Milliseconds to wait for pong (PING with ACK) frame before closing the connection.
private const PONG_TIMEOUT = 500;
private const PONG_TIMEOUT = 5000;

/** @var string 64-bit for ping. */
private $counter = "aaaaaaaa";
Expand Down Expand Up @@ -113,6 +113,15 @@ final class Http2ConnectionProcessor implements Http2Processor
/** @var callable[]|null */
private $onClose = [];

/** @var int */
private $hasTimeout = false;

/** @var bool */
private $hasWriteError = false;

/** @var int|null */
private $shutdown;

public function __construct(EncryptableSocket $socket)
{
$this->socket = $socket;
Expand Down Expand Up @@ -164,6 +173,8 @@ public function onClose(callable $onClose): void

public function close(): Promise
{
$this->shutdown(new SocketException('Socket to ' . $this->socket->getRemoteAddress() . ' closed'));

$this->socket->close();

if ($this->onClose !== null) {
Expand All @@ -180,21 +191,25 @@ public function close(): Promise

public function handlePong(string $data): void
{
$this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data);
if ($this->pongDeferred === null) {
return;
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->pongWatcher = null;
}

$this->hasTimeout = false;

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(true);
}

public function handlePing(string $data): void
{
if ($this->pongDeferred !== null) {
if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->pongWatcher = null;
}

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(true);
}
$this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data);
}

public function handleShutdown(int $lastId, int $error): void
Expand All @@ -209,7 +224,7 @@ public function handleShutdown(int $lastId, int $error): void
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error));
$this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId);
}

public function handleStreamWindowIncrement(int $streamId, int $windowSize): void
Expand Down Expand Up @@ -280,6 +295,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$stream = $this->streams[$streamId];
$stream->resetInactivityWatcher();

$this->hasTimeout = false;

if ($stream->trailers) {
if ($stream->expectedLength && $stream->received !== $stream->expectedLength) {
$diff = $stream->expectedLength - $stream->received;
Expand Down Expand Up @@ -505,6 +522,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
);

if (!$this->streams[$streamId]->originalCancellation->isRequested()) {
$this->hasTimeout = true;

$transferTimeout = $this->streams[$streamId]->request->getTransferTimeout();

$exception = new TimeoutException(
Expand Down Expand Up @@ -769,9 +788,10 @@ public function handleConnectionException(Http2ConnectionException $exception):
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(
null,
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);

$this->close();
}

public function handleData(int $streamId, string $data): void
Expand Down Expand Up @@ -902,6 +922,10 @@ public function handleStreamEnd(int $streamId): void

public function reserveStream(): void
{
if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) {
throw new \Error("Can't reserve stream after shutdown started");
}

--$this->remainingStreams;
}

Expand All @@ -914,12 +938,31 @@ public function unreserveStream(): void

public function getRemainingStreams(): int
{
if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) {
return 0;
}

return $this->remainingStreams;
}

public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise
{
return call(function () use ($request, $cancellationToken, $stream): \Generator {
if ($this->hasTimeout && !yield $this->ping()) {
$exception = new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket to '%s' missed responding to PINGs",
(string) $this->socket->getRemoteAddress()
))
);

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $exception);
}

throw $exception;
}

$this->idlePings = 0;
$this->cancelIdleWatcher();

Expand Down Expand Up @@ -1000,6 +1043,8 @@ public function request(Request $request, CancellationToken $cancellationToken,
);

if (!$originalCancellation->isRequested()) {
$this->hasTimeout = true;

$exception = new TimeoutException(
'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms',
0,
Expand Down Expand Up @@ -1116,7 +1161,7 @@ public function request(Request $request, CancellationToken $cancellationToken,
}

if ($exception instanceof StreamException) {
$exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage());
$exception = new SocketException('Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(), 0, $exception);
}

throw $exception;
Expand Down Expand Up @@ -1161,17 +1206,24 @@ private function run(): \Generator
\assert($return === null);
}

$this->shutdown();
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN,
));

$this->close();
} catch (\Throwable $exception) {
/**
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(null, new ClientHttp2ConnectionException(
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed unexpectedly",
Http2Parser::INTERNAL_ERROR,
$exception
));

$this->close();
}
}

Expand All @@ -1184,7 +1236,14 @@ private function writeFrame(
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));

/** @noinspection PhpUnhandledExceptionInspection */
return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
$promise = $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
$promise->onResolve(function ($error) {
if ($error) {
$this->hasWriteError = true;
}
});

return $promise;
}

private function applySetting(int $setting, int $value): void
Expand Down Expand Up @@ -1427,6 +1486,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo
if (!$this->streams && !$this->socket->isClosed()) {
$this->socket->unreference();
}

if (!$this->streams && $this->shutdown !== null) {
$this->close();
}
}

private function setupPingIfIdle(): void
Expand All @@ -1451,7 +1514,8 @@ private function setupPingIfIdle(): void

// Connection idle for 10 minutes
if ($this->idlePings >= 1) {
$this->shutdown();
$this->shutdown(new HttpException('Too many pending pings'));
$this->close();
return;
}

Expand Down Expand Up @@ -1490,11 +1554,21 @@ private function ping(): Promise
$this->pongDeferred = new Deferred;
$this->idlePings++;

$this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++);
$promise = $this->pongDeferred->promise();
$this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, function () {
$this->hasTimeout = false;

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(false);

$this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'));
$this->close();
});

$this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']);
$this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++);

return $this->pongDeferred->promise();
return $promise;
}

/**
Expand All @@ -1504,30 +1578,11 @@ private function ping(): Promise
*
* @return Promise
*/
private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise
private function shutdown(HttpException $reason, ?int $lastId = null): Promise
{
if ($this->onClose === null) {
return new Success;
}

return call(function () use ($lastId, $reason) {
$code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN;
$lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0);
$goawayPromise = $this->writeFrame(
Http2Parser::GOAWAY,
Http2Parser::NO_FLAG,
0,
\pack("NN", $lastId, $code)
);

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;

foreach ($onClose as $callback) {
asyncCall($callback, $this);
}
}
$code = $reason ? $reason->getCode() : null;
$this->shutdown = $code ?? -1;

if ($this->settings !== null) {
$settings = $this->settings;
Expand All @@ -1538,25 +1593,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P
}

if ($this->streams) {
$reason = $reason ?? new SocketException("Connection closed");
$reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason;
foreach ($this->streams as $id => $stream) {
$this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason);
}
}

if ($this->pongDeferred !== null) {
$this->pongDeferred->resolve(false);
}
if ($lastId !== null && $id <= $lastId) {
continue;
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->releaseStream($id, $reason);
}
}

$this->cancelIdleWatcher();

yield $goawayPromise;

$this->socket->close();
});
}

Expand Down

0 comments on commit bdab581

Please sign in to comment.