Skip to content

Commit

Permalink
Merge 4b29568 into af4dbfc
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 8, 2019
2 parents af4dbfc + 4b29568 commit a670952
Show file tree
Hide file tree
Showing 19 changed files with 218 additions and 43 deletions.
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"amphp/hpack": "^2",
"amphp/http": "^1.3",
"amphp/socket": "^1",
"amphp/sync": "^1.3",
"league/uri": "^6",
"psr/http-message": "^1"
},
Expand Down
4 changes: 2 additions & 2 deletions examples/10-connection-pooling.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
Expand All @@ -12,7 +12,7 @@
Loop::run(static function () use ($argv) {
try {
// There's no need to create a custom pool here, we just need it to access the statistics.
$pool = new DefaultConnectionPool;
$pool = new UnlimitedConnectionPool;

$client = (new HttpClientBuilder)->usingPool($pool)->followRedirects(0)->build();

Expand Down
4 changes: 2 additions & 2 deletions examples/11-connection-pooling-close.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
Expand All @@ -12,7 +12,7 @@
Loop::run(static function () use ($argv) {
try {
// There's no need to create a custom pool here, we just need it to access the statistics.
$pool = new DefaultConnectionPool;
$pool = new UnlimitedConnectionPool;

$client = (new HttpClientBuilder)->usingPool($pool)->followRedirects(0)->build();

Expand Down
60 changes: 60 additions & 0 deletions examples/13-connection-limits.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
<?php

use Amp\CancellationToken;
use Amp\Http\Client\Connection\LimitedConnectionPool;
use Amp\Http\Client\Connection\Stream;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HostRequestKey;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\NetworkInterceptor;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Loop;
use Amp\Promise;
use Amp\Sync\LocalKeyedSemaphore;
use function Amp\call;

require __DIR__ . '/../vendor/autoload.php';

Loop::run(static function () {
try {
// There's no need to create a custom pool here, we just need it to access the statistics.
$pool = new LimitedConnectionPool(new UnlimitedConnectionPool, new LocalKeyedSemaphore(1), new HostRequestKey);

$logger = new class implements NetworkInterceptor {
public function requestViaNetwork(
Request $request,
CancellationToken $cancellation,
Stream $stream
): Promise {
return call(static function () use ($request, $cancellation, $stream) {
print 'Starting request to ' . $request->getUri() . '...' . PHP_EOL;

try {
return yield $stream->request($request, $cancellation);
} finally {
print 'Done @ ' . $request->getUri() . ' ' . PHP_EOL;
}
});
}
};

$client = (new HttpClientBuilder)->usingPool($pool)->followRedirects(0)->interceptNetwork($logger)->build();

for ($i = 0; $i < 3; $i++) {
$promises = [];
for ($j = 0; $j < 10; $j++) {
$promises[] = call(static function () use ($client, $i, $j) {
/** @var Response $response */
$response = yield $client->request(new Request("http://amphp.org/$i.$j"));
yield $response->getBody()->buffer();
});
}

yield $promises;
}
} catch (HttpException $error) {
echo $error;
}
});
4 changes: 2 additions & 2 deletions examples/7-unix-sockets.php
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?php

use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Request;
Expand All @@ -15,7 +15,7 @@
try {
// Unix sockets require a socket pool that changes all URLs to a fixed one.
$connector = new StaticConnector("unix:///var/run/docker.sock", new DnsConnector);
$client = (new HttpClientBuilder)->usingPool(new DefaultConnectionPool($connector))->build();
$client = (new HttpClientBuilder)->usingPool(new UnlimitedConnectionPool($connector))->build();

// amphp/http-client requires a host, so just use a dummy one.
$request = new Request('http://docker/info');
Expand Down
4 changes: 2 additions & 2 deletions examples/8-benchmark.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
// Custom (10 x $count requests): php examples/8-benchmark.php $count

use Amp\CancellationToken;
use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
Expand Down Expand Up @@ -38,7 +38,7 @@ public function connect(string $uri, ?ConnectContext $context = null, ?Cancellat
};

$client = (new HttpClientBuilder)
->usingPool(new DefaultConnectionPool($connector))
->usingPool(new UnlimitedConnectionPool($connector))
->build();

$handler = coroutine(static function (int $count) use ($client, $argv) {
Expand Down
5 changes: 0 additions & 5 deletions src/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,4 @@ interface ConnectionPool
* @return Promise<Stream>
*/
public function getStream(Request $request, CancellationToken $token): Promise;

/**
* @return string[] Array of supported protocol versions.
*/
public function getProtocolVersions(): array;
}
2 changes: 1 addition & 1 deletion src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public function getStream(Request $request): Promise

$this->busy = true;

return new Success(new HttpStream(
return new Success(HttpStream::fromConnection(
$this,
\Closure::fromCallable([$this, 'request']),
\Closure::fromCallable([$this, 'release'])
Expand Down
2 changes: 1 addition & 1 deletion src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ public function getStream(Request $request): Promise

--$this->remainingStreams;

return new HttpStream(
return HttpStream::fromConnection(
$this,
\Closure::fromCallable([$this, 'request']),
\Closure::fromCallable([$this, 'release'])
Expand Down
59 changes: 46 additions & 13 deletions src/Connection/HttpStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,52 +15,85 @@ final class HttpStream implements Stream
use ForbidSerialization;
use ForbidCloning;

/** @var Connection */
private $connection;
public static function fromConnection(
Connection $connection,
callable $requestCallback,
callable $releaseCallback
): self {
return new self(
$connection->getLocalAddress(),
$connection->getRemoteAddress(),
$connection->getTlsInfo(),
$requestCallback,
$releaseCallback
);
}

public static function fromStream(Stream $stream, callable $requestCallback, callable $releaseCallback): self
{
return new self(
$stream->getLocalAddress(),
$stream->getRemoteAddress(),
$stream->getTlsInfo(),
$requestCallback,
$releaseCallback
);
}

/** @var SocketAddress */
private $localAddress;

/** @var SocketAddress */
private $remoteAddress;

/** @var TlsInfo */
private $tlsInfo;

/** @var callable */
private $requestCallback;

/** @var callable|null */
private $release;
private $releaseCallback;

public function __construct(Connection $connection, callable $requestCallback, callable $releaseCallback)
private function __construct(SocketAddress $localAddress, SocketAddress $remoteAddress, ?TlsInfo $tlsInfo, callable $requestCallback, callable $releaseCallback)
{
$this->connection = $connection;
$this->localAddress = $localAddress;
$this->remoteAddress = $remoteAddress;
$this->tlsInfo = $tlsInfo;
$this->requestCallback = $requestCallback;
$this->release = $releaseCallback;
$this->releaseCallback = $releaseCallback;
}

public function __destruct()
{
if ($this->release !== null) {
($this->release)();
if ($this->releaseCallback !== null) {
($this->releaseCallback)();
}
}

public function request(Request $request, CancellationToken $token): Promise
{
if ($this->release === null) {
if ($this->releaseCallback === null) {
throw new \Error('A stream may only be used for a single request');
}

$this->release = null;
$this->releaseCallback = null;

return ($this->requestCallback)(clone $request, $token);
}

public function getLocalAddress(): SocketAddress
{
return $this->connection->getLocalAddress();
return $this->localAddress;
}

public function getRemoteAddress(): SocketAddress
{
return $this->connection->getRemoteAddress();
return $this->remoteAddress;
}

public function getTlsInfo(): ?TlsInfo
{
return $this->connection->getTlsInfo();
return $this->tlsInfo;
}
}
70 changes: 70 additions & 0 deletions src/Connection/LimitedConnectionPool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\RequestKey;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;
use Amp\Sync\KeyedSemaphore;
use Amp\Sync\Lock;
use function Amp\call;
use function Amp\coroutine;

final class LimitedConnectionPool implements ConnectionPool
{
/** @var ConnectionPool */
private $delegate;

/** @var KeyedSemaphore */
private $semaphore;

/** @var RequestKey */
private $keyExtractor;

public function __construct(ConnectionPool $delegate, KeyedSemaphore $semaphore, RequestKey $keyExtractor)
{
$this->delegate = $delegate;
$this->semaphore = $semaphore;
$this->keyExtractor = $keyExtractor;
}

public function getStream(Request $request, CancellationToken $token): Promise
{
return call(function () use ($request, $token) {
/** @var Lock $lock */
$lock = yield $this->semaphore->acquire($this->keyExtractor->getKey($request));

/** @var Stream $stream */
$stream = yield $this->delegate->getStream($request, $token);

return HttpStream::fromStream(
$stream,
coroutine(static function (Request $request, CancellationToken $cancellationToken) use (
$stream,
$lock
) {
try {
/** @var Response $response */
$response = yield $stream->request($request, $cancellationToken);

// await response being completely received
$response->getTrailers()->onResolve(static function () use ($lock) {
$lock->release();
});
} catch (\Throwable $e) {
$lock->release();

throw $e;
}

return $response;
}),
static function () use ($lock) {
$lock->release();
}
);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
use Amp\TimeoutCancellationToken;
use function Amp\call;

final class DefaultConnectionPool implements ConnectionPool
final class UnlimitedConnectionPool implements ConnectionPool
{
use ForbidSerialization;

Expand Down Expand Up @@ -189,11 +189,6 @@ public function withTimeoutGracePeriod(int $timeout): self
return $pool;
}

public function getProtocolVersions(): array
{
return self::PROTOCOL_VERSIONS;
}

private function createConnection(
Request $request,
CancellationToken $cancellation,
Expand Down
11 changes: 11 additions & 0 deletions src/HostRequestKey.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
<?php

namespace Amp\Http\Client;

final class HostRequestKey implements RequestKey
{
public function getKey(Request $request): string
{
return $request->getUri()->getHost();
}
}
4 changes: 2 additions & 2 deletions src/HttpClientBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace Amp\Http\Client;

use Amp\Http\Client\Connection\ConnectionPool;
use Amp\Http\Client\Connection\DefaultConnectionPool;
use Amp\Http\Client\Connection\UnlimitedConnectionPool;
use Amp\Http\Client\Interceptor\FollowRedirects;
use Amp\Http\Client\Interceptor\ForbidUriUserInfo;
use Amp\Http\Client\Interceptor\RetryRequests;
Expand Down Expand Up @@ -40,7 +40,7 @@ public static function buildDefault(): HttpClient

public function __construct()
{
$this->pool = new DefaultConnectionPool;
$this->pool = new UnlimitedConnectionPool;
$this->forbidUriUserInfo = new ForbidUriUserInfo;
$this->followRedirectsInterceptor = new FollowRedirects(10);
$this->retryInterceptor = new RetryRequests(2);
Expand Down

0 comments on commit a670952

Please sign in to comment.