Skip to content

Commit

Permalink
Support keep-alive
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 28, 2019
1 parent 7520500 commit 5bd4a38
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 62 deletions.
6 changes: 4 additions & 2 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

interface Connection
{
public const MAX_KEEP_ALIVE_TIMEOUT = 60;

/**
* @param Request $request
* @param CancellationToken|null $token
Expand All @@ -21,10 +23,10 @@ public function request(Request $request, ?CancellationToken $token = null): Pro

public function isBusy(): bool;

public function isClosed(): bool;

public function close(): Promise;

public function onClose(callable $onClose): void;

public function getLocalAddress(): SocketAddress;

public function getRemoteAddress(): SocketAddress;
Expand Down
121 changes: 77 additions & 44 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ final class DefaultConnectionPool implements ConnectionPool
/** @var Connector */
private $connector;

/** @var Connection[][] */
/** @var Promise[][] */
private $connections = [];

public function __construct(?Connector $connector = null)
Expand All @@ -36,73 +36,106 @@ public function __construct(?Connector $connector = null)
public function getConnection(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$isHttps = $request->getUri()->getScheme() === 'https';
$uri = $request->getUri();
$isHttps = $uri->getScheme() === 'https';
$defaultPort = $isHttps ? 443 : 80;

$authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort);
$authority = $uri->getHost() . ':' . ($uri->getPort() ?: $defaultPort);
$key = $uri->getScheme() . '://' . $authority;

if (!empty($this->connections[$key])) {
foreach ($this->connections[$key] as $index => $connection) {
if ($connection instanceof Promise) {
$connection = yield $connection;

This comment has been minimized.

Copy link
@kelunik

kelunik Jun 28, 2019

Member

Should we really await the connection here? I guess we have to in order to share a single HTTP/2 connection if HTTP/2 is available.

This comment has been minimized.

Copy link
@kelunik

kelunik Jun 28, 2019

Member

If we have HTTP/1 connections, we maybe skip the yield here?

This comment has been minimized.

Copy link
@trowski

trowski Jun 28, 2019

Author Member

Not sure if the logic mess is worth it. The second connection attempt must wait because it doesn't know if an HTTP/2 connection will be returned. There's also no guarantee that a second connection will use the same protocol.

}

if (isset($this->connections[$authority])) {
foreach ($this->connections[$authority] as $connection) {
\assert($connection instanceof Connection);

if (!$connection->isBusy()) {
return $connection;
}
}
}

$connectContext = new ConnectContext;

if ($isHttps) {
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost()))
->withApplicationLayerProtocols(self::APPLICATION_LAYER_PROTOCOLS)
->withPeerCapturing();

$connectContext = $connectContext->withTlsContext($tlsContext);
++$index;
} else {
$this->connections[$key] = [];
$index = 0;
}

try {
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout()));
$promise = $this->connections[$key][$index] = call(function () use ($request, $isHttps, $authority, $cancellation, $key, $index) {
$connectContext = new ConnectContext;

/** @var EncryptableSocket $socket */
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken);
} catch (Socket\ConnectException $e) {
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();
if ($isHttps) {
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost()))
->withApplicationLayerProtocols(self::APPLICATION_LAYER_PROTOCOLS)
->withPeerCapturing();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e
}
$connectContext = $connectContext->withTlsContext($tlsContext);
}

if ($isHttps) {
try {
$tlsState = $socket->getTlsState();
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) {
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout()));
yield $socket->setupTls($tlsCancellationToken);
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) {
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')');
}
} catch (StreamException $exception) {
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception);
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout()));

/** @var EncryptableSocket $socket */
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken);
} catch (Socket\ConnectException $e) {
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e
}
}

if (!isset($this->connections[$authority])) {
$this->connections[$authority] = [];
}
if ($isHttps) {
try {
$tlsState = $socket->getTlsState();
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) {
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout()));
yield $socket->setupTls($tlsCancellationToken);
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) {
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')');
}
} catch (StreamException $exception) {
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e
}
}

$connection = new Http1Connection($socket);

$connections = &$this->connections;
$connection->onClose(static function () use (&$connections, $key, $index) {
unset($connections[$key][$index]);

if (empty($connections[$key])) {
unset($connections[$key]);
}
});

return $connection;
});

$promise->onResolve(function (?\Throwable $exception, ?Connection $connection) use ($key, $index): void {
if ($exception) {
unset($this->connections[$key][$index]);

if (empty($this->connections[$key])) {
unset($this->connections[$key]);
}
return;
}

$connection = new Http1Connection($socket);
$this->connections[$authority][] = $connection;
$this->connections[$key][$index] = $connection;
});

return $connection;
return $promise;
});
}
}
84 changes: 69 additions & 15 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
use Amp\Http\Client\Response;
use Amp\Http\Client\SocketException;
use Amp\Http\Client\TimeoutException;
use Amp\Loop;
use Amp\NullCancellationToken;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
Expand All @@ -37,9 +38,21 @@
*/
final class Http1Connection implements Connection
{
/** @var Socket */
private $socket;

/** @var bool */
private $busy = false;

/** @var int Number of requests made on this connection. */
private $requestCounter = 0;

/** @var string|null Keep alive timeout watcher ID. */
private $timeoutWatcher;

/** @var callable[]|null */
private $onClose = [];

public function __construct(Socket $socket)
{
$this->socket = $socket;
Expand All @@ -50,14 +63,29 @@ public function isBusy(): bool
return $this->busy;
}

public function isClosed(): bool
public function onClose(callable $onClose): void
{
return $this->socket->isClosed();
if ($this->socket->isClosed()) {
Promise\rethrow(call($onClose, $this));
return;
}

$this->onClose[] = $onClose;
}

public function close(): Promise
{
$this->socket->close();

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;

foreach ($onClose as $callback) {
Promise\rethrow(call($callback, $this));
}
}

return new Success;
}

Expand All @@ -83,6 +111,11 @@ public function request(Request $request, ?CancellationToken $cancellation = nul
$cancellation = $cancellation ?? new NullCancellationToken;

$this->busy = true;
++$this->requestCounter;

if ($this->timeoutWatcher !== null) {
Loop::cancel($this->timeoutWatcher);
}

/** @var Request $request */
$request = yield from $this->buildRequest($request);
Expand All @@ -97,7 +130,7 @@ public function request(Request $request, ?CancellationToken $cancellation = nul
$readingCancellation = $cancellation;
}

$cancellationId = $readingCancellation->subscribe([$this->socket, 'close']);
$cancellationId = $readingCancellation->subscribe([$this, 'close']);

$busy = &$this->busy;
$completionDeferred->promise()->onResolve(static function () use (&$busy, $readingCancellation, $cancellationId) {
Expand Down Expand Up @@ -173,7 +206,7 @@ private function doRead(

$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken());
$bodyCancellationToken->subscribe([$this->socket, 'close']);
$bodyCancellationToken->subscribe([$this, 'close']);

$response = $response
->withBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource))
Expand Down Expand Up @@ -225,14 +258,17 @@ private function doRead(
}
}

if ($this->shouldCloseSocketAfterResponse($response) || $parser->getState() === Http1Parser::BODY_IDENTITY_EOF) {
$this->socket->close();
if ($timeout = $this->determineKeepAliveTimeout($response)) {
$this->timeoutWatcher = Loop::delay($timeout * 1000, [$this, 'close']);
Loop::unreference($this->timeoutWatcher);
} else {
$this->close();
}

$bodyEmitter->complete();
$completionDeferred->resolve();
} catch (\Throwable $e) {
$this->socket->close();
$this->close();

$bodyEmitter->fail($e);
$completionDeferred->fail($e);
Expand Down Expand Up @@ -332,26 +368,44 @@ private function normalizeTraceRequest(Request $request): Request
return $request;
}

private function shouldCloseSocketAfterResponse(Response $response): bool
private function determineKeepAliveTimeout(Response $response): int
{
$request = $response->getRequest();

$requestConnHeader = $request->getHeader('connection');
$responseConnHeader = $response->getHeader('connection');

if ($requestConnHeader && !\strcasecmp($requestConnHeader, 'close')) {
return true;
if (!\strcasecmp($requestConnHeader, 'close')) {
return 0;
}

if ($response->getProtocolVersion() === '1.0') {
return 0;
}

if (\strcasecmp($responseConnHeader, 'keep-alive')) {
return 0;
}

if ($responseConnHeader && !\strcasecmp($responseConnHeader, 'close')) {
return true;
$responseKeepAliveHeader = $response->getHeader('keep-alive');

if ($responseKeepAliveHeader === null) {
return self::MAX_KEEP_ALIVE_TIMEOUT;
}

$parts = \array_map('trim', \explode(',', $responseKeepAliveHeader));

$params = [];
foreach ($parts as $part) {
[$key, $value] = \array_map('trim', \explode('=', $part)) + [null, null];
$params[$key] = (int) $value;
}

if (!$responseConnHeader && $response->getProtocolVersion() === '1.0') {
return true;
if ($this->requestCounter >= $params['max'] ?? \PHP_INT_MAX) {
return 0;
}

return false;
return \min(\max(0, $params['timeout'] ?? 0), self::MAX_KEEP_ALIVE_TIMEOUT);
}

private function determineProtocolVersion(Request $request): string
Expand Down
2 changes: 1 addition & 1 deletion test/ClientHttpBinIntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public function testCustomUserAgentSentIfAssigned(): \Generator
$uri = 'http://httpbin.org/user-agent';
$customUserAgent = 'test-user-agent';

$request = (new Request($uri))->withHeader('User-Agent', $customUserAgent);
$request = (new Request($uri))->withHeader('User-Agent', $customUserAgent)->withHeader('Connection', 'keep-alive');

/** @var Response $response */
$response = yield $this->executeRequest($request);
Expand Down

0 comments on commit 5bd4a38

Please sign in to comment.