Skip to content

Commit

Permalink
Use Request-to-key-mapper like LimitedConnectionPool
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 15, 2020
1 parent 52ff1e0 commit 990379e
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
57 changes: 31 additions & 26 deletions src/Connection/FiniteConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ final class FiniteConnectionPool implements ConnectionPool
/** @var int */
private $openConnectionCount = 0;

/** @var callable */
private $requestToKeyMapper;

/**
* Create a connection pool that limits the number of connections per authority to $maxConnections.
*
Expand All @@ -49,18 +52,32 @@ final class FiniteConnectionPool implements ConnectionPool
*
* @return self
*/
public static function perAuthority(int $maxConnections, ?ConnectionFactory $connectionFactory = null): self
public static function byAuthority(int $maxConnections, ?ConnectionFactory $connectionFactory = null): self
{
return new self($maxConnections, $connectionFactory);
return new self($maxConnections, function (Request $request): string {
$uri = $request->getUri();
$scheme = $uri->getScheme();

$isHttps = $scheme === 'https';
$defaultPort = $isHttps ? 443 : 80;

$host = $uri->getHost();
$port = $uri->getPort() ?? $defaultPort;


$authority = $host . ':' . $port;
return $scheme . '://' . $authority;
}, $connectionFactory);
}

private function __construct(int $maxConnections, ?ConnectionFactory $connectionFactory = null)
private function __construct(int $maxConnections, callable $requestToKeyMapper, ?ConnectionFactory $connectionFactory = null)
{
if ($maxConnections < 1) {
throw new \Error('The number of max connections per authority must be greater than 0');
}

$this->maxConnections = $maxConnections;
$this->requestToKeyMapper = $requestToKeyMapper;
$this->connectionFactory = $connectionFactory ?? new DefaultConnectionFactory;
}

Expand Down Expand Up @@ -90,57 +107,45 @@ public function getOpenConnectionCount(): int
public function getStream(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$uri = $request->getUri();
$scheme = $uri->getScheme();

$isHttps = $scheme === 'https';
$defaultPort = $isHttps ? 443 : 80;

$host = $uri->getHost();
$port = $uri->getPort() ?? $defaultPort;

if ($host === '') {
throw new InvalidRequestException($request, 'A host must be provided in the request URI: ' . $uri);
}

$authority = $host . ':' . $port;
$uri = $scheme . '://' . $authority;
$key = ($this->requestToKeyMapper)($request);

/** @var Stream $stream */
$stream = yield from $this->fetchStream($uri, $isHttps, $request, $cancellation);
$stream = yield from $this->fetchStream($key, $request, $cancellation);

return HttpStream::fromStream(
$stream,
coroutine(function (Request $request, CancellationToken $cancellationToken) use (
$stream,
$uri
$key
) {
try {
/** @var Response $response */
$response = yield $stream->request($request, $cancellationToken);

// await response being completely received
$response->getTrailers()->onResolve(function () use ($uri): void {
$this->release($uri);
$response->getTrailers()->onResolve(function () use ($key): void {
$this->release($key);
});
} catch (\Throwable $e) {
$this->release($uri);
$this->release($key);
throw $e;
}

return $response;
}),
function () use ($uri): void {
$this->release($uri);
function () use ($key): void {
$this->release($key);
}
);
});
}

private function fetchStream(string $uri, bool $isHttps, Request $request, CancellationToken $cancellation): \Generator
private function fetchStream(string $uri, Request $request, CancellationToken $cancellation): \Generator
{
$this->totalStreamRequests++;

$isHttps = $request->getUri()->getScheme() === 'https';

$connections = $this->connections[$uri] ?? new \ArrayObject;

do {
Expand Down
4 changes: 2 additions & 2 deletions test/Connection/FiniteConnectionPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class FiniteConnectionPoolTest extends AsyncTestCase
public function testSingleConnection(): \Generator
{
$client = (new HttpClientBuilder)
->usingPool(FiniteConnectionPool::perAuthority(1))
->usingPool(FiniteConnectionPool::byAuthority(1))
->build();

$this->setTimeout(10000);
Expand All @@ -26,7 +26,7 @@ public function testSingleConnection(): \Generator
public function testTwoConnections(): \Generator
{
$client = (new HttpClientBuilder)
->usingPool(FiniteConnectionPool::perAuthority(2))
->usingPool(FiniteConnectionPool::byAuthority(2))
->build();

$this->setTimeout(4000);
Expand Down

0 comments on commit 990379e

Please sign in to comment.