diff --git a/src/Connection/ConnectionLimitingPool.php b/src/Connection/ConnectionLimitingPool.php index e3936a26..9b5714b4 100644 --- a/src/Connection/ConnectionLimitingPool.php +++ b/src/Connection/ConnectionLimitingPool.php @@ -228,18 +228,18 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $ $connectionPromise = $this->connectionFactory->create($request, $cancellation); - $connectionId = \spl_object_id($connectionPromise); + $promiseId = \spl_object_id($connectionPromise); $this->connections[$uri] = $this->connections[$uri] ?? new \ArrayObject; - $this->connections[$uri][$connectionId] = $connectionPromise; + $this->connections[$uri][$promiseId] = $connectionPromise; $connectionPromise->onResolve(function (?\Throwable $exception, ?Connection $connection) use ( &$deferred, $uri, - $connectionId, + $promiseId, $isHttps ): void { if ($exception) { - $this->dropConnection($uri, $connectionId); + $this->dropConnection($uri, null, $promiseId); if ($deferred !== null) { $deferred->fail($exception); // Fail Deferred so Promise\first() below fails. } @@ -248,15 +248,16 @@ private function getStreamFor(string $uri, Request $request, CancellationToken $ \assert($connection !== null); + $connectionId = \spl_object_id($connection); $this->openConnectionCount++; if ($isHttps) { $this->waitForPriorConnection[$uri] = \in_array('2', $connection->getProtocolVersions(), true); } - $connection->onClose(function () use ($uri, $connectionId): void { + $connection->onClose(function () use ($uri, $connectionId, $promiseId): void { $this->openConnectionCount--; - $this->dropConnection($uri, $connectionId); + $this->dropConnection($uri, $connectionId, $promiseId); }); }); @@ -312,7 +313,9 @@ private function onReadyConnection(Connection $connection, string $uri): void if ($this->activeRequestCounts[$connectionId] === 0) { while (\count($this->idleConnections) > 64) { // not customizable for now - $idleConnection = \array_shift($this->idleConnections); + $idleConnection = \reset($this->idleConnections); + $key = \key($this->idleConnections); + unset($this->idleConnections[$key]); $idleConnection->close(); } @@ -349,9 +352,9 @@ private function removeWaiting(string $uri, int $deferredId): void } } - private function dropConnection(string $uri, int $connectionId): void + private function dropConnection(string $uri, ?int $connectionId, int $promiseId): void { - unset($this->connections[$uri][$connectionId], $this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]); + unset($this->connections[$uri][$promiseId], $this->activeRequestCounts[$connectionId], $this->idleConnections[$connectionId]); if ($this->connections[$uri]->count() === 0) { unset($this->connections[$uri], $this->waitForPriorConnection[$uri]); diff --git a/test/Connection/ConnectionLimitingPoolTest.php b/test/Connection/ConnectionLimitingPoolTest.php index 1110f9d4..662ce6b1 100644 --- a/test/Connection/ConnectionLimitingPoolTest.php +++ b/test/Connection/ConnectionLimitingPoolTest.php @@ -12,6 +12,8 @@ use Amp\Promise; use Amp\Socket\SocketAddress; use Amp\Success; +use function Amp\asyncCall; +use function Amp\call; class ConnectionLimitingPoolTest extends AsyncTestCase { @@ -111,4 +113,87 @@ public function testConnectionBecomingAvailableWhileConnecting(): \Generator yield [$client->request($request), $client->request($request)]; } + + private function createMockClosableConnection(Request $request): Promise + { + $content = 'open'; + $busy = false; + $closeHandlers = []; + + $stream = $this->createMock(Stream::class); + $stream->method('request') + ->willReturnCallback(static function () use (&$content, $request, &$busy) { + return call(static function () use (&$content, $request, &$busy) { + // simulate a request taking some time + yield new Delayed(500); + $busy = false; + // we can't pass this as the value to Delayed because we need to capture $content after the delay completes + return new Response('1.1', 200, null, [], new InMemoryStream($content), $request, new Success(new Trailers([]))); + }); + }); + $stream->method('getLocalAddress') + ->willReturn(new SocketAddress('127.0.0.1')); + $stream->method('getRemoteAddress') + ->willReturn(new SocketAddress('127.0.0.1')); + + $connection = $this->createMock(Connection::class); + $connection->method('getStream') + ->willReturnCallback(static function () use (&$content, $stream, &$busy) { + $result = new Delayed(1, $busy ? null : $stream); + $busy = true; + return $result; + }); + $connection->method('getProtocolVersions') + ->willReturn(['1.1', '1.0']); + $connection->expects($this->atMost(1)) + ->method('close') + ->willReturnCallback(static function () use (&$content, &$closeHandlers, $connection) { + $content = 'closed'; + foreach ($closeHandlers as $closeHandler) + asyncCall($closeHandler, $connection); + return new Success; + }); + $connection->method('onClose') + ->willReturnCallback(static function (callable $callback) use (&$closeHandlers) { + $closeHandlers[] = $callback; + }); + + return new Delayed(1, $connection); + } + + public function testConnectionNotClosedWhileInUse(): \Generator + { + $request = new Request('http://localhost'); + + $factory = $this->createMock(ConnectionFactory::class); + $factory->method('create') + ->willReturnCallback(function () use ($request) { + return $this->createMockClosableConnection($request); + }); + + $client = (new HttpClientBuilder) + ->usingPool(new UnlimitedConnectionPool($factory)) + ->build(); + + // perform some number of requests. because of the delay in creating the connection and the delay in executing + // the request, the pool will have to open a new connection for each request. + $numRequests = 66; + $promises = []; + for ($i = 0; $i < $numRequests; $i++) { + $promises[] = $client->request($request); + } + yield $promises; + + // all requests have completed and all connections are now idle. run through the connections again. + $promises = []; + for ($i = 0; $i < $numRequests; $i++) { + $promises[] = $client->request($request); + } + $responses = yield $promises; + foreach ($responses as $response) { + $data = yield $response->getBody()->buffer(); + // if $data === 'closed', the connection was closed before the request completed + $this->assertNotSame('closed', $data); + } + } }