From 0f018b0f1d0509dff03ae0b80a5e67f596772536 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 16 Jun 2019 13:27:50 -0500 Subject: [PATCH 1/2] Refactor from socket pool to stateful connection pool --- src/ClientBuilder.php | 12 +- src/Connection/Connection.php | 29 +++ src/Connection/ConnectionPool.php | 28 +++ src/Connection/DefaultConnectionPool.php | 41 +++++ .../Http1Connection.php} | 66 ++++--- src/{Driver => Connection}/Http1Parser.php | 2 +- src/ConnectionInfo.php | 11 ++ src/Driver/DefaultHttpDriverFactory.php | 19 -- src/Driver/HttpDriver.php | 23 --- src/Driver/HttpDriverFactory.php | 22 --- src/SocketClient.php | 168 ++++++++---------- test/ParserTest.php | 2 +- test/TimeoutTest.php | 10 +- 13 files changed, 234 insertions(+), 199 deletions(-) create mode 100644 src/Connection/Connection.php create mode 100644 src/Connection/ConnectionPool.php create mode 100644 src/Connection/DefaultConnectionPool.php rename src/{Driver/Http1Driver.php => Connection/Http1Connection.php} (87%) rename src/{Driver => Connection}/Http1Parser.php (99%) delete mode 100644 src/Driver/DefaultHttpDriverFactory.php delete mode 100644 src/Driver/HttpDriver.php delete mode 100644 src/Driver/HttpDriverFactory.php diff --git a/src/ClientBuilder.php b/src/ClientBuilder.php index ec247014..3585b094 100644 --- a/src/ClientBuilder.php +++ b/src/ClientBuilder.php @@ -3,18 +3,18 @@ namespace Amp\Http\Client; use Amp\Http\Client\Internal\ApplicationInterceptorClient; -use Amp\Socket\SocketPool; -use Amp\Socket\UnlimitedSocketPool; +use Amp\Socket\Connector; +use Amp\Socket\DnsConnector; final class ClientBuilder { - private $socketPool; + private $connector; private $networkInterceptors = []; private $applicationInterceptors = []; - public function __construct(?SocketPool $socketPool = null) + public function __construct(?Connector $connector = null) { - $this->socketPool = $socketPool ?? new UnlimitedSocketPool; + $this->connector = $connector ?? new DnsConnector; } public function addNetworkInterceptor(NetworkInterceptor $networkInterceptor): self @@ -33,7 +33,7 @@ public function addApplicationInterceptor(ApplicationInterceptor $applicationInt public function build(): Client { - $client = new SocketClient($this->socketPool); + $client = new SocketClient($this->connector); foreach ($this->networkInterceptors as $networkInterceptor) { $client->addNetworkInterceptor($networkInterceptor); } diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php new file mode 100644 index 00000000..e83444ff --- /dev/null +++ b/src/Connection/Connection.php @@ -0,0 +1,29 @@ + + */ + public function request(Request $request, ?CancellationToken $token = null): Promise; + + /** + * @return ConnectionInfo + */ + public function getConnectionInfo(): ConnectionInfo; + + public function isClosed(): bool; + + public function close(): Promise; +} diff --git a/src/Connection/ConnectionPool.php b/src/Connection/ConnectionPool.php new file mode 100644 index 00000000..5f08383c --- /dev/null +++ b/src/Connection/ConnectionPool.php @@ -0,0 +1,28 @@ +connections[$socket->getRemoteAddress()->toString()] = $connection; + return $connection; + } + + public function getConnection(Request $request): ?Connection + { + $uri = $request->getUri(); + + $address = $uri->getHost(); + $port = $uri->getPort(); + + if ($port !== null) { + $address .= ':' . $port; + } + + if (isset($this->connections[$address])) { + return $this->connections[$address]; + } + + return null; + } + + public function getApplicationLayerProtocols(): array + { + return ['http/1.1']; + } +} diff --git a/src/Driver/Http1Driver.php b/src/Connection/Http1Connection.php similarity index 87% rename from src/Driver/Http1Driver.php rename to src/Connection/Http1Connection.php index afb9c09d..e3049cea 100644 --- a/src/Driver/Http1Driver.php +++ b/src/Connection/Http1Connection.php @@ -1,6 +1,6 @@ socket = $socket; + $this->connectionInfo = ConnectionInfo::fromSocket($socket); + } + + public function getConnectionInfo(): ConnectionInfo + { + return $this->connectionInfo; + } + + public function isClosed(): bool + { + return $this->socket->isClosed(); + } + + public function close(): Promise + { + $this->socket->close(); + return new Success; + } + /** @inheritdoc */ - public function request( - Socket $socket, - ConnectionInfo $connectionInfo, - Request $request, - ?CancellationToken $cancellation = null - ): Promise { - return call(function () use ($request, $socket, $connectionInfo, $cancellation) { + public function request(Request $request, ?CancellationToken $cancellation = null): Promise { + return call(function () use ($request, $cancellation) { $cancellation = $cancellation ?? new NullCancellationToken; /** @var Request $request */ @@ -58,18 +78,16 @@ public function request( $readingCancellation = $cancellation; } - $cancellationId = $readingCancellation->subscribe(static function () use ($socket) { - $socket->close(); - }); + $cancellationId = $readingCancellation->subscribe([$this->socket, 'close']); $completionDeferred->promise()->onResolve(static function () use ($readingCancellation, $cancellationId) { $readingCancellation->unsubscribe($cancellationId); }); try { - yield RequestWriter::writeRequest($socket, $request, $protocolVersion); + yield RequestWriter::writeRequest($this->socket, $request, $protocolVersion); - return yield from $this->doRead($socket, $request, $connectionInfo, $cancellation, $readingCancellation, $completionDeferred); + return yield from $this->doRead($request, $cancellation, $readingCancellation, $completionDeferred); } catch (HttpException $e) { $cancellation->throwIfRequested(); @@ -98,9 +116,7 @@ private function buildRequest(Request $request): \Generator } /** - * @param Socket $socket * @param Request $request - * @param ConnectionInfo $connectionInfo * @param CancellationToken $originalCancellation * @param CancellationToken $readingCancellation * @param Deferred $completionDeferred @@ -111,9 +127,7 @@ private function buildRequest(Request $request): \Generator * @throws CancelledException */ private function doRead( - Socket $socket, Request $request, - ConnectionInfo $connectionInfo, CancellationToken $originalCancellation, CancellationToken $readingCancellation, Deferred $completionDeferred @@ -127,10 +141,10 @@ private function doRead( $backpressure = $bodyEmitter->emit($data); }; - $parser = new Http1Parser($request, $connectionInfo, $bodyCallback); + $parser = new Http1Parser($request, $this->connectionInfo, $bodyCallback); try { - while (null !== $chunk = yield $socket->read()) { + while (null !== $chunk = yield $this->socket->read()) { $response = $parser->parse($chunk); if ($response === null) { continue; @@ -138,9 +152,7 @@ private function doRead( $bodyCancellationSource = new CancellationTokenSource; $bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken()); - $bodyCancellationToken->subscribe(static function () use ($socket) { - $socket->close(); - }); + $bodyCancellationToken->subscribe([$this->socket, 'close']); $response = $response ->withBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource)) @@ -156,7 +168,6 @@ private function doRead( $originalCancellation, $readingCancellation, $bodyCancellationToken, - $socket, &$backpressure ) { try { @@ -177,7 +188,7 @@ private function doRead( if (!$backpressure instanceof Success) { yield $this->withCancellation($backpressure, $bodyCancellationToken); } - } while (null !== $chunk = yield $socket->read()); + } while (null !== $chunk = yield $this->socket->read()); $originalCancellation->throwIfRequested(); @@ -194,13 +205,13 @@ private function doRead( } if ($this->shouldCloseSocketAfterResponse($response) || $parser->getState() === Http1Parser::BODY_IDENTITY_EOF) { - $socket->close(); + $this->socket->close(); } $bodyEmitter->complete(); $completionDeferred->resolve(); } catch (\Throwable $e) { - $socket->close(); + $this->socket->close(); $bodyEmitter->fail($e); $completionDeferred->fail($e); @@ -300,7 +311,6 @@ private function normalizeTraceRequest(Request $request): Request return $request; } - private function shouldCloseSocketAfterResponse(Response $response): bool { $request = $response->getRequest(); diff --git a/src/Driver/Http1Parser.php b/src/Connection/Http1Parser.php similarity index 99% rename from src/Driver/Http1Parser.php rename to src/Connection/Http1Parser.php index 6872cfba..2a45f994 100644 --- a/src/Driver/Http1Parser.php +++ b/src/Connection/Http1Parser.php @@ -1,6 +1,6 @@ getLocalAddress(), + $socket->getRemoteAddress(), + $socket instanceof EncryptableSocket ? $socket->getTlsInfo() : null + ); + } + public function __construct(SocketAddress $localAddress, SocketAddress $remoteAddress, ?TlsInfo $tlsInfo = null) { $this->localAddress = $localAddress; diff --git a/src/Driver/DefaultHttpDriverFactory.php b/src/Driver/DefaultHttpDriverFactory.php deleted file mode 100644 index d01d8587..00000000 --- a/src/Driver/DefaultHttpDriverFactory.php +++ /dev/null @@ -1,19 +0,0 @@ - - */ - public function request(Socket $socket, ConnectionInfo $connectionInfo, Request $request, ?CancellationToken $token = null): Promise; -} diff --git a/src/Driver/HttpDriverFactory.php b/src/Driver/HttpDriverFactory.php deleted file mode 100644 index 5d916f1e..00000000 --- a/src/Driver/HttpDriverFactory.php +++ /dev/null @@ -1,22 +0,0 @@ -socketPool = $socketPool ?? new UnlimitedSocketPool; - $this->driverFactory = $driverFactory ?? new Driver\DefaultHttpDriverFactory; + $this->connector = $connector ?? new DnsConnector; + $this->connectionPool = $driverFactory ?? new Connection\DefaultConnectionPool; $this->networkInterceptors = []; } @@ -48,104 +48,82 @@ public function request(Request $request, CancellationToken $cancellation = null return call(function () use ($request, $cancellation) { $cancellation = $cancellation ?? new NullCancellationToken; - $isHttps = $request->getUri()->getScheme() === 'https'; - $defaultPort = $isHttps ? 443 : 80; + $request = $this->normalizeRequestHeaders($request); - $authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort); - $socketUri = "tcp://{$authority}"; + $connection = $this->connectionPool->getConnection($request); - $connectContext = new ConnectContext; + if ($connection === null) { + $socket = yield from $this->createSocket($request, $cancellation); + $connection = $this->connectionPool->createConnection($socket, $request); + } - if ($request->getUri()->getScheme() === 'https') { - $tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) - ->withPeerName($request->getUri()->getHost()) - ->withApplicationLayerProtocols($this->driverFactory->getApplicationLayerProtocols()) - ->withPeerCapturing(); + $client = new CallableNetworkClient(function () use ($connection, $request, $cancellation): Promise { + return $connection->request($request, $cancellation); + }); - $connectContext = $connectContext->withTlsContext($tlsContext); - } + $client = new NetworkInterceptorClient($client, $connection->getConnectionInfo(), ...$this->networkInterceptors); + + return yield $client->request($request, $cancellation); + }); + } - try { - $checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); + private function createSocket(Request $request, CancellationToken $cancellation): \Generator + { + $isHttps = $request->getUri()->getScheme() === 'https'; + $defaultPort = $isHttps ? 443 : 80; - /** @var EncryptableSocket $socket */ - $socket = yield $this->socketPool->checkout($socketUri, $connectContext, $checkoutCancellationToken); - } catch (SocketException $e) { - throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); - } catch (CancelledException $e) { - // In case of a user cancellation request, throw the expected exception - $cancellation->throwIfRequested(); + $authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort); + $socketUri = "tcp://{$authority}"; - // Otherwise we ran into a timeout of our TimeoutCancellationToken - throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e - } + $connectContext = new ConnectContext; - $request = $this->normalizeRequestHeaders($request); + if ($request->getUri()->getScheme() === 'https') { + $tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) + ->withPeerName($request->getUri()->getHost()) + ->withApplicationLayerProtocols($this->connectionPool->getApplicationLayerProtocols()) + ->withPeerCapturing(); + + $connectContext = $connectContext->withTlsContext($tlsContext); + } + + try { + $checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); + + /** @var EncryptableSocket $socket */ + $socket = yield $this->connector->connect($socketUri, $connectContext, $checkoutCancellationToken); + } catch (SocketException $e) { + throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); + } catch (CancelledException $e) { + // In case of a user cancellation request, throw the expected exception + $cancellation->throwIfRequested(); + + // Otherwise we ran into a timeout of our TimeoutCancellationToken + throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e + } - try { - try { - $socket->reference(); - - if ($isHttps) { - $tlsState = $socket->getTlsState(); - if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { - $tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); - yield $socket->setupTls($tlsCancellationToken); - } elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { - throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); - } - } - } catch (StreamException $exception) { - throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); - } catch (CancelledException $e) { - // In case of a user cancellation request, throw the expected exception - $cancellation->throwIfRequested(); - - // Otherwise we ran into a timeout of our TimeoutCancellationToken - throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e - } - - $connectionInfo = new ConnectionInfo( - $socket->getLocalAddress(), - $socket->getRemoteAddress(), - $socket->getTlsInfo() - ); - - $driver = $this->driverFactory->selectDriver($connectionInfo, $request); - - $client = new CallableNetworkClient(function () use ( - $driver, - $request, - $socket, - $connectionInfo, - $cancellation - ): Promise { - return $driver->request($socket, $connectionInfo, $request, $cancellation); - }); - - $client = new NetworkInterceptorClient($client, $connectionInfo, ...$this->networkInterceptors); - - /** @var Response $response */ - $response = yield $client->request($request, $cancellation); - - $response->getCompletionPromise()->onResolve(function ($error) use ($socket) { - if ($error || $socket->isClosed()) { - $this->socketPool->clear($socket); - $socket->close(); - } else { - $socket->unreference(); - $this->socketPool->checkin($socket); - } - }); - - return $response; - } catch (\Throwable $e) { - $this->socketPool->clear($socket); - $socket->close(); - - throw $e; + if (!$isHttps) { + return $socket; + } + + try { + $tlsState = $socket->getTlsState(); + if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { + $tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); + yield $socket->setupTls($tlsCancellationToken); + } elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { + throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); } - }); + } catch (StreamException $exception) { + throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); + } catch (CancelledException $e) { + // In case of a user cancellation request, throw the expected exception + $cancellation->throwIfRequested(); + + // Otherwise we ran into a timeout of our TimeoutCancellationToken + throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e + } + + return $socket; } private function normalizeRequestHeaders(Request $request): Request diff --git a/test/ParserTest.php b/test/ParserTest.php index 85e62957..ba12a15b 100644 --- a/test/ParserTest.php +++ b/test/ParserTest.php @@ -2,7 +2,7 @@ namespace Amp\Http\Client; -use Amp\Http\Client\Driver\Http1Parser; +use Amp\Http\Client\Connection\Http1Parser; use Amp\Socket\SocketAddress; use PHPUnit\Framework\TestCase; diff --git a/test/TimeoutTest.php b/test/TimeoutTest.php index d3f0474c..f8f6a9a7 100644 --- a/test/TimeoutTest.php +++ b/test/TimeoutTest.php @@ -68,8 +68,9 @@ public function testTimeoutDuringConnect(): \Generator // dummy watcher, because socket pool doesn't do anything }); - $this->client = (new ClientBuilder(new Socket\UnlimitedSocketPool(10000, new class implements Socket\Connector { - public function connect( + $connector = $this->createMock(Socket\Connector::class); + $connector->method('connect') + ->willReturnCallback(function ( string $uri, ?Socket\ConnectContext $connectContext = null, ?CancellationToken $token = null @@ -83,8 +84,9 @@ public function connect( } return $deferred->promise(); // never resolve - } - })))->build(); + }); + + $this->client = (new ClientBuilder($connector))->build(); $this->expectException(TimeoutException::class); $this->expectExceptionMessage("Connection to 'localhost:1337' timed out, took longer than 100 ms"); From a390a730d582eb66d86fdd0d88d075088d6f2512 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 20 Jun 2019 00:27:44 -0500 Subject: [PATCH 2/2] Move creating the socket to pool --- src/Connection/Connection.php | 2 + src/Connection/ConnectionPool.php | 22 ++--- src/Connection/DefaultConnectionPool.php | 107 ++++++++++++++++++----- src/Connection/Http1Connection.php | 14 ++- src/SocketClient.php | 74 +--------------- 5 files changed, 110 insertions(+), 109 deletions(-) diff --git a/src/Connection/Connection.php b/src/Connection/Connection.php index e83444ff..c14f276c 100644 --- a/src/Connection/Connection.php +++ b/src/Connection/Connection.php @@ -23,6 +23,8 @@ public function request(Request $request, ?CancellationToken $token = null): Pro */ public function getConnectionInfo(): ConnectionInfo; + public function isBusy(): bool; + public function isClosed(): bool; public function close(): Promise; diff --git a/src/Connection/ConnectionPool.php b/src/Connection/ConnectionPool.php index 5f08383c..a7e8c4d8 100644 --- a/src/Connection/ConnectionPool.php +++ b/src/Connection/ConnectionPool.php @@ -2,27 +2,17 @@ namespace Amp\Http\Client\Connection; +use Amp\CancellationToken; use Amp\Http\Client\Request; -use Amp\Socket\Socket; +use Amp\Promise; interface ConnectionPool { /** - * @param Socket $socket + * @param Request $request + * @param CancellationToken $token * - * @return Connection + * @return Promise */ - public function createConnection(Socket $socket): Connection; - - /** - * @param Request $request - * - * @return Connection|null - */ - public function getConnection(Request $request): ?Connection; - - /** - * @return string[] A list of supported application-layer protocols (ALPNs). - */ - public function getApplicationLayerProtocols(): array; + public function getConnection(Request $request, CancellationToken $token): Promise; } diff --git a/src/Connection/DefaultConnectionPool.php b/src/Connection/DefaultConnectionPool.php index 106b1c11..ae0e457d 100644 --- a/src/Connection/DefaultConnectionPool.php +++ b/src/Connection/DefaultConnectionPool.php @@ -2,40 +2,107 @@ namespace Amp\Http\Client\Connection; +use Amp\ByteStream\StreamException; +use Amp\CancellationToken; +use Amp\CancelledException; +use Amp\Http\Client\Internal\CombinedCancellationToken; use Amp\Http\Client\Request; -use Amp\Socket\Socket; +use Amp\Http\Client\SocketException; +use Amp\Http\Client\TimeoutException; +use Amp\Promise; +use Amp\Socket\ClientTlsContext; +use Amp\Socket\ConnectContext; +use Amp\Socket\Connector; +use Amp\Socket; +use Amp\Socket\EncryptableSocket; +use Amp\TimeoutCancellationToken; +use function Amp\call; final class DefaultConnectionPool implements ConnectionPool { + private const APPLICATION_LAYER_PROTOCOLS = ['http/1.1']; + + /** @var Connector */ + private $connector; + + /** @var Connection[][] */ private $connections = []; - public function createConnection(Socket $socket): Connection + public function __construct(?Connector $connector = null) { - $connection = new Http1Connection($socket); - $this->connections[$socket->getRemoteAddress()->toString()] = $connection; - return $connection; + $this->connector = $connector ?? Socket\connector(); } - public function getConnection(Request $request): ?Connection + public function getConnection(Request $request, CancellationToken $cancellation): Promise { - $uri = $request->getUri(); + return call(function () use ($request, $cancellation) { + $isHttps = $request->getUri()->getScheme() === 'https'; + $defaultPort = $isHttps ? 443 : 80; - $address = $uri->getHost(); - $port = $uri->getPort(); + $authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort); - if ($port !== null) { - $address .= ':' . $port; - } + if (isset($this->connections[$authority])) { + foreach ($this->connections[$authority] as $connection) { + \assert($connection instanceof Connection); + if (!$connection->isBusy()) { + return $connection; + } + } + } - if (isset($this->connections[$address])) { - return $this->connections[$address]; - } + $connectContext = new ConnectContext; - return null; - } + if ($isHttps) { + $tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) + ->withApplicationLayerProtocols(self::APPLICATION_LAYER_PROTOCOLS) + ->withPeerCapturing(); - public function getApplicationLayerProtocols(): array - { - return ['http/1.1']; + $connectContext = $connectContext->withTlsContext($tlsContext); + } + + try { + $checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); + + /** @var EncryptableSocket $socket */ + $socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken); + } catch (Socket\ConnectException $e) { + throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); + } catch (CancelledException $e) { + // In case of a user cancellation request, throw the expected exception + $cancellation->throwIfRequested(); + + // Otherwise we ran into a timeout of our TimeoutCancellationToken + throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e + } + + if ($isHttps) { + try { + $tlsState = $socket->getTlsState(); + if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { + $tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); + yield $socket->setupTls($tlsCancellationToken); + } elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { + throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); + } + } catch (StreamException $exception) { + throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); + } catch (CancelledException $e) { + // In case of a user cancellation request, throw the expected exception + $cancellation->throwIfRequested(); + + // Otherwise we ran into a timeout of our TimeoutCancellationToken + throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e + } + } + + if (!isset($this->connections[$authority])) { + $this->connections[$authority] = []; + } + + $connection = new Http1Connection($socket); + $this->connections[$authority][] = $connection; + + return $connection; + }); } } diff --git a/src/Connection/Http1Connection.php b/src/Connection/Http1Connection.php index e3049cea..7fa7a70b 100644 --- a/src/Connection/Http1Connection.php +++ b/src/Connection/Http1Connection.php @@ -37,6 +37,7 @@ final class Http1Connection implements Connection { private $socket; private $connectionInfo; + private $busy = false; public function __construct(Socket $socket) { @@ -49,6 +50,11 @@ public function getConnectionInfo(): ConnectionInfo return $this->connectionInfo; } + public function isBusy(): bool + { + return $this->busy; + } + public function isClosed(): bool { return $this->socket->isClosed(); @@ -65,6 +71,8 @@ public function request(Request $request, ?CancellationToken $cancellation = nul return call(function () use ($request, $cancellation) { $cancellation = $cancellation ?? new NullCancellationToken; + $this->busy = true; + /** @var Request $request */ $request = yield from $this->buildRequest($request); $protocolVersion = $this->determineProtocolVersion($request); @@ -87,7 +95,11 @@ public function request(Request $request, ?CancellationToken $cancellation = nul try { yield RequestWriter::writeRequest($this->socket, $request, $protocolVersion); - return yield from $this->doRead($request, $cancellation, $readingCancellation, $completionDeferred); + $response = yield from $this->doRead($request, $cancellation, $readingCancellation, $completionDeferred); + + $this->busy = false; + + return $response; } catch (HttpException $e) { $cancellation->throwIfRequested(); diff --git a/src/SocketClient.php b/src/SocketClient.php index d702fb98..8a7c8fef 100644 --- a/src/SocketClient.php +++ b/src/SocketClient.php @@ -2,20 +2,13 @@ namespace Amp\Http\Client; -use Amp\ByteStream\StreamException; use Amp\CancellationToken; -use Amp\CancelledException; use Amp\Http\Client\Internal\CallableNetworkClient; -use Amp\Http\Client\Internal\CombinedCancellationToken; use Amp\Http\Client\Internal\NetworkInterceptorClient; use Amp\NullCancellationToken; use Amp\Promise; -use Amp\Socket\ClientTlsContext; -use Amp\Socket\ConnectContext; use Amp\Socket\Connector; use Amp\Socket\DnsConnector; -use Amp\Socket\EncryptableSocket; -use Amp\TimeoutCancellationToken; use function Amp\call; /** @@ -50,12 +43,8 @@ public function request(Request $request, CancellationToken $cancellation = null $request = $this->normalizeRequestHeaders($request); - $connection = $this->connectionPool->getConnection($request); - - if ($connection === null) { - $socket = yield from $this->createSocket($request, $cancellation); - $connection = $this->connectionPool->createConnection($socket, $request); - } + $connection = yield $this->connectionPool->getConnection($request, $cancellation); + \assert($connection instanceof Connection\Connection); $client = new CallableNetworkClient(function () use ($connection, $request, $cancellation): Promise { return $connection->request($request, $cancellation); @@ -67,65 +56,6 @@ public function request(Request $request, CancellationToken $cancellation = null }); } - private function createSocket(Request $request, CancellationToken $cancellation): \Generator - { - $isHttps = $request->getUri()->getScheme() === 'https'; - $defaultPort = $isHttps ? 443 : 80; - - $authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort); - $socketUri = "tcp://{$authority}"; - - $connectContext = new ConnectContext; - - if ($request->getUri()->getScheme() === 'https') { - $tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) - ->withPeerName($request->getUri()->getHost()) - ->withApplicationLayerProtocols($this->connectionPool->getApplicationLayerProtocols()) - ->withPeerCapturing(); - - $connectContext = $connectContext->withTlsContext($tlsContext); - } - - try { - $checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); - - /** @var EncryptableSocket $socket */ - $socket = yield $this->connector->connect($socketUri, $connectContext, $checkoutCancellationToken); - } catch (SocketException $e) { - throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); - } catch (CancelledException $e) { - // In case of a user cancellation request, throw the expected exception - $cancellation->throwIfRequested(); - - // Otherwise we ran into a timeout of our TimeoutCancellationToken - throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e - } - - if (!$isHttps) { - return $socket; - } - - try { - $tlsState = $socket->getTlsState(); - if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { - $tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); - yield $socket->setupTls($tlsCancellationToken); - } elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { - throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); - } - } catch (StreamException $exception) { - throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); - } catch (CancelledException $e) { - // In case of a user cancellation request, throw the expected exception - $cancellation->throwIfRequested(); - - // Otherwise we ran into a timeout of our TimeoutCancellationToken - throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e - } - - return $socket; - } - private function normalizeRequestHeaders(Request $request): Request { $request = $this->normalizeRequestHostHeader($request);