Skip to content

Commit

Permalink
Merge c862d24 into 45074e8
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 20, 2019
2 parents 45074e8 + c862d24 commit 2de4bde
Show file tree
Hide file tree
Showing 18 changed files with 294 additions and 281 deletions.
12 changes: 6 additions & 6 deletions src/ClientBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
namespace Amp\Http\Client;

use Amp\Http\Client\Internal\ApplicationInterceptorClient;
use Amp\Socket\SocketPool;
use Amp\Socket\UnlimitedSocketPool;
use Amp\Socket\Connector;
use Amp\Socket\DnsConnector;

final class ClientBuilder
{
private $socketPool;
private $connector;
private $networkInterceptors = [];
private $applicationInterceptors = [];

public function __construct(?SocketPool $socketPool = null)
public function __construct(?Connector $connector = null)
{
$this->socketPool = $socketPool ?? new UnlimitedSocketPool;
$this->connector = $connector ?? new DnsConnector;
}

public function addNetworkInterceptor(NetworkInterceptor $networkInterceptor): self
Expand All @@ -33,7 +33,7 @@ public function addApplicationInterceptor(ApplicationInterceptor $applicationInt

public function build(): Client
{
$client = new SocketClient($this->socketPool);
$client = new SocketClient($this->connector);
foreach ($this->networkInterceptors as $networkInterceptor) {
$client->addNetworkInterceptor($networkInterceptor);
}
Expand Down
33 changes: 33 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;

interface Connection
{
/**
* @param Request $request
* @param CancellationToken|null $token
*
* @return Promise<Response>
*/
public function request(Request $request, ?CancellationToken $token = null): Promise;

public function isBusy(): bool;

public function isClosed(): bool;

public function close(): Promise;

public function getLocalAddress(): SocketAddress;

public function getRemoteAddress(): SocketAddress;

public function getTlsInfo(): ?TlsInfo;
}
18 changes: 18 additions & 0 deletions src/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\Request;
use Amp\Promise;

interface ConnectionPool
{
/**
* @param Request $request
* @param CancellationToken $token
*
* @return Promise<Connection>
*/
public function getConnection(Request $request, CancellationToken $token): Promise;
}
108 changes: 108 additions & 0 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\ByteStream\StreamException;
use Amp\CancellationToken;
use Amp\CancelledException;
use Amp\Http\Client\Internal\CombinedCancellationToken;
use Amp\Http\Client\Request;
use Amp\Http\Client\SocketException;
use Amp\Http\Client\TimeoutException;
use Amp\Promise;
use Amp\Socket\ClientTlsContext;
use Amp\Socket\ConnectContext;
use Amp\Socket\Connector;
use Amp\Socket;
use Amp\Socket\EncryptableSocket;
use Amp\TimeoutCancellationToken;
use function Amp\call;

final class DefaultConnectionPool implements ConnectionPool
{
private const APPLICATION_LAYER_PROTOCOLS = ['http/1.1'];

/** @var Connector */
private $connector;

/** @var Connection[][] */
private $connections = [];

public function __construct(?Connector $connector = null)
{
$this->connector = $connector ?? Socket\connector();
}

public function getConnection(Request $request, CancellationToken $cancellation): Promise
{
return call(function () use ($request, $cancellation) {
$isHttps = $request->getUri()->getScheme() === 'https';
$defaultPort = $isHttps ? 443 : 80;

$authority = $request->getUri()->getHost() . ':' . ($request->getUri()->getPort() ?: $defaultPort);

if (isset($this->connections[$authority])) {
foreach ($this->connections[$authority] as $connection) {
\assert($connection instanceof Connection);
if (!$connection->isBusy()) {
return $connection;
}
}
}

$connectContext = new ConnectContext;

if ($isHttps) {
$tlsContext = ($connectContext->getTlsContext() ?? new ClientTlsContext($request->getUri()->getHost()))
->withApplicationLayerProtocols(self::APPLICATION_LAYER_PROTOCOLS)
->withPeerCapturing();

$connectContext = $connectContext->withTlsContext($tlsContext);
}

try {
$checkoutCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTcpConnectTimeout()));

/** @var EncryptableSocket $socket */
$socket = yield $this->connector->connect('tcp://' . $authority, $connectContext, $checkoutCancellationToken);
} catch (Socket\ConnectException $e) {
throw new SocketException(\sprintf("Connection to '%s' failed", $authority), 0, $e);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("Connection to '%s' timed out, took longer than " . $request->getTcpConnectTimeout() . ' ms', $authority)); // don't pass $e
}

if ($isHttps) {
try {
$tlsState = $socket->getTlsState();
if ($tlsState === EncryptableSocket::TLS_STATE_DISABLED) {
$tlsCancellationToken = new CombinedCancellationToken($cancellation, new TimeoutCancellationToken($request->getTlsHandshakeTimeout()));
yield $socket->setupTls($tlsCancellationToken);
} elseif ($tlsState !== EncryptableSocket::TLS_STATE_ENABLED) {
throw new SocketException('Failed to setup TLS connection, connection was in an unexpected TLS state (' . $tlsState . ')');
}
} catch (StreamException $exception) {
throw new SocketException(\sprintf("Connection to '%s' closed during TLS handshake", $authority), 0, $exception);
} catch (CancelledException $e) {
// In case of a user cancellation request, throw the expected exception
$cancellation->throwIfRequested();

// Otherwise we ran into a timeout of our TimeoutCancellationToken
throw new TimeoutException(\sprintf("TLS handshake with '%s' @ '%s' timed out, took longer than " . $request->getTlsHandshakeTimeout() . ' ms', $authority, $socket->getRemoteAddress()->toString())); // don't pass $e
}
}

if (!isset($this->connections[$authority])) {
$this->connections[$authority] = [];
}

$connection = new Http1Connection($socket);
$this->connections[$authority][] = $connection;

return $connection;
});
}
}
90 changes: 60 additions & 30 deletions src/Driver/Http1Driver.php → src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Amp\Http\Client\Driver;
namespace Amp\Http\Client\Connection;

