Skip to content

Commit

Permalink
Merge e2440b4 into a78fc65
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Feb 26, 2020
2 parents a78fc65 + e2440b4 commit 511434a
Show file tree
Hide file tree
Showing 8 changed files with 540 additions and 227 deletions.
1 change: 1 addition & 0 deletions composer-require-check.json
Expand Up @@ -23,6 +23,7 @@
"Amp\\File\\Driver",
"Amp\\File\\open",
"Amp\\File\\size",
"Amp\\Http\\Client\\Connection\\LimitedConnectionPool",
"AMP_DEBUG_HTTP2_FRAMES"
],
"php-core-extensions": [
Expand Down
4 changes: 2 additions & 2 deletions examples/concurrency/2-limits-per-host.php
@@ -1,8 +1,8 @@
<?php

use Amp\CancellationToken;
use Amp\Http\Client\Connection\LimitedConnectionPool;
use Amp\Http\Client\Connection\Stream;
use Amp\Http\Client\Connection\StreamLimitingConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
Expand All @@ -19,7 +19,7 @@
Loop::run(static function () {
try {
// Limit to one concurrent request per host
$pool = LimitedConnectionPool::byHost(new UnlimitedConnectionPool, new LocalKeyedSemaphore(1));
$pool = StreamLimitingConnectionPool::byHost(new UnlimitedConnectionPool, new LocalKeyedSemaphore(1));

$logger = new class implements NetworkInterceptor {
public function requestViaNetwork(
Expand Down
308 changes: 308 additions & 0 deletions src/Connection/CountLimitingConnectionPool.php
@@ -0,0 +1,308 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Deferred;
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\MultiReasonException;
use Amp\Promise;
use Amp\Success;
use function Amp\call;
use function Amp\coroutine;

final class CountLimitingConnectionPool implements ConnectionPool
{
use ForbidSerialization;

/** @var int */
private $connectionLimit;

/** @var ConnectionFactory */
private $connectionFactory;

/** @var Promise[][] */
private $connections = [];

/** @var Deferred[][] */
private $waiting = [];

/** @var bool[] */
private $waitForPriorConnection = [];

/** @var int */
private $totalConnectionAttempts = 0;

/** @var int */
private $totalStreamRequests = 0;

/** @var int */
private $openConnectionCount = 0;

/**
* Create a connection pool that limits the number of connections per authority.
*
* @param int $connectionLimit Maximum number of connections allowed to a single authority.
* @param ConnectionFactory|null $connectionFactory
*
* @return self
*/
public static function byAuthority(int $connectionLimit, ?ConnectionFactory $connectionFactory = null): self
{
return new self($connectionLimit, $connectionFactory);
}

private static function formatUri(Request $request): string
{
$uri = $request->getUri();
$scheme = $uri->getScheme();

$isHttps = $scheme === 'https';
$defaultPort = $isHttps ? 443 : 80;

$host = $uri->getHost();
$port = $uri->getPort() ?? $defaultPort;

$authority = $host . ':' . $port;
return $scheme . '://' . $authority;
}

private function __construct(int $connectionLimit, ?ConnectionFactory $connectionFactory = null)
{
if ($connectionLimit < 1) {
throw new \Error('The connection limit must be greater than 0');
}

$this->connectionLimit = $connectionLimit;
$this->connectionFactory = $connectionFactory ?? new DefaultConnectionFactory;
}

public function __clone()
{
$this->connections = [];
$this->totalConnectionAttempts = 0;
$this->totalStreamRequests = 0;
$this->openConnectionCount = 0;
}

public function getTotalConnectionAttempts(): int
{
return $this->totalConnectionAttempts;
}

public function getTotalStreamRequests(): int
{
return $this->totalStreamRequests;
}

public function getOpenConnectionCount(): int
{
return $this->openConnectionCount;
}

public function getStream(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$this->totalStreamRequests++;

$uri = self::formatUri($request);

/** @var Stream $stream */
[$connection, $stream] = yield from $this->getStreamFor($uri, $request, $cancellation);

return HttpStream::fromStream(
$stream,
coroutine(function (Request $request, CancellationToken $cancellationToken) use (
$connection,
$stream,
$uri
) {
try {
/** @var Response $response */
$response = yield $stream->request($request, $cancellationToken);
} catch (\Throwable $e) {
$this->onReadyConnection($connection, $uri);
throw $e;
}

// await response being completely received
$response->getTrailers()->onResolve(function () use ($connection, $uri): void {
$this->onReadyConnection($connection, $uri);
});

return $response;
}),
function () use ($connection, $uri): void {
$this->onReadyConnection($connection, $uri);
}
);
});
}

private function getStreamFor(string $uri, Request $request, CancellationToken $cancellation): \Generator
{
$isHttps = $request->getUri()->getScheme() === 'https';

$connections = $this->connections[$uri] ?? new \ArrayObject;

do {
foreach ($connections as $connectionPromise) {
\assert($connectionPromise instanceof Promise);

try {
if ($isHttps && ($this->waitForPriorConnection[$uri] ?? true)) {
// Wait for first successful connection if using a secure connection (maybe we can use HTTP/2).
$connection = yield $connectionPromise;
} else {
$connection = yield Promise\first([$connectionPromise, new Success]);
if ($connection === null) {
continue;
}
}
} catch (\Exception $exception) {
continue; // Ignore cancellations and errors of other requests.
}

\assert($connection instanceof Connection);

$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
continue; // No stream available for the given request.
}

return [$connection, $stream];
}

$deferred = new Deferred;
$deferredId = \spl_object_id($deferred);

$this->waiting[$uri][$deferredId] = $deferred;
$deferredPromise = $deferred->promise();
$deferredPromise->onResolve(function () use ($uri, $deferredId): void {
$this->removeWaiting($uri, $deferredId);
});

if ($this->isAdditionalConnectionAllowed($uri)) {
break;
}

$connection = yield $deferredPromise;

\assert($connection instanceof Connection);

$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
continue; // Wait for a different connection to become available.
}

return [$connection, $stream];
} while (true);

$this->totalConnectionAttempts++;

$connectionPromise = $this->connectionFactory->create($request, $cancellation);

$connectionId = \spl_object_id($connectionPromise);
$this->connections[$uri] = $this->connections[$uri] ?? new \ArrayObject;
$this->connections[$uri][$connectionId] = $connectionPromise;

$connectionPromise->onResolve(function (?\Throwable $exception, ?Connection $connection) use (
&$deferred,
$uri,
$connectionId,
$isHttps
): void {
if ($exception) {
$this->dropConnection($uri, $connectionId);
if ($deferred !== null) {
$deferred->fail($exception); // Fail Deferred so Promise\first() below fails.
}
return;
}

$this->openConnectionCount++;

if ($isHttps) {
$this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true);
}

$connection->onClose(function () use ($uri, $connectionId): void {
$this->openConnectionCount--;
$this->dropConnection($uri, $connectionId);
});
});

try {
$connection = yield Promise\first([$connectionPromise, $deferredPromise]);
} catch (MultiReasonException $exception) {
[$exception] = $exception->getReasons(); // The first reason is why the connection failed.
throw $exception;
}

$deferred = null; // Null reference so connection promise handler does not double-resolve the Deferred.
$this->removeWaiting($uri, $deferredId); // Deferred no longer needed for this request.

\assert($connection instanceof Connection);

$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
// Reused connection did not have an available stream for the given request.
$connection = yield $connectionPromise; // Wait for new connection request instead.

$stream = yield $this->getStreamFromConnection($connection, $request);

if ($stream === null) {
// Other requests used the new connection first, so we need to go around again.
return yield from $this->getStreamFor($uri, $request, $cancellation);
}
}

return [$connection, $stream];
}

private function getStreamFromConnection(Connection $connection, Request $request): Promise
{
if (!\array_intersect($request->getProtocolVersions(), $connection->getProtocolVersions())) {
return new Success; // Connection does not support any of the requested protocol versions.
}

return $connection->getStream($request);
}

private function isAdditionalConnectionAllowed(string $uri): bool
{
return \count($this->connections[$uri] ?? []) < $this->connectionLimit;
}

private function onReadyConnection(Connection $connection, string $uri): void
{
if (empty($this->waiting[$uri])) {
return;
}

$deferred = \array_shift($this->waiting[$uri]);
$deferred->resolve($connection);
}

private function removeWaiting(string $uri, int $deferredId): void
{
unset($this->waiting[$uri][$deferredId]);
if (empty($this->waiting[$uri])) {
unset($this->waiting[$uri]);
}
}

private function dropConnection(string $uri, int $connectionId): void
{
unset($this->connections[$uri][$connectionId]);

if (empty($this->connections[$uri])) {
unset($this->connections[$uri], $this->waitForPriorConnection[$uri]);
}
}
}

0 comments on commit 511434a

Please sign in to comment.