Skip to content

Commit

Permalink
Tally active requests in pool to close unsuited, idle connections
Browse files Browse the repository at this point in the history
Fixes #256.
  • Loading branch information
trowski committed Mar 4, 2020
1 parent cfc38c8 commit 457a7d4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 11 deletions.
21 changes: 20 additions & 1 deletion src/Connection/ConnectionLimitingPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ final class ConnectionLimitingPool implements ConnectionPool
/** @var Promise[][] */
private $connections = [];

/** @var int[] */
private $activeRequestCounts = [];

/** @var Deferred[][] */
private $waiting = [];

Expand Down Expand Up @@ -112,6 +115,9 @@ public function getStream(Request $request, CancellationToken $cancellation): Pr
/** @var Stream $stream */
[$connection, $stream] = yield from $this->getStreamFor($uri, $request, $cancellation);

$connectionId = \spl_object_id($connection);
$this->activeRequestCounts[$connectionId] = ($this->activeRequestCounts[$connectionId] ?? 0) + 1;

return HttpStream::fromStream(
$stream,
coroutine(function (Request $request, CancellationToken $cancellationToken) use (
Expand Down Expand Up @@ -170,6 +176,14 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $
$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
if (!$this->isAdditionalConnectionAllowed($uri)
&& $this->activeRequestCounts[\spl_object_id($connection)] === 0
) {
// No additional connections allowed, but this connection is idle and unsuited for this request.
$connection->close();
break;
}

continue; // No stream available for the given request.
}

Expand Down Expand Up @@ -281,6 +295,11 @@ private function isAdditionalConnectionAllowed(string $uri): bool

private function onReadyConnection(Connection $connection, string $uri): void
{
$connectionId = \spl_object_id($connection);
if (isset($this->activeRequestCounts[$connectionId])) {
$this->activeRequestCounts[$connectionId]--;
}

if (empty($this->waiting[$uri])) {
return;
}
Expand All @@ -300,7 +319,7 @@ private function removeWaiting(string $uri, int $deferredId): void

private function dropConnection(string $uri, int $connectionId): void
{
unset($this->connections[$uri][$connectionId]);
unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId]);

if (empty($this->connections[$uri])) {
unset($this->connections[$uri], $this->waitForPriorConnection[$uri]);
Expand Down
14 changes: 4 additions & 10 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ final class Http1Connection implements Connection
/** @var string|null Keep alive timeout watcher ID. */
private $timeoutWatcher;

/** @var int Keep-Alive timeout from last response. */
private $priorTimeout = self::MAX_KEEP_ALIVE_TIMEOUT;
/** @var int Keep-Alive timeout from last response, initially the timeout grace period. */
private $priorTimeout;

/** @var callable[]|null */
private $onClose = [];
Expand All @@ -73,9 +73,6 @@ final class Http1Connection implements Connection
/** @var int */
private $estimatedClose;

/** @var bool */
private $explicitTimeout = false;

/** @var SocketAddress */
private $localAddress;

Expand All @@ -91,7 +88,7 @@ public function __construct(EncryptableSocket $socket, int $timeoutGracePeriod =
$this->localAddress = $socket->getLocalAddress();
$this->remoteAddress = $socket->getRemoteAddress();
$this->tlsInfo = $socket->getTlsInfo();
$this->timeoutGracePeriod = $timeoutGracePeriod;
$this->priorTimeout = $this->timeoutGracePeriod = $timeoutGracePeriod;
$this->estimatedClose = getCurrentTime() + self::MAX_KEEP_ALIVE_TIMEOUT * 1000;
}

Expand Down Expand Up @@ -177,7 +174,7 @@ private function free(): Promise

private function hasStreamFor(Request $request): bool
{
$connectionUnlikelyToClose = $this->explicitTimeout && $this->getRemainingTime() > $this->timeoutGracePeriod;
$connectionUnlikelyToClose = $this->getRemainingTime() > $this->timeoutGracePeriod;

return !$this->busy
&& $this->socket
Expand Down Expand Up @@ -540,9 +537,6 @@ private function determineKeepAliveTimeout(Response $response): int
$params = Http\createFieldValueComponentMap(Http\parseFieldValueComponents($response, 'keep-alive'));

$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
if (isset($params['timeout'])) {
$this->explicitTimeout = true;
}

return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
}
Expand Down

0 comments on commit 457a7d4

Please sign in to comment.