-
-
Notifications
You must be signed in to change notification settings - Fork 66
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WIP] Connection Pool Refactor #182
Merged
Merged
Changes from all commits
Commits
Show all changes
32 commits
Select commit
Hold shift + click to select a range
4e46ec9
Refactor from socket pool to stateful connection pool
trowski b09ced2
Move creating the socket to pool
trowski d042668
Mark busy until body is complete
trowski 48239b5
Remove ConnectionInfo
trowski 9a0afad
Decrease timeout in test
trowski 766cadf
Forgot to remove ConnectionInfo class
trowski 3fa3e34
Remove connection info from Response
trowski 69abeb1
Support keep-alive
trowski fdaa26c
Use default or prior keep alive timeout if none is given
trowski 80c88d8
Use new value component functions in amphp/http 1.3
trowski eab8ced
Do not replace promise with connection
trowski ac30d43
Move RequestWriter into Http1Connection
trowski 4de37a9
Start of HTTP/2 connection
trowski e0a7960
Do not yield body part backpressure promise
trowski 5e5590a
Fix settings issues causing disconnection
trowski 69ab56e
Wait to ACK initial settings frame before sending first request
trowski 60a32ca
Send null to parser after promise resolution
trowski 1756b32
Add/normalize host header for HTTP/1.x; remove in HTTP/2
trowski 69455ec
Port HTTP/2 improvements from http-server
trowski 67624a7
Use hpack v2
trowski b70f00b
Remove unnecessary check
trowski ee40a6d
Require cancellation token in Connection
trowski b59c8e4
Fix rebase error
trowski 783b63f
Fix pool reusing connections
trowski d1adfce
Various cleanup and organization
trowski 675b75d
Normalize scheme and authority
trowski 1582c44
Implement sending request body in HTTP/2
trowski 7856054
Check content-length validity; do not remove chunked from transfer-en…
trowski 1662985
Remove unused property of Http2Stream
trowski e721fcf
Simplify multiple connection handling
trowski 7c5862a
Fix CS issues
trowski 39b5a3b
Use null when creating stream if StringBody contains empty string
trowski File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\CancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Client\Response; | ||
use Amp\Promise; | ||
use Amp\Socket\SocketAddress; | ||
use Amp\Socket\TlsInfo; | ||
|
||
interface Connection | ||
{ | ||
public const MAX_KEEP_ALIVE_TIMEOUT = 60; | ||
|
||
/** | ||
* @param Request $request | ||
* @param CancellationToken $token | ||
* | ||
* @return Promise<Response> | ||
*/ | ||
public function request(Request $request, CancellationToken $token): Promise; | ||
|
||
public function isBusy(): bool; | ||
|
||
public function close(): Promise; | ||
|
||
public function onClose(callable $onClose): void; | ||
|
||
public function getLocalAddress(): SocketAddress; | ||
|
||
public function getRemoteAddress(): SocketAddress; | ||
|
||
public function getTlsInfo(): ?TlsInfo; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,18 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\CancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Promise; | ||
|
||
interface ConnectionPool | ||
{ | ||
/** | ||
* @param Request $request | ||
* @param CancellationToken $token | ||
* | ||
* @return Promise<Connection> | ||
*/ | ||
public function getConnection(Request $request, CancellationToken $token): Promise; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
<?php | ||
|
||
namespace Amp\Http\Client\Connection; | ||
|
||
use Amp\ByteStream\StreamException; | ||
use Amp\CancellationToken; | ||
use Amp\CancelledException; | ||
use Amp\Http\Client\Internal\CombinedCancellationToken; | ||
use Amp\Http\Client\Request; | ||
use Amp\Http\Client\SocketException; | ||
use Amp\Http\Client\TimeoutException; | ||
use Amp\Promise; | ||
use Amp\Socket; | ||
use Amp\Socket\ClientTlsContext; | ||
use Amp\Socket\ConnectContext; | ||
use Amp\Socket\Connector; | ||
use Amp\Socket\EncryptableSocket; | ||
use Amp\TimeoutCancellationToken; | ||
use function Amp\call; | ||
|
||
final class DefaultConnectionPool implements ConnectionPool | ||
{ | ||
/** @var Connector */ | ||
private $connector; | ||
|
||
/** @var \SplObjectStorage[] */ | ||
private $connections = []; | ||
|
||
public function __construct(?Connector $connector = null) | ||
{ | ||
$this->connector = $connector ?? Socket\connector(); | ||
} | ||
|
||
public function getConnection(Request $request, CancellationToken $cancellation): Promise | ||
{ | ||
return call(function () use ($request, $cancellation) { | ||
$uri = $request->getUri(); | ||
$scheme = \strtolower($uri->getScheme()); | ||
$isHttps = $scheme === 'https'; | ||
$defaultPort = $isHttps ? 443 : 80; | ||
|
||
$host = \strtolower($uri->getHost()); | ||
$port = $uri->getPort() ?? $defaultPort; | ||
|
||
if ($host === '') { | ||
throw new \Error('A host must be provided in the URI'); | ||
} | ||
|
||
$authority = $host . ':' . $port; | ||
$key = $scheme . '://' . $authority; | ||
|
||
$this->connections[$key] = $this->connections[$key] ?? new \SplObjectStorage; | ||
|
||
foreach ($this->connections[$key] as $connection) { | ||
$connection = yield $connection; | ||
\assert($connection instanceof Connection); | ||
|
||
if (!$connection->isBusy()) { | ||
return $connection; | ||
} | ||
} | ||
|
||
$promise = call(function () use (&$promise, $request, $isHttps, $authority, $cancellation, $key) { | ||
$connectContext = new ConnectContext; | ||
|
||
if ($isHttps) { | ||
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost())) | ||
->withApplicationLayerProtocols(['http/1.1']) | ||
->withPeerCapturing(); | ||
|
||
if (\in_array('2.0', $request->getProtocolVersions(), true)) { | ||
$tlsContext = $tlsContext->withApplicationLayerProtocols(['h2', 'http/1.1']); | ||
} | ||
|
||
$connectContext = $connectContext->withTlsContext($tlsContext); | ||
} | ||
|
||
try { | ||
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout())); | ||
|
||
/** @var EncryptableSocket $socket */ | ||
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken); | ||
} catch (Socket\ConnectException $e) { | ||
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e); | ||
} catch (CancelledException $e) { | ||
// In case of a user cancellation request, throw the expected exception | ||
$cancellation->throwIfRequested(); | ||
|
||
// Otherwise we ran into a timeout of our TimeoutCancellationToken | ||
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e | ||
} | ||
|
||
if ($isHttps) { | ||
try { | ||
$tlsState = $socket->getTlsState(); | ||
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) { | ||
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout())); | ||
yield $socket->setupTls($tlsCancellationToken); | ||
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) { | ||
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')'); | ||
} | ||
} catch (StreamException $exception) { | ||
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception); | ||
} catch (CancelledException $e) { | ||
// In case of a user cancellation request, throw the expected exception | ||
$cancellation->throwIfRequested(); | ||
|
||
// Otherwise we ran into a timeout of our TimeoutCancellationToken | ||
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e | ||
} | ||
} | ||
|
||
if ($isHttps && $socket->getTlsInfo()->getApplicationLayerProtocol() === 'h2') { | ||
$connection = new Http2Connection($socket); | ||
} else { | ||
$connection = new Http1Connection($socket); | ||
} | ||
|
||
\assert($promise instanceof Promise); | ||
|
||
$connection->onClose(function () use ($key, $promise) { | ||
$this->connections[$key]->detach($promise); | ||
|
||
if (!$this->connections[$key]->count()) { | ||
unset($this->connections[$key]); | ||
} | ||
}); | ||
|
||
return $connection; | ||
}); | ||
|
||
$this->connections[$key]->attach($promise); | ||
|
||
$promise->onResolve(function (?\Throwable $exception) use ($key, $promise): void { | ||
if (!$exception) { | ||
return; | ||
} | ||
|
||
// Connection failed, remove from list of connections. | ||
$this->connections[$key]->detach($promise); | ||
|
||
if (!$this->connections[$key]->count()) { | ||
unset($this->connections[$key]); | ||
} | ||
}); | ||
|
||
return $promise; | ||
}); | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We only push the connection to the array two lines below, so there shouldn't be any entry here to delete.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The promise is pushed to the array when it's created, so that needs to be deleted.