use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\StreamException;
Expand All @@ -9,7 +9,6 @@
use Amp\CancelledException;
use Amp\Deferred;
use Amp\Emitter;
use Amp\Http\Client\ConnectionInfo;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\CombinedCancellationToken;
use Amp\Http\Client\Internal\RequestWriter;
Expand All @@ -22,7 +21,10 @@
use Amp\Http\Client\TimeoutException;
use Amp\NullCancellationToken;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;
use Amp\Success;
use Amp\TimeoutCancellationToken;
use function Amp\asyncCall;
Expand All @@ -33,18 +35,54 @@
*
* @see Client
*/
final class Http1Driver implements HttpDriver
final class Http1Connection implements Connection
{
private $socket;
private $busy = false;

public function __construct(Socket $socket)
{
$this->socket = $socket;
}

public function isBusy(): bool
{
return $this->busy;
}

public function isClosed(): bool
{
return $this->socket->isClosed();
}

public function close(): Promise
{
$this->socket->close();
return new Success;
}

public function getLocalAddress(): SocketAddress
{
return $this->socket->getLocalAddress();
}

public function getRemoteAddress(): SocketAddress
{
return $this->socket->getRemoteAddress();
}

public function getTlsInfo(): ?TlsInfo
{
return $this->socket instanceof EncryptableSocket ? $this->socket->getTlsInfo() : null;
}

/** @inheritdoc */
public function request(
Socket $socket,
ConnectionInfo $connectionInfo,
Request $request,
?CancellationToken $cancellation = null
): Promise {
return call(function () use ($request, $socket, $connectionInfo, $cancellation) {
public function request(Request $request, ?CancellationToken $cancellation = null): Promise {
return call(function () use ($request, $cancellation) {
$cancellation = $cancellation ?? new NullCancellationToken;

$this->busy = true;

/** @var Request $request */
$request = yield from $this->buildRequest($request);
$protocolVersion = $this->determineProtocolVersion($request);
Expand All @@ -58,18 +96,18 @@ public function request(
$readingCancellation = $cancellation;
}

$cancellationId = $readingCancellation->subscribe(static function () use ($socket) {
$socket->close();
});
$cancellationId = $readingCancellation->subscribe([$this->socket, 'close']);

$completionDeferred->promise()->onResolve(static function () use ($readingCancellation, $cancellationId) {
$busy = &$this->busy;
$completionDeferred->promise()->onResolve(static function () use (&$busy, $readingCancellation, $cancellationId) {
$readingCancellation->unsubscribe($cancellationId);
$busy = false;
});

try {
yield RequestWriter::writeRequest($socket, $request, $protocolVersion);
yield RequestWriter::writeRequest($this->socket, $request, $protocolVersion);

return yield from $this->doRead($socket, $request, $connectionInfo, $cancellation, $readingCancellation, $completionDeferred);
return yield from $this->doRead($request, $cancellation, $readingCancellation, $completionDeferred);
} catch (HttpException $e) {
$cancellation->throwIfRequested();

Expand Down Expand Up @@ -98,9 +136,7 @@ private function buildRequest(Request $request): \Generator
}

/**
* @param Socket $socket
* @param Request $request
* @param ConnectionInfo $connectionInfo
* @param CancellationToken $originalCancellation
* @param CancellationToken $readingCancellation
* @param Deferred $completionDeferred
Expand All @@ -111,9 +147,7 @@ private function buildRequest(Request $request): \Generator
* @throws CancelledException
*/
private function doRead(
Socket $socket,
Request $request,
ConnectionInfo $connectionInfo,
CancellationToken $originalCancellation,
CancellationToken $readingCancellation,
Deferred $completionDeferred
Expand All @@ -127,20 +161,18 @@ private function doRead(
$backpressure = $bodyEmitter->emit($data);
};

$parser = new Http1Parser($request, $connectionInfo, $bodyCallback);
$parser = new Http1Parser($request, $this, $bodyCallback);

try {
while (null !== $chunk = yield $socket->read()) {
while (null !== $chunk = yield $this->socket->read()) {
$response = $parser->parse($chunk);
if ($response === null) {
continue;
}

$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken());
$bodyCancellationToken->subscribe(static function () use ($socket) {
$socket->close();
});
$bodyCancellationToken->subscribe([$this->socket, 'close']);

$response = $response
->withBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource))
Expand All @@ -156,7 +188,6 @@ private function doRead(
$originalCancellation,
$readingCancellation,
$bodyCancellationToken,
$socket,
&$backpressure
) {
try {
Expand All @@ -177,7 +208,7 @@ private function doRead(
if (!$backpressure instanceof Success) {
yield $this->withCancellation($backpressure, $bodyCancellationToken);
}
} while (null !== $chunk = yield $socket->read());
} while (null !== $chunk = yield $this->socket->read());

$originalCancellation->throwIfRequested();

Expand All @@ -194,13 +225,13 @@ private function doRead(
}

if ($this->shouldCloseSocketAfterResponse($response) || $parser->getState() === Http1Parser::BODY_IDENTITY_EOF) {
$socket->close();
$this->socket->close();
}

$bodyEmitter->complete();
$completionDeferred->resolve();
} catch (\Throwable $e) {
$socket->close();
$this->socket->close();

$bodyEmitter->fail($e);
$completionDeferred->fail($e);
Expand Down Expand Up @@ -300,7 +331,6 @@ private function normalizeTraceRequest(Request $request): Request
return $request;
}


private function shouldCloseSocketAfterResponse(Response $response): bool
{
$request = $response->getRequest();
Expand Down
Loading

0 comments on commit 2de4bde

Please sign in to comment.