Skip to content

Commit

Permalink
Merge 0f018b0 into 45074e8
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jun 17, 2019
2 parents 45074e8 + 0f018b0 commit 3011310
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 199 deletions.
12 changes: 6 additions & 6 deletions src/ClientBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@
namespace Amp\Http\Client;

use Amp\Http\Client\Internal\ApplicationInterceptorClient;
use Amp\Socket\SocketPool;
use Amp\Socket\UnlimitedSocketPool;
use Amp\Socket\Connector;
use Amp\Socket\DnsConnector;

final class ClientBuilder
{
private $socketPool;
private $connector;
private $networkInterceptors = [];
private $applicationInterceptors = [];

public function __construct(?SocketPool $socketPool = null)
public function __construct(?Connector $connector = null)
{
$this->socketPool = $socketPool ?? new UnlimitedSocketPool;
$this->connector = $connector ?? new DnsConnector;
}

public function addNetworkInterceptor(NetworkInterceptor $networkInterceptor): self
Expand All @@ -33,7 +33,7 @@ public function addApplicationInterceptor(ApplicationInterceptor $applicationInt

public function build(): Client
{
$client = new SocketClient($this->socketPool);
$client = new SocketClient($this->connector);
foreach ($this->networkInterceptors as $networkInterceptor) {
$client->addNetworkInterceptor($networkInterceptor);
}
Expand Down
29 changes: 29 additions & 0 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\ConnectionInfo;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;

interface Connection
{
/**
* @param Request $request
* @param CancellationToken|null $token
*
* @return Promise<Response>
*/
public function request(Request $request, ?CancellationToken $token = null): Promise;

/**
* @return ConnectionInfo
*/
public function getConnectionInfo(): ConnectionInfo;

public function isClosed(): bool;

public function close(): Promise;
}
28 changes: 28 additions & 0 deletions src/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\Http\Client\Request;
use Amp\Socket\Socket;

interface ConnectionPool
{
/**
* @param Socket $socket
*
* @return Connection
*/
public function createConnection(Socket $socket): Connection;

/**
* @param Request $request
*
* @return Connection|null
*/
public function getConnection(Request $request): ?Connection;

/**
* @return string[] A list of supported application-layer protocols (ALPNs).
*/
public function getApplicationLayerProtocols(): array;
}
41 changes: 41 additions & 0 deletions src/Connection/DefaultConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\Http\Client\Request;
use Amp\Socket\Socket;

final class DefaultConnectionPool implements ConnectionPool
{
private $connections = [];

public function createConnection(Socket $socket): Connection
{
$connection = new Http1Connection($socket);
$this->connections[$socket->getRemoteAddress()->toString()] = $connection;
return $connection;
}

public function getConnection(Request $request): ?Connection
{
$uri = $request->getUri();

$address = $uri->getHost();
$port = $uri->getPort();

if ($port !== null) {
$address .= ':' . $port;
}

if (isset($this->connections[$address])) {
return $this->connections[$address];
}

return null;
}

public function getApplicationLayerProtocols(): array
{
return ['http/1.1'];
}
}
66 changes: 38 additions & 28 deletions src/Driver/Http1Driver.php → src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Amp\Http\Client\Driver;
namespace Amp\Http\Client\Connection;

use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\StreamException;
Expand Down Expand Up @@ -33,16 +33,36 @@
*
* @see Client
*/
final class Http1Driver implements HttpDriver
final class Http1Connection implements Connection
{
private $socket;
private $connectionInfo;

public function __construct(Socket $socket)
{
$this->socket = $socket;
$this->connectionInfo = ConnectionInfo::fromSocket($socket);
}

public function getConnectionInfo(): ConnectionInfo
{
return $this->connectionInfo;
}

public function isClosed(): bool
{
return $this->socket->isClosed();
}

public function close(): Promise
{
$this->socket->close();
return new Success;
}

/** @inheritdoc */
public function request(
Socket $socket,
ConnectionInfo $connectionInfo,
Request $request,
?CancellationToken $cancellation = null
): Promise {
return call(function () use ($request, $socket, $connectionInfo, $cancellation) {
public function request(Request $request, ?CancellationToken $cancellation = null): Promise {
return call(function () use ($request, $cancellation) {
$cancellation = $cancellation ?? new NullCancellationToken;

/** @var Request $request */
Expand All @@ -58,18 +78,16 @@ public function request(
$readingCancellation = $cancellation;
}

$cancellationId = $readingCancellation->subscribe(static function () use ($socket) {
$socket->close();
});
$cancellationId = $readingCancellation->subscribe([$this->socket, 'close']);

$completionDeferred->promise()->onResolve(static function () use ($readingCancellation, $cancellationId) {
$readingCancellation->unsubscribe($cancellationId);
});

try {
yield RequestWriter::writeRequest($socket, $request, $protocolVersion);
yield RequestWriter::writeRequest($this->socket, $request, $protocolVersion);

return yield from $this->doRead($socket, $request, $connectionInfo, $cancellation, $readingCancellation, $completionDeferred);
return yield from $this->doRead($request, $cancellation, $readingCancellation, $completionDeferred);
} catch (HttpException $e) {
$cancellation->throwIfRequested();

Expand Down Expand Up @@ -98,9 +116,7 @@ private function buildRequest(Request $request): \Generator
}

/**
* @param Socket $socket
* @param Request $request
* @param ConnectionInfo $connectionInfo
* @param CancellationToken $originalCancellation
* @param CancellationToken $readingCancellation
* @param Deferred $completionDeferred
Expand All @@ -111,9 +127,7 @@ private function buildRequest(Request $request): \Generator
* @throws CancelledException
*/
private function doRead(
Socket $socket,
Request $request,
ConnectionInfo $connectionInfo,
CancellationToken $originalCancellation,
CancellationToken $readingCancellation,
Deferred $completionDeferred
Expand All @@ -127,20 +141,18 @@ private function doRead(
$backpressure = $bodyEmitter->emit($data);
};

$parser = new Http1Parser($request, $connectionInfo, $bodyCallback);
$parser = new Http1Parser($request, $this->connectionInfo, $bodyCallback);

try {
while (null !== $chunk = yield $socket->read()) {
while (null !== $chunk = yield $this->socket->read()) {
$response = $parser->parse($chunk);
if ($response === null) {
continue;
}

$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken());
$bodyCancellationToken->subscribe(static function () use ($socket) {
$socket->close();
});
$bodyCancellationToken->subscribe([$this->socket, 'close']);

$response = $response
->withBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource))
Expand All @@ -156,7 +168,6 @@ private function doRead(
$originalCancellation,
$readingCancellation,
$bodyCancellationToken,
$socket,
&$backpressure
) {
try {
Expand All @@ -177,7 +188,7 @@ private function doRead(
if (!$backpressure instanceof Success) {
yield $this->withCancellation($backpressure, $bodyCancellationToken);
}
} while (null !== $chunk = yield $socket->read());
} while (null !== $chunk = yield $this->socket->read());

$originalCancellation->throwIfRequested();

Expand All @@ -194,13 +205,13 @@ private function doRead(
}

if ($this->shouldCloseSocketAfterResponse($response) || $parser->getState() === Http1Parser::BODY_IDENTITY_EOF) {
$socket->close();
$this->socket->close();
}

$bodyEmitter->complete();
$completionDeferred->resolve();
} catch (\Throwable $e) {
$socket->close();
$this->socket->close();

$bodyEmitter->fail($e);
$completionDeferred->fail($e);
Expand Down Expand Up @@ -300,7 +311,6 @@ private function normalizeTraceRequest(Request $request): Request
return $request;
}


private function shouldCloseSocketAfterResponse(Response $response): bool
{
$request = $response->getRequest();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

namespace Amp\Http\Client\Driver;
namespace Amp\Http\Client\Connection;

use Amp\ByteStream\InMemoryStream;
use Amp\Http\Client\ConnectionInfo;
Expand Down
11 changes: 11 additions & 0 deletions src/ConnectionInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

namespace Amp\Http\Client;

use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;

Expand All @@ -11,6 +13,15 @@ final class ConnectionInfo
private $remoteAddress;
private $tlsInfo;

public static function fromSocket(Socket $socket): self
{
return new self(
$socket->getLocalAddress(),
$socket->getRemoteAddress(),
$socket instanceof EncryptableSocket ? $socket->getTlsInfo() : null
);
}

public function __construct(SocketAddress $localAddress, SocketAddress $remoteAddress, ?TlsInfo $tlsInfo = null)
{
$this->localAddress = $localAddress;
Expand Down
19 changes: 0 additions & 19 deletions src/Driver/DefaultHttpDriverFactory.php

This file was deleted.

23 changes: 0 additions & 23 deletions src/Driver/HttpDriver.php

This file was deleted.

22 changes: 0 additions & 22 deletions src/Driver/HttpDriverFactory.php

This file was deleted.

Loading

0 comments on commit 3011310

Please sign in to comment.