Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Connection Pool Refactor #182

Merged
merged 32 commits into from
Jul 31, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
4e46ec9
Refactor from socket pool to stateful connection pool
trowski Jun 16, 2019
b09ced2
Move creating the socket to pool
trowski Jun 20, 2019
d042668
Mark busy until body is complete
trowski Jun 20, 2019
48239b5
Remove ConnectionInfo
trowski Jun 20, 2019
9a0afad
Decrease timeout in test
trowski Jun 20, 2019
766cadf
Forgot to remove ConnectionInfo class
trowski Jun 20, 2019
3fa3e34
Remove connection info from Response
trowski Jun 25, 2019
69abeb1
Support keep-alive
trowski Jun 28, 2019
fdaa26c
Use default or prior keep alive timeout if none is given
trowski Jul 12, 2019
80c88d8
Use new value component functions in amphp/http 1.3
trowski Jul 12, 2019
eab8ced
Do not replace promise with connection
trowski Jul 13, 2019
ac30d43
Move RequestWriter into Http1Connection
trowski Jul 13, 2019
4de37a9
Start of HTTP/2 connection
trowski Jul 13, 2019
e0a7960
Do not yield body part backpressure promise
trowski Jul 14, 2019
5e5590a
Fix settings issues causing disconnection
trowski Jul 14, 2019
69ab56e
Wait to ACK initial settings frame before sending first request
trowski Jul 14, 2019
60a32ca
Send null to parser after promise resolution
trowski Jul 15, 2019
1756b32
Add/normalize host header for HTTP/1.x; remove in HTTP/2
trowski Jul 26, 2019
69455ec
Port HTTP/2 improvements from http-server
trowski Jul 26, 2019
67624a7
Use hpack v2
trowski Jul 27, 2019
b70f00b
Remove unnecessary check
trowski Jul 27, 2019
ee40a6d
Require cancellation token in Connection
trowski Jul 27, 2019
b59c8e4
Fix rebase error
trowski Jul 27, 2019
783b63f
Fix pool reusing connections
trowski Jul 28, 2019
d1adfce
Various cleanup and organization
trowski Jul 29, 2019
675b75d
Normalize scheme and authority
trowski Jul 30, 2019
1582c44
Implement sending request body in HTTP/2
trowski Jul 30, 2019
7856054
Check content-length validity; do not remove chunked from transfer-en…
trowski Jul 30, 2019
1662985
Remove unused property of Http2Stream
trowski Jul 30, 2019
e721fcf
Simplify multiple connection handling
trowski Jul 30, 2019
7c5862a
Fix CS issues
trowski Jul 30, 2019
39b5a3b
Use null when creating stream if StringBody contains empty string
trowski Jul 30, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ final class DefaultConnectionPool implements ConnectionPool
/** @var Connector */
private $connector;

/** @var Promise[][] */
/** @var \SplObjectStorage[] */
private $connections = [];

public function __construct(?Connector $connector = null)
Expand All @@ -41,23 +41,22 @@ public function getConnection(Request $request, CancellationToken $cancellation)
$authority = $uri->getHost() . ':' . ($uri->getPort() ?: $defaultPort);
$key = $uri->getScheme() . '://' . $authority;

if (!empty($this->connections[$key])) {
foreach ($this->connections[$key] as $index => $connection) {
if (isset($this->connections[$key])) {
foreach ($this->connections[$key] as $connection) {
$connection = yield $connection;
\assert($connection instanceof Connection);

if (!$connection->isBusy()) {
return $connection;
}
}

++$index;
} else {
$this->connections[$key] = [];
$index = 0;
$this->connections[$key] = new \SplObjectStorage;
}

$promise = $this->connections[$key][$index] = call(function () use ($request, $isHttps, $authority, $cancellation, $key, $index) {
\assert($this->connections[$key] instanceof \SplObjectStorage);

$promise = call(function () use (&$promise, $request, $isHttps, $authority, $cancellation, $key) {
$connectContext = new ConnectContext;

if ($isHttps) {
Expand Down Expand Up @@ -113,27 +112,31 @@ public function getConnection(Request $request, CancellationToken $cancellation)
$connection = new Http1Connection($socket);
}

\assert($promise instanceof Promise);

$connections = &$this->connections;
$connection->onClose(static function () use (&$connections, $key, $index) {
unset($connections[$key][$index]);
$connection->onClose(static function () use (&$connections, $key, $promise) {
$connections[$key]->detach($promise);

if (empty($connections[$key])) {
if (!$connections[$key]->count()) {
unset($connections[$key]);
}
});

return $connection;
});

$promise->onResolve(function (?\Throwable $exception) use ($key, $index): void {
$this->connections[$key]->attach($promise);

$promise->onResolve(function (?\Throwable $exception) use ($key, $promise): void {
if (!$exception) {
return;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We only push the connection to the array two lines below, so there shouldn't be any entry here to delete.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The promise is pushed to the array when it's created, so that needs to be deleted.


// Connection failed, remove from list of connections.
unset($this->connections[$key][$index]);
$this->connections[$key]->detach($promise);

if (empty($this->connections[$key])) {
if (!$this->connections[$key]->count()) {
unset($this->connections[$key]);
}
});
Expand Down
17 changes: 14 additions & 3 deletions src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -226,12 +226,12 @@ public function request(Request $request, CancellationToken $token): Promise

public function isBusy(): bool
{
return $this->remainingStreams > 0 || $this->socket->isClosed();
return $this->remainingStreams <= 0 || $this->socket->isClosed();
}

public function onClose(callable $onClose): void
{
if ($this->socket->isClosed()) {
if ($this->onClose === null) {
Promise\rethrow(call($onClose, $this));
return;
}
Expand All @@ -243,6 +243,12 @@ public function close(): Promise
{
$this->socket->close();

if (!empty($this->streams)) {
foreach ($this->streams as $id => $stream) {
$this->releaseStream($id);
}
}

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
Expand Down Expand Up @@ -308,8 +314,12 @@ private function run(): \Generator
\assert($promise === null || $promise instanceof Promise);
}
}

var_dump($chunk);
kelunik marked this conversation as resolved.
Show resolved Hide resolved
} catch (\Throwable $exception) {
// ?
foreach ($this->streams as $id => $stream) {
$this->releaseStream($id, $exception);
}
} finally {
$this->close();
}
Expand Down Expand Up @@ -954,6 +964,7 @@ private function parser(): \Generator
}

$status = $headers[":status"][0];
unset($headers[":status"]);

if ($stream->state & Http2Stream::RESERVED) {
$error = self::PROTOCOL_ERROR;
Expand Down