Skip to content

Commit

Permalink
Merge 3385642 into d012955
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Oct 25, 2020
2 parents d012955 + 3385642 commit c3ccde1
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 65 deletions.
179 changes: 115 additions & 64 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Expand Up @@ -54,7 +54,7 @@ final class Http2ConnectionProcessor implements Http2Processor
private const WINDOW_INCREMENT = 1024 * 1024;

// Milliseconds to wait for pong (PING with ACK) frame before closing the connection.
private const PONG_TIMEOUT = 500;
private const PONG_TIMEOUT = 5000;

/** @var string 64-bit for ping. */
private $counter = "aaaaaaaa";
Expand Down Expand Up @@ -113,6 +113,15 @@ final class Http2ConnectionProcessor implements Http2Processor
/** @var callable[]|null */
private $onClose = [];

/** @var int */
private $hasTimeout = false;

/** @var bool */
private $hasWriteError = false;

/** @var int|null */
private $shutdown;

public function __construct(EncryptableSocket $socket)
{
$this->socket = $socket;
Expand Down Expand Up @@ -164,6 +173,8 @@ public function onClose(callable $onClose): void

public function close(): Promise
{
$this->shutdown(new SocketException('Socket to ' . $this->socket->getRemoteAddress() . ' closed'));

$this->socket->close();

if ($this->onClose !== null) {
Expand All @@ -180,21 +191,25 @@ public function close(): Promise

public function handlePong(string $data): void
{
$this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data);
if ($this->pongDeferred === null) {
return;
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->pongWatcher = null;
}

$this->hasTimeout = false;

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(true);
}

public function handlePing(string $data): void
{
if ($this->pongDeferred !== null) {
if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
$this->pongWatcher = null;
}

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(true);
}
$this->writeFrame(Http2Parser::PING, Http2Parser::ACK, 0, $data);
}

public function handleShutdown(int $lastId, int $error): void
Expand All @@ -209,7 +224,7 @@ public function handleShutdown(int $lastId, int $error): void
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown($lastId, new ClientHttp2ConnectionException($message, $error));
$this->shutdown(new ClientHttp2ConnectionException($message, $error), $lastId);
}

public function handleStreamWindowIncrement(int $streamId, int $windowSize): void
Expand Down Expand Up @@ -280,6 +295,8 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$stream = $this->streams[$streamId];
$stream->resetInactivityWatcher();

$this->hasTimeout = false;

if ($stream->trailers) {
if ($stream->expectedLength && $stream->received !== $stream->expectedLength) {
$diff = $stream->expectedLength - $stream->received;
Expand Down Expand Up @@ -505,6 +522,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
);

if (!$this->streams[$streamId]->originalCancellation->isRequested()) {
$this->hasTimeout = true;
$this->ping(); // async ping, if other requests occur, they wait for it

$transferTimeout = $this->streams[$streamId]->request->getTransferTimeout();

$exception = new TimeoutException(
Expand Down Expand Up @@ -696,6 +716,7 @@ static function () {
$streamId,
\pack("N", Http2Parser::CANCEL)
);

$this->releaseStream($streamId, $exception);
});

Expand Down Expand Up @@ -769,9 +790,10 @@ public function handleConnectionException(Http2ConnectionException $exception):
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(
null,
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);

$this->close();
}

public function handleData(int $streamId, string $data): void
Expand Down Expand Up @@ -870,13 +892,14 @@ public function handleStreamEnd(int $streamId): void

\assert($body !== null);

$body->complete();

$trailers = $stream->trailers;
$stream->trailers = null;

\assert($trailers !== null);

$body->complete();

$trailers->resolve(call(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
Expand All @@ -897,11 +920,18 @@ public function handleStreamEnd(int $streamId): void

$this->setupPingIfIdle();

$this->releaseStream($streamId);
// Stream might be cancelled right after body completion
if (isset($this->streamId[$streamId])) {
$this->releaseStream($streamId);
}
}

public function reserveStream(): void
{
if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) {
throw new \Error("Can't reserve stream after shutdown started");
}

--$this->remainingStreams;
}

Expand All @@ -914,12 +944,31 @@ public function unreserveStream(): void

public function getRemainingStreams(): int
{
if ($this->shutdown !== null || $this->hasWriteError || $this->hasTimeout) {
return 0;
}

return $this->remainingStreams;
}

public function request(Request $request, CancellationToken $cancellationToken, Stream $stream): Promise
{
return call(function () use ($request, $cancellationToken, $stream): \Generator {
if ($this->hasTimeout && !yield $this->ping()) {
$exception = new UnprocessedRequestException(
new SocketException(\sprintf(
"Socket to '%s' missed responding to PINGs",
(string) $this->socket->getRemoteAddress()
))
);

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->abort($request, $exception);
}

throw $exception;
}

$this->idlePings = 0;
$this->cancelIdleWatcher();

Expand Down Expand Up @@ -1000,6 +1049,9 @@ public function request(Request $request, CancellationToken $cancellationToken,
);

if (!$originalCancellation->isRequested()) {
$this->hasTimeout = true;
$this->ping(); // async ping, if other requests occur, they wait for it

$exception = new TimeoutException(
'Allowed transfer timeout exceeded, took longer than ' . $transferTimeout . ' ms',
0,
Expand Down Expand Up @@ -1116,7 +1168,7 @@ public function request(Request $request, CancellationToken $cancellationToken,
}

if ($exception instanceof StreamException) {
$exception = new SocketException('Failed to write request to socket: ' . $exception->getMessage());
$exception = new SocketException('Failed to write request (stream ' . $streamId . ') to socket: ' . $exception->getMessage(), 0, $exception);
}

throw $exception;
Expand Down Expand Up @@ -1161,17 +1213,24 @@ private function run(): \Generator
\assert($return === null);
}

$this->shutdown();
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN,
));

$this->close();
} catch (\Throwable $exception) {
/**
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(null, new ClientHttp2ConnectionException(
"The HTTP/2 connection closed unexpectedly",
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(),
Http2Parser::INTERNAL_ERROR,
$exception
));

$this->close();
}
}

Expand All @@ -1184,7 +1243,14 @@ private function writeFrame(
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));

/** @noinspection PhpUnhandledExceptionInspection */
return $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
$promise = $this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
$promise->onResolve(function ($error) {
if ($error) {
$this->hasWriteError = true;
}
});

return $promise;
}

private function applySetting(int $setting, int $value): void
Expand Down Expand Up @@ -1427,6 +1493,10 @@ private function releaseStream(int $streamId, ?\Throwable $exception = null): vo
if (!$this->streams && !$this->socket->isClosed()) {
$this->socket->unreference();
}

if (!$this->streams && $this->shutdown !== null) {
$this->close();
}
}

private function setupPingIfIdle(): void
Expand All @@ -1451,7 +1521,8 @@ private function setupPingIfIdle(): void

// Connection idle for 10 minutes
if ($this->idlePings >= 1) {
$this->shutdown();
$this->shutdown(new HttpException('Too many pending pings'));
$this->close();
return;
}

Expand Down Expand Up @@ -1490,11 +1561,21 @@ private function ping(): Promise
$this->pongDeferred = new Deferred;
$this->idlePings++;

$this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++);
$promise = $this->pongDeferred->promise();
$this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, function () {
$this->hasTimeout = false;

$deferred = $this->pongDeferred;
$this->pongDeferred = null;
$deferred->resolve(false);

$this->pongWatcher = Loop::delay(self::PONG_TIMEOUT, [$this, 'close']);
// Shutdown connection to stop new requests, but keep it open, as other responses might still arrive
$this->shutdown(new HttpException('PONG timeout of ' . self::PONG_TIMEOUT . 'ms reached'), \max(0, $this->streamId));
});

$this->writeFrame(Http2Parser::PING, 0, 0, $this->counter++);

return $this->pongDeferred->promise();
return $promise;
}

/**
Expand All @@ -1504,21 +1585,11 @@ private function ping(): Promise
*
* @return Promise
*/
private function shutdown(?int $lastId = null, ?HttpException $reason = null): Promise
private function shutdown(HttpException $reason, ?int $lastId = null): Promise
{
if ($this->onClose === null) {
return new Success;
}

return call(function () use ($lastId, $reason) {
$code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN;
$lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0);
$goawayPromise = $this->writeFrame(
Http2Parser::GOAWAY,
Http2Parser::NO_FLAG,
0,
\pack("NN", $lastId, $code)
);
$code = $reason ? $reason->getCode() : null;
$this->shutdown = $code ?? -1;

if ($this->settings !== null) {
$settings = $this->settings;
Expand All @@ -1529,34 +1600,15 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P
}

if ($this->streams) {
$reason = $reason ?? new SocketException("Connection closed");
$reason = $lastId !== null ? new UnprocessedRequestException($reason) : $reason;
foreach ($this->streams as $id => $stream) {
$this->releaseStream($id, $id > $lastId ? new UnprocessedRequestException($reason) : $reason);
}
}

if ($this->pongDeferred !== null) {
$this->pongDeferred->resolve(false);
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
}

$this->cancelIdleWatcher();

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
if ($lastId !== null && $id <= $lastId) {
continue;
}

foreach ($onClose as $callback) {
asyncCall($callback, $this);
$this->releaseStream($id, $reason);
}
}

yield $goawayPromise;

$this->socket->close();
});
}

Expand Down Expand Up @@ -1653,8 +1705,7 @@ private function createStreamInactivityWatcher(int $streamId, int $timeout): ?st

$this->releaseStream(
$streamId,
new TimeoutException('Inactivity timeout exceeded, more than '
. $timeout . ' ms elapsed from last data received')
new TimeoutException("Inactivity timeout exceeded, more than {$timeout} ms elapsed from last data received")
);
});

Expand Down
3 changes: 2 additions & 1 deletion src/Interceptor/RetryRequests.php
Expand Up @@ -4,6 +4,7 @@

use Amp\CancellationToken;
use Amp\Http\Client\ApplicationInterceptor;
use Amp\Http\Client\Connection\Http2ConnectionException;
use Amp\Http\Client\Connection\UnprocessedRequestException;
use Amp\Http\Client\DelegateHttpClient;
use Amp\Http\Client\Internal\ForbidCloning;
Expand Down Expand Up @@ -39,7 +40,7 @@ public function request(
return yield $httpClient->request(clone $request, $cancellation);
} catch (UnprocessedRequestException $exception) {
// Request was deemed retryable by connection, so carry on.
} catch (SocketException $exception) {
} catch (SocketException | Http2ConnectionException $exception) {
if (!$request->isIdempotent()) {
throw $exception;
}
Expand Down

0 comments on commit c3ccde1

Please sign in to comment.