Skip to content

Commit

Permalink
ConnectionLimitingPool was mixing up connection IDs when closing conn…
Browse files Browse the repository at this point in the history
…ections
  • Loading branch information
descawed committed Nov 2, 2020
1 parent 47926d6 commit 18f7781
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 9 deletions.
21 changes: 12 additions & 9 deletions src/Connection/ConnectionLimitingPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -228,18 +228,18 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $

$connectionPromise = $this->connectionFactory->create($request, $cancellation);

$connectionId = \spl_object_id($connectionPromise);
$promiseId = \spl_object_id($connectionPromise);
$this->connections[$uri] = $this->connections[$uri] ?? new \ArrayObject;
$this->connections[$uri][$connectionId] = $connectionPromise;
$this->connections[$uri][$promiseId] = $connectionPromise;

$connectionPromise->onResolve(function (?\Throwable $exception, ?Connection $connection) use (
&$deferred,
$uri,
$connectionId,
$promiseId,
$isHttps
): void {
if ($exception) {
$this->dropConnection($uri, $connectionId);
$this->dropConnection($uri, null, $promiseId);
if ($deferred !== null) {
$deferred->fail($exception); // Fail Deferred so Promise\first() below fails.
}
Expand All @@ -248,15 +248,16 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $

\assert($connection !== null);

$connectionId = \spl_object_id($connection);
$this->openConnectionCount++;

if ($isHttps) {
$this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true);
}

$connection->onClose(function () use ($uri, $connectionId): void {
$connection->onClose(function () use ($uri, $connectionId, $promiseId): void {
$this->openConnectionCount--;
$this->dropConnection($uri, $connectionId);
$this->dropConnection($uri, $connectionId, $promiseId);
});
});

Expand Down Expand Up @@ -312,7 +313,9 @@ private function onReadyConnection(Connection $connection, string $uri): void

if ($this->activeRequestCounts[$connectionId] === 0) {
while (\count($this->idleConnections) > 64) { // not customizable for now
$idleConnection = \array_shift($this->idleConnections);
$idleConnection = \reset($this->idleConnections);
$key = \key($this->idleConnections);
unset($this->idleConnections[$key]);
$idleConnection->close();
}

Expand Down Expand Up @@ -349,9 +352,9 @@ private function removeWaiting(string $uri, int $deferredId): void
}
}

private function dropConnection(string $uri, int $connectionId): void
private function dropConnection(string $uri, ?int $connectionId, int $promiseId): void
{
unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]);
unset($this->connections[$uri][$promiseId], $this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]);

if ($this->connections[$uri]->count() === 0) {
unset($this->connections[$uri], $this->waitForPriorConnection[$uri]);
Expand Down
85 changes: 85 additions & 0 deletions test/Connection/ConnectionLimitingPoolTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
use Amp\Promise;
use Amp\Socket\SocketAddress;
use Amp\Success;
use function Amp\asyncCall;
use function Amp\call;

class ConnectionLimitingPoolTest extends AsyncTestCase
{
Expand Down Expand Up @@ -111,4 +113,87 @@ public function testConnectionBecomingAvailableWhileConnecting(): \Generator

yield [$client->request($request), $client->request($request)];
}

private function createMockClosableConnection(Request $request): Promise
{
$content = 'open';
$busy = false;
$closeHandlers = [];

$stream = $this->createMock(Stream::class);
$stream->method('request')
->willReturnCallback(static function () use (&$content, $request, &$busy) {
return call(static function () use (&$content, $request, &$busy) {
// simulate a request taking some time
yield new Delayed(500);
$busy = false;
// we can't pass this as the value to Delayed because we need to capture $content after the delay completes
return new Response('1.1', 200, null, [], new InMemoryStream($content), $request, new Success(new Trailers([])));
});
});
$stream->method('getLocalAddress')
->willReturn(new SocketAddress('127.0.0.1'));
$stream->method('getRemoteAddress')
->willReturn(new SocketAddress('127.0.0.1'));

$connection = $this->createMock(Connection::class);
$connection->method('getStream')
->willReturnCallback(static function () use (&$content, $stream, &$busy) {
$result = new Delayed(1, $busy ? null : $stream);
$busy = true;
return $result;
});
$connection->method('getProtocolVersions')
->willReturn(['1.1', '1.0']);
$connection->expects($this->atMost(1))
->method('close')
->willReturnCallback(static function () use (&$content, &$closeHandlers, $connection) {
$content = 'closed';
foreach ($closeHandlers as $closeHandler)
asyncCall($closeHandler, $connection);
return new Success;
});
$connection->method('onClose')
->willReturnCallback(static function (callable $callback) use (&$closeHandlers) {
$closeHandlers[] = $callback;
});

return new Delayed(1, $connection);
}

public function testConnectionNotClosedWhileInUse(): \Generator
{
$request = new Request('http://localhost');

$factory = $this->createMock(ConnectionFactory::class);
$factory->method('create')
->willReturnCallback(function () use ($request) {
return $this->createMockClosableConnection($request);
});

$client = (new HttpClientBuilder)
->usingPool(new UnlimitedConnectionPool($factory))
->build();

// perform some number of requests. because of the delay in creating the connection and the delay in executing
// the request, the pool will have to open a new connection for each request.
$numRequests = 66;
$promises = [];
for ($i = 0; $i < $numRequests; $i++) {
$promises[] = $client->request($request);
}
yield $promises;

// all requests have completed and all connections are now idle. run through the connections again.
$promises = [];
for ($i = 0; $i < $numRequests; $i++) {
$promises[] = $client->request($request);
}
$responses = yield $promises;
foreach ($responses as $response) {
$data = yield $response->getBody()->buffer();
// if $data === 'closed', the connection was closed before the request completed
$this->assertNotSame('closed', $data);
}
}
}

0 comments on commit 18f7781

Please sign in to comment.