-
-
Notifications
You must be signed in to change notification settings - Fork 67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Connection Pool Refactor #182
Changes from 23 commits
4e46ec9
b09ced2
d042668
48239b5
9a0afad
766cadf
3fa3e34
69abeb1
fdaa26c
80c88d8
eab8ced
ac30d43
4de37a9
e0a7960
5e5590a
69ab56e
60a32ca
1756b32
69455ec
67624a7
b70f00b
ee40a6d
b59c8e4
783b63f
d1adfce
675b75d
1582c44
7856054
1662985
e721fcf
7c5862a
39b5a3b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\CancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Client\Response; | ||
use Amp\Promise; | ||
use Amp\Socket\SocketAddress; | ||
use Amp\Socket\TlsInfo; | ||
|
||
interface Connection | ||
{ | ||
public const MAX_KEEP_ALIVE_TIMEOUT = 60; | ||
|
||
/** | ||
* @param Request $request | ||
* @param CancellationToken $token | ||
* | ||
* @return Promise<Response> | ||
*/ | ||
public function request(Request $request, CancellationToken $token): Promise; | ||
|
||
public function isBusy(): bool; | ||
|
||
public function close(): Promise; | ||
|
||
public function onClose(callable $onClose): void; | ||
|
||
public function getLocalAddress(): SocketAddress; | ||
|
||
public function getRemoteAddress(): SocketAddress; | ||
|
||
public function getTlsInfo(): ?TlsInfo; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\CancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Promise; | ||
|
||
interface ConnectionPool | ||
{ | ||
/** | ||
* @param Request $request | ||
* @param CancellationToken $token | ||
* | ||
* @return Promise<Connection> | ||
*/ | ||
public function getConnection(Request $request, CancellationToken $token): Promise; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,144 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\ByteStream\StreamException; | ||
use Amp\CancellationToken; | ||
use Amp\CancelledException; | ||
use Amp\Http\Client\Internal\CombinedCancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Client\SocketException; | ||
use Amp\Http\Client\TimeoutException; | ||
use Amp\Promise; | ||
use Amp\Socket; | ||
use Amp\Socket\ClientTlsContext; | ||
use Amp\Socket\ConnectContext; | ||
use Amp\Socket\Connector; | ||
use Amp\Socket\EncryptableSocket; | ||
use Amp\TimeoutCancellationToken; | ||
use function Amp\call; | ||
|
||
final class DefaultConnectionPool implements ConnectionPool | ||
{ | ||
/** @var Connector */ | ||
private $connector; | ||
|
||
/** @var Promise[][] */ | ||
private $connections = []; | ||
|
||
public function __construct(?Connector $connector = null) | ||
{ | ||
$this->connector = $connector ?? Socket\connector(); | ||
} | ||
|
||
public function getConnection(Request $request, CancellationToken $cancellation): Promise | ||
{ | ||
return call(function () use ($request, $cancellation) { | ||
$uri = $request->getUri(); | ||
$isHttps = $uri->getScheme() === 'https'; | ||
$defaultPort = $isHttps ? 443 : 80; | ||
|
||
$authority = $uri->getHost() . ':' . ($uri->getPort() ?: $defaultPort); | ||
$key = $uri->getScheme() . '://' . $authority; | ||
|
||
if (!empty($this->connections[$key])) { | ||
foreach ($this->connections[$key] as $index => $connection) { | ||
$connection = yield $connection; | ||
\assert($connection instanceof Connection); | ||
|
||
if (!$connection->isBusy()) { | ||
return $connection; | ||
} | ||
} | ||
|
||
++$index; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't safe. Suppose we have 3 concurrent connections which all allow just one request. Due to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, right, since the new connections won't be added to the list used by foreach. Hmm… might have to use a queue or something where that would be the case. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd just do a keyless push and query the last key. |
||
} else { | ||
$this->connections[$key] = []; | ||
$index = 0; | ||
} | ||
|
||
$promise = $this->connections[$key][$index] = call(function () use ($request, $isHttps, $authority, $cancellation, $key, $index) { | ||
kelunik marked this conversation as resolved.
Show resolved
Hide resolved
|
||
$connectContext = new ConnectContext; | ||
|
||
if ($isHttps) { | ||
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) | ||
->withApplicationLayerProtocols(['http/1.1']) | ||
->withPeerCapturing(); | ||
|
||
if (\in_array('2.0', $request->getProtocolVersions(), true)) { | ||
$tlsContext = $tlsContext->withApplicationLayerProtocols(['h2', 'http/1.1']); | ||
} | ||
|
||
$connectContext = $connectContext->withTlsContext($tlsContext); | ||
} | ||
|
||
try { | ||
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); | ||
|
||
/** @var EncryptableSocket $socket */ | ||
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken); | ||
} catch (Socket\ConnectException $e) { | ||
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); | ||
} catch (CancelledException $e) { | ||
// In case of a user cancellation request, throw the expected exception | ||
$cancellation->throwIfRequested(); | ||
|
||
// Otherwise we ran into a timeout of our TimeoutCancellationToken | ||
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e | ||
} | ||
|
||
if ($isHttps) { | ||
try { | ||
$tlsState = $socket->getTlsState(); | ||
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { | ||
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); | ||
yield $socket->setupTls($tlsCancellationToken); | ||
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { | ||
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); | ||
} | ||
} catch (StreamException $exception) { | ||
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); | ||
} catch (CancelledException $e) { | ||
// In case of a user cancellation request, throw the expected exception | ||
$cancellation->throwIfRequested(); | ||
|
||
// Otherwise we ran into a timeout of our TimeoutCancellationToken | ||
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e | ||
} | ||
} | ||
|
||
if ($isHttps && $socket->getTlsInfo()->getApplicationLayerProtocol() === 'h2') { | ||
$connection = new Http2Connection($socket); | ||
} else { | ||
$connection = new Http1Connection($socket); | ||
} | ||
|
||
$connections = &$this->connections; | ||
$connection->onClose(static function () use (&$connections, $key, $index) { | ||
unset($connections[$key][$index]); | ||
|
||
if (empty($connections[$key])) { | ||
unset($connections[$key]); | ||
} | ||
}); | ||
|
||
return $connection; | ||
}); | ||
|
||
$promise->onResolve(function (?\Throwable $exception) use ($key, $index): void { | ||
if (!$exception) { | ||
return; | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We only push the connection to the array two lines below, so there shouldn't be any entry here to delete. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The promise is pushed to the array when it's created, so that needs to be deleted. |
||
|
||
// Connection failed, remove from list of connections. | ||
unset($this->connections[$key][$index]); | ||
|
||
if (empty($this->connections[$key])) { | ||
unset($this->connections[$key]); | ||
} | ||
}); | ||
|
||
return $promise; | ||
}); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should fallback to
connector()
instead.