Skip to content

Commit

Permalink
Add liveliness check to Connection interface
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 10, 2019
1 parent a1566cd commit 1075298
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 25 deletions.
8 changes: 7 additions & 1 deletion src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,13 @@ public function getProtocolVersions(): array;
/**
* @return bool True if a stream is still available, false if the connection is completely busy.
*/
public function isBusy(): bool;
public function hasStreamAvailable(): bool;

/**
* @return Promise<bool> True if the connection is safe to use for a new request, false if a new connection should
* be opened.
*/
public function checkLiveliness(): Promise;

public function close(): Promise;

Expand Down
17 changes: 5 additions & 12 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -100,23 +100,16 @@ public function getStream(Request $request, CancellationToken $cancellation): Pr

\assert($connection instanceof Connection);

if ($connection->isBusy()) {
if ($connection->hasStreamAvailable()) {
continue; // Connection is currently used to full capacity.
}

if (!\array_intersect($request->getProtocolVersions(), $connection->getProtocolVersions())) {
continue; // Connection does not support any of the requested protocol versions.
}

if ($connection instanceof Http2Connection && $connection->isIdle() && !(yield $connection->ping())) {
continue; // Connection closed while idle.
}

if ($connection instanceof Http1Connection
&& $connection->getRemainingTime() < $this->timeoutGracePeriod
&& !$request->isIdempotent()
) {
continue; // Connection is at high-risk of closing before the request can be sent.
if (!yield $connection->checkLiveliness()) {
continue;
}

return $connection->getStream($request);
Expand Down Expand Up @@ -194,7 +187,7 @@ private function createConnection(
}

if (!$isHttps) {
return new Http1Connection($socket);
return new Http1Connection($socket, $this->timeoutGracePeriod);
}

try {
Expand Down Expand Up @@ -253,7 +246,7 @@ private function createConnection(
);
}

return new Http1Connection($socket);
return new Http1Connection($socket, $this->timeoutGracePeriod);
}

private function dropConnection(string $uri, string $connectionHash): void
Expand Down
17 changes: 13 additions & 4 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,17 +61,21 @@ final class Http1Connection implements Connection
/** @var callable[]|null */
private $onClose = [];

/** @var int */
private $timeoutGracePeriod;

/** @var int */
private $estimatedClose;

public function __construct(Socket $socket)
public function __construct(Socket $socket, int $timeoutGracePeriod)
{
$this->socket = $socket;

if ($this->socket->isClosed()) {
$this->onClose = null;
}

$this->timeoutGracePeriod = $timeoutGracePeriod;
$this->estimatedClose = (int) ((\microtime(true) + self::MAX_KEEP_ALIVE_TIMEOUT) * 1000);
}

Expand All @@ -80,9 +84,14 @@ public function __destruct()
$this->close();
}

public function isBusy(): bool
public function hasStreamAvailable(): bool
{
return !$this->busy && !$this->socket->isClosed();
}

public function checkLiveliness(): Promise
{
return $this->busy || $this->socket->isClosed();
return new Success($this->getRemainingTime() > $this->timeoutGracePeriod);
}

public function onClose(callable $onClose): void
Expand Down Expand Up @@ -343,7 +352,7 @@ private function readResponse(
/**
* @return int Approximate number of milliseconds remaining until the connection is closed.
*/
public function getRemainingTime(): int
private function getRemainingTime(): int
{
return (int) \max(0, $this->estimatedClose - \microtime(true) * 1000);
}
Expand Down
25 changes: 17 additions & 8 deletions src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public function getStream(Request $request): Stream
/**
* @return Promise<bool> Fulfilled with true if a pong is received within the timeout, false if none is received.
*/
public function ping(): Promise
private function ping(): Promise
{
if ($this->onClose === null) {
return new Success(false);
Expand All @@ -248,14 +248,23 @@ public function ping(): Promise
return $this->pongDeferred->promise();
}

public function isIdle(): bool
private function isIdle(): bool
{
return empty($this->streams) && $this->onClose !== null;
}

public function isBusy(): bool
public function hasStreamAvailable(): bool
{
return $this->remainingStreams <= 0 || $this->onClose === null;
return $this->remainingStreams > 0 && $this->onClose !== null;
}

public function checkLiveliness(): Promise
{
if (!$this->isIdle()) {
return new Success(true);
}

return $this->ping();
}

public function onClose(callable $onClose): void
Expand Down Expand Up @@ -1489,6 +1498,10 @@ private function shutdown(?int $lastId = null, ?\Throwable $reason = null): Prom
$this->pongDeferred->resolve(false);
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
}

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
Expand All @@ -1498,10 +1511,6 @@ private function shutdown(?int $lastId = null, ?\Throwable $reason = null): Prom
}
}

if ($this->pongWatcher !== null) {
Loop::cancel($this->pongWatcher);
}

yield $promise;

$this->socket->close();
Expand Down

0 comments on commit 1075298

Please sign in to comment.