Skip to content

Commit

Permalink
Merge b3f17c5 into 632d3ad
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 20, 2019
2 parents 632d3ad + b3f17c5 commit 66f9628
Show file tree
Hide file tree
Showing 31 changed files with 619 additions and 243 deletions.
4 changes: 2 additions & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ root = true
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
indent_style = spaces
charset = utf-8
indent_style = space
charset = utf-8
4 changes: 2 additions & 2 deletions examples/basic/6-customization.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Interceptor\LogIntoHttpArchive;
use Amp\Http\Client\Interceptor\LogHttpArchive;
use Amp\Http\Client\Interceptor\MatchOrigin;
use Amp\Http\Client\Interceptor\SetRequestHeader;
use Amp\Http\Client\Request;
Expand All @@ -14,7 +14,7 @@
Loop::run(static function () use ($argv) {
try {
$client = (new HttpClientBuilder)
->intercept(new LogIntoHttpArchive(__DIR__ . '/log.har'))
->intercept(new LogHttpArchive(__DIR__ . '/log.har'))
->intercept(new MatchOrigin(['https://amphp.org' => new SetRequestHeader('x-amphp', 'true')]))
->followRedirects(0)
->retry(3)
Expand Down
21 changes: 16 additions & 5 deletions src/ApplicationInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
namespace Amp\Http\Client;

use Amp\CancellationToken;
use Amp\Http\Client\Interceptor\DecompressResponse;
use Amp\Promise;

/**
Expand All @@ -13,16 +14,26 @@ interface ApplicationInterceptor
/**
* Intercepts an application request to an HTTP resource.
*
* The implementation might modify the request, delegate the request handling to the `$next` client, and/or modify
* the response after the promise returned from `$next->request(...)` resolved.
* The implementation might modify the request, delegate the request handling to the `$httpClient`, and/or modify
* the response after the promise returned from `$httpClient->request(...)` resolves.
*
* An interceptor might also short-circuit and not delegate to the `$next` client at all.
* An interceptor might also short-circuit and not delegate to the `$httpClient` at all.
*
* Any retry or cloned follow-up request must be manually cloned from `$request` to ensure a properly working
* interceptor chain, e.g. the {@see DecompressResponse} interceptor only decodes a response if the
* `accept-encoding` header isn't set manually. If the request isn't cloned, the first attempt will set the header
* and the second attempt will see the header and won't decode the response, because it thinks another interceptor
* or the application itself will care about the decoding.
*
* @param Request $request
* @param CancellationToken $cancellation
* @param DelegateHttpClient $next
* @param DelegateHttpClient $httpClient
*
* @return Promise
*/
public function request(Request $request, CancellationToken $cancellation, DelegateHttpClient $next): Promise;
public function request(
Request $request,
CancellationToken $cancellation,
DelegateHttpClient $httpClient
): Promise;
}
2 changes: 0 additions & 2 deletions src/Connection/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@

interface Connection
{
public const MAX_KEEP_ALIVE_TIMEOUT = 60;

/**
* @param Request $request
*
Expand Down
12 changes: 12 additions & 0 deletions src/Connection/ConnectionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,24 @@
namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Http\Client\EventListener;
use Amp\Http\Client\Request;
use Amp\Promise;

interface ConnectionPool
{
/**
* Reserve a stream for a particular request.
*
* During connection establishment, the pool must call the {@see EventListener::startConnectionCreation()},
* {@see EventListener::startTlsNegotiation()}, and {@see EventListener::completeTlsNegotiation()} on all event
* listeners registered on the given request in the order defined by {@see Request::getEventListeners()} as
* appropriate. Before calling the next listener, the promise returned from the previous one must resolve
* successfully.
*
* Additionally, the pool may invoke {@see EventListener::startDnsResolution()} and
* {@see EventListener::completeDnsResolution()}, but is not required to implement such granular events.
*
* @param Request $request
* @param CancellationToken $token
*
Expand Down
59 changes: 45 additions & 14 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Amp\Emitter;
use Amp\Http;
use Amp\Http\Client\Connection\Internal\Http1Parser;
use Amp\Http\Client\HarAttributes;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\ForbidCloning;
use Amp\Http\Client\Internal\ForbidSerialization;
Expand Down Expand Up @@ -47,6 +46,7 @@ final class Http1Connection implements Connection
use ForbidSerialization;
use ForbidCloning;

private const MAX_KEEP_ALIVE_TIMEOUT = 60;
private const PROTOCOL_VERSIONS = ['1.0', '1.1'];

/** @var EncryptableSocket */
Expand All @@ -73,6 +73,9 @@ final class Http1Connection implements Connection
/** @var int */
private $estimatedClose;

/** @var bool */
private $explicitTimeout = false;

/** @var SocketAddress */
private $localAddress;

Expand Down Expand Up @@ -173,16 +176,18 @@ private function free(): Promise

private function hasStreamFor(Request $request): bool
{
$connectionUnlikelyToClose = $this->explicitTimeout && $this->getRemainingTime() > $this->timeoutGracePeriod;

return !$this->busy
&& $this->socket
&& !$this->socket->isClosed()
&& ($this->getRemainingTime() > $this->timeoutGracePeriod || $request->isIdempotent());
&& ($connectionUnlikelyToClose || $request->isIdempotent());
}

/** @inheritdoc */
private function request(Request $request, CancellationToken $cancellation): Promise
private function request(Request $request, CancellationToken $cancellation, Stream $stream): Promise
{
return call(function () use ($request, $cancellation) {
return call(function () use ($request, $cancellation, $stream) {
++$this->requestCounter;

if ($this->timeoutWatcher !== null) {
Expand All @@ -206,10 +211,17 @@ private function request(Request $request, CancellationToken $cancellation): Pro
$id = $combinedCancellation->subscribe([$this, 'close']);

try {
$request->setAttribute(HarAttributes::TIME_SEND, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startSendingRequest($request, $stream);
}

yield from $this->writeRequest($request, $protocolVersion, $combinedCancellation);
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
return yield from $this->readResponse($request, $cancellation, $combinedCancellation);

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeSendingRequest($request, $stream);
}

return yield from $this->readResponse($request, $cancellation, $combinedCancellation, $stream);
} finally {
$combinedCancellation->unsubscribe($id);
$cancellation->throwIfRequested();
Expand Down Expand Up @@ -246,15 +258,19 @@ private function buildRequest(Request $request): \Generator
* @param CancellationToken $originalCancellation
* @param CancellationToken $readingCancellation
*
* @param Stream $stream
*
* @return \Generator
* @throws CancelledException
* @throws HttpException
* @throws ParseException
* @throws SocketException
* @throws CancelledException
*/
private function readResponse(
Request $request,
CancellationToken $originalCancellation,
CancellationToken $readingCancellation
CancellationToken $readingCancellation,
Stream $stream
): \Generator {
$bodyEmitter = new Emitter;

Expand All @@ -277,7 +293,10 @@ private function readResponse(
try {
while (null !== $chunk = yield $this->socket->read()) {
if ($firstRead) {
$request->setAttribute(HarAttributes::TIME_RECEIVE, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

$firstRead = false;
}

Expand Down Expand Up @@ -332,10 +351,16 @@ private function readResponse(
}

$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken());
$bodyCancellationToken = new CombinedCancellationToken(
$readingCancellation,
$bodyCancellationSource->getToken()
);

$response->setBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource));
$response->setTrailers($trailersDeferred->promise());
$response->setBody(new ResponseBodyStream(
new IteratorStream($bodyEmitter->iterate()),
$bodyCancellationSource
));

// Read body async
asyncCall(function () use (
Expand All @@ -347,6 +372,7 @@ private function readResponse(
$originalCancellation,
$readingCancellation,
$bodyCancellationToken,
$stream,
&$backpressure,
&$trailers
) {
Expand Down Expand Up @@ -398,7 +424,9 @@ private function readResponse(

$this->busy = false;

$request->setAttribute(HarAttributes::TIME_COMPLETE, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
}

$bodyEmitter->complete();
$trailersDeferred->resolve($trailers);
Expand Down Expand Up @@ -546,13 +574,16 @@ private function determineKeepAliveTimeout(Response $response): int
return 0;
}

if (\strcasecmp($responseConnHeader, 'keep-alive')) {
if (!\strcasecmp($responseConnHeader, 'close')) {
return 0;
}

$params = Http\createFieldValueComponentMap(Http\parseFieldValueComponents($response, 'keep-alive'));

$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
if (isset($params['timeout'])) {
$this->explicitTimeout = true;
}

return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
}
Expand Down
Loading

0 comments on commit 66f9628

Please sign in to comment.