Skip to content

Commit

Permalink
Merge 5bd4a38 into 45074e8
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 28, 2019
2 parents 45074e8 + 5bd4a38 commit 6bad96c
Show file tree
Hide file tree
Showing 18 changed files with 364 additions and 319 deletions.
12 changes: 6 additions & 6 deletions src/ClientBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
namespace Amp\Http\Client;

use Amp\Http\Client\Internal\ApplicationInterceptorClient;
use Amp\Socket\SocketPool;
use Amp\Socket\UnlimitedSocketPool;
use Amp\Socket\Connector;
use Amp\Socket\DnsConnector;

final class ClientBuilder
{
private $socketPool;
private $connector;
private $networkInterceptors = [];
private $applicationInterceptors = [];

public function __construct(?SocketPool $socketPool = null)
public function __construct(?Connector $connector = null)
{
$this->socketPool = $socketPool ?? new UnlimitedSocketPool;
$this->connector = $connector ?? new DnsConnector;
}

public function addNetworkInterceptor(NetworkInterceptor $networkInterceptor): self
Expand All @@ -33,7 +33,7 @@ public function addApplicationInterceptor(ApplicationInterceptor $applicationInt

public function build(): Client
{
$client = new SocketClient($this->socketPool);
$client = new SocketClient($this->connector);
foreach ($this->networkInterceptors as $networkInterceptor) {
$client->addNetworkInterceptor($networkInterceptor);
}
Expand Down
35 changes: 35 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;

interface Connection
{
public const MAX_KEEP_ALIVE_TIMEOUT = 60;

/**
* @param Request $request
* @param CancellationToken|null $token
*
* @return Promise<Response>
*/
public function request(Request $request, ?CancellationToken $token = null): Promise;

public function isBusy(): bool;

public function close(): Promise;

public function onClose(callable $onClose): void;

public function getLocalAddress(): SocketAddress;

public function getRemoteAddress(): SocketAddress;

public function getTlsInfo(): ?TlsInfo;
}
18 changes: 18 additions & 0 deletions src/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\Request;
use Amp\Promise;

interface ConnectionPool
{
/**
* @param Request $request
* @param CancellationToken $token
*
* @return Promise<Connection>
*/
public function getConnection(Request $request, CancellationToken $token): Promise;
}
141 changes: 141 additions & 0 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\CancelledException;
use Amp\Http\Client\Internal\CombinedCancellationToken;
use Amp\Http\Client\Request;
use Amp\Http\Client\SocketException;
use Amp\Http\Client\TimeoutException;
use Amp\Promise;
use Amp\Socket;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\Connector;
use Amp\Socket\EncryptableSocket;
use Amp\TimeoutCancellationToken;
use function Amp\call;

final class DefaultConnectionPool implements ConnectionPool
{
private const APPLICATION_LAYER_PROTOCOLS = ['http/1.1'];

/** @var Connector */
private $connector;

/** @var Promise[][] */
private $connections = [];

public function __construct(?Connector $connector = null)
{
$this->connector = $connector ?? Socket\connector();
}

public function getConnection(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$uri = $request->getUri();
$isHttps = $uri->getScheme() === 'https';
$defaultPort = $isHttps ? 443 : 80;

$authority = $uri->getHost() . ':' . ($uri->getPort() ?: $defaultPort);
$key = $uri->getScheme() . '://' . $authority;

if (!empty($this->connections[$key])) {
foreach ($this->connections[$key] as $index => $connection) {
if ($connection instanceof Promise) {
$connection = yield $connection;
}

\assert($connection instanceof Connection);

if (!$connection->isBusy()) {
return $connection;
}
}

++$index;
} else {
$this->connections[$key] = [];
$index = 0;
}

$promise = $this->connections[$key][$index] = call(function () use ($request, $isHttps, $authority, $cancellation, $key, $index) {
$connectContext = new ConnectContext;

if ($isHttps) {
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost()))
->withApplicationLayerProtocols(self::APPLICATION_LAYER_PROTOCOLS)
->withPeerCapturing();

$connectContext = $connectContext->withTlsContext($tlsContext);
}

try {
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout()));

/** @var EncryptableSocket $socket */
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken);
} catch (Socket\ConnectException $e) {
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e
}

if ($isHttps) {
try {
$tlsState = $socket->getTlsState();
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) {
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout()));
yield $socket->setupTls($tlsCancellationToken);
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) {
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')');
}
} catch (StreamException $exception) {
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e
}
}

$connection = new Http1Connection($socket);

$connections = &$this->connections;
$connection->onClose(static function () use (&$connections, $key, $index) {
unset($connections[$key][$index]);

if (empty($connections[$key])) {
unset($connections[$key]);
}
});

return $connection;
});

$promise->onResolve(function (?\Throwable $exception, ?Connection $connection) use ($key, $index): void {
if ($exception) {
unset($this->connections[$key][$index]);

if (empty($this->connections[$key])) {
unset($this->connections[$key]);
}
return;
}

$this->connections[$key][$index] = $connection;
});

return $promise;
});
}
}
Loading

0 comments on commit 6bad96c

Please sign in to comment.