Skip to content

Commit

Permalink
Merge aed9d14 into 56fb65b
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Mar 4, 2020
2 parents 56fb65b + aed9d14 commit 5b2a41b
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 7 deletions.
28 changes: 27 additions & 1 deletion src/Connection/ConnectionLimitingPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ final class ConnectionLimitingPool implements ConnectionPool
/** @var Promise[][] */
private $connections = [];

/** @var int[] */
private $activeRequestCounts = [];

/** @var Deferred[][] */
private $waiting = [];

Expand Down Expand Up @@ -112,6 +115,9 @@ public function getStream(Request $request, CancellationToken $cancellation): Pr
/** @var Stream $stream */
[$connection, $stream] = yield from $this->getStreamFor($uri, $request, $cancellation);

$connectionId = \spl_object_id($connection);
$this->activeRequestCounts[$connectionId] = ($this->activeRequestCounts[$connectionId] ?? 0) + 1;

return HttpStream::fromStream(
$stream,
coroutine(function (Request $request, CancellationToken $cancellationToken) use (
Expand Down Expand Up @@ -170,6 +176,21 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $
$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
$connectionId = \spl_object_id($connection);

\assert(
!isset($this->activeRequestCounts[$connectionId])
|| $this->activeRequestCounts[$connectionId] >= 0
);

if (!$this->isAdditionalConnectionAllowed($uri)
&& ($this->activeRequestCounts[$connectionId] ?? 0) === 0
) {
// No additional connections allowed, but this connection is idle and unsuited for this request.
$connection->close();
break;
}

continue; // No stream available for the given request.
}

Expand Down Expand Up @@ -281,6 +302,11 @@ private function isAdditionalConnectionAllowed(string $uri): bool

private function onReadyConnection(Connection $connection, string $uri): void
{
$connectionId = \spl_object_id($connection);
if (isset($this->activeRequestCounts[$connectionId])) {
$this->activeRequestCounts[$connectionId]--;
}

if (empty($this->waiting[$uri])) {
return;
}
Expand All @@ -300,7 +326,7 @@ private function removeWaiting(string $uri, int $deferredId): void

private function dropConnection(string $uri, int $connectionId): void
{
unset($this->connections[$uri][$connectionId]);
unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId]);

if (empty($this->connections[$uri])) {
unset($this->connections[$uri], $this->waitForPriorConnection[$uri]);
Expand Down
13 changes: 7 additions & 6 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ final class Http1Connection implements Connection
private $timeoutGracePeriod;

/** @var int */
private $estimatedClose;
private $lastUsedAt;

/** @var bool */
private $explicitTimeout = false;
Expand All @@ -92,7 +92,7 @@ public function __construct(EncryptableSocket $socket, int $timeoutGracePeriod =
$this->remoteAddress = $socket->getRemoteAddress();
$this->tlsInfo = $socket->getTlsInfo();
$this->timeoutGracePeriod = $timeoutGracePeriod;
$this->estimatedClose = getCurrentTime() + self::MAX_KEEP_ALIVE_TIMEOUT * 1000;
$this->lastUsedAt = getCurrentTime();
}

public function __destruct()
Expand Down Expand Up @@ -157,7 +157,7 @@ public function getStream(Request $request): Promise
private function free(): Promise
{
$this->socket = null;
$this->estimatedClose = 0;
$this->lastUsedAt = 0;

if ($this->timeoutWatcher !== null) {
Loop::cancel($this->timeoutWatcher);
Expand All @@ -177,7 +177,7 @@ private function free(): Promise

private function hasStreamFor(Request $request): bool
{
$connectionUnlikelyToClose = $this->explicitTimeout && $this->getRemainingTime() > $this->timeoutGracePeriod;
$connectionUnlikelyToClose = $this->getRemainingTime() > $this->timeoutGracePeriod;

return !$this->busy
&& $this->socket
Expand Down Expand Up @@ -406,7 +406,6 @@ private function readResponse(
if ($timeout > 0 && $parser->getState() !== Http1Parser::BODY_IDENTITY_EOF) {
$this->timeoutWatcher = Loop::delay($timeout * 1000, [$this, 'close']);
Loop::unreference($this->timeoutWatcher);
$this->estimatedClose = getCurrentTime() + $timeout * 1000;
} else {
$this->close();
}
Expand Down Expand Up @@ -482,7 +481,9 @@ private function handleUpgradeResponse(Request $request, Response $response, str
*/
private function getRemainingTime(): int
{
return \max(0, $this->estimatedClose - getCurrentTime());
$timestamp = $this->lastUsedAt + $this->explicitTimeout ? $this->priorTimeout : $this->timeoutGracePeriod;

return \max(0, $timestamp - getCurrentTime());
}

private function withCancellation(Promise $promise, CancellationToken $cancellationToken): Promise
Expand Down

0 comments on commit 5b2a41b

Please sign in to comment.