Skip to content

Commit

Permalink
Merge 860f656 into 632d3ad
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 19, 2019
2 parents 632d3ad + 860f656 commit d6be000
Show file tree
Hide file tree
Showing 27 changed files with 434 additions and 238 deletions.
4 changes: 2 additions & 2 deletions .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ root = true
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true
indent_style = spaces
charset = utf-8
indent_style = space
charset = utf-8
4 changes: 2 additions & 2 deletions examples/basic/6-customization.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Interceptor\LogIntoHttpArchive;
use Amp\Http\Client\Interceptor\LogHttpArchive;
use Amp\Http\Client\Interceptor\MatchOrigin;
use Amp\Http\Client\Interceptor\SetRequestHeader;
use Amp\Http\Client\Request;
Expand All @@ -14,7 +14,7 @@
Loop::run(static function () use ($argv) {
try {
$client = (new HttpClientBuilder)
->intercept(new LogIntoHttpArchive(__DIR__ . '/log.har'))
->intercept(new LogHttpArchive(__DIR__ . '/log.har'))
->intercept(new MatchOrigin(['https://amphp.org' => new SetRequestHeader('x-amphp', 'true')]))
->followRedirects(0)
->retry(3)
Expand Down
14 changes: 9 additions & 5 deletions src/ApplicationInterceptor.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,20 @@ interface ApplicationInterceptor
/**
* Intercepts an application request to an HTTP resource.
*
* The implementation might modify the request, delegate the request handling to the `$next` client, and/or modify
* the response after the promise returned from `$next->request(...)` resolved.
* The implementation might modify the request, delegate the request handling to the `$httpClient`, and/or modify
* the response after the promise returned from `$httpClient->request(...)` resolves.
*
* An interceptor might also short-circuit and not delegate to the `$next` client at all.
* An interceptor might also short-circuit and not delegate to the `$httpClient` at all.
*
* @param Request $request
* @param CancellationToken $cancellation
* @param DelegateHttpClient $next
* @param DelegateHttpClient $httpClient
*
* @return Promise
*/
public function request(Request $request, CancellationToken $cancellation, DelegateHttpClient $next): Promise;
public function request(
Request $request,
CancellationToken $cancellation,
DelegateHttpClient $httpClient
): Promise;
}
57 changes: 43 additions & 14 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
use Amp\Emitter;
use Amp\Http;
use Amp\Http\Client\Connection\Internal\Http1Parser;
use Amp\Http\Client\HarAttributes;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\ForbidCloning;
use Amp\Http\Client\Internal\ForbidSerialization;
Expand Down Expand Up @@ -73,6 +72,9 @@ final class Http1Connection implements Connection
/** @var int */
private $estimatedClose;

/** @var bool */
private $explicitTimeout = false;

/** @var SocketAddress */
private $localAddress;

Expand Down Expand Up @@ -173,16 +175,18 @@ private function free(): Promise

private function hasStreamFor(Request $request): bool
{
$connectionUnlikelyToClose = $this->explicitTimeout && $this->getRemainingTime() > $this->timeoutGracePeriod;

return !$this->busy
&& $this->socket
&& !$this->socket->isClosed()
&& ($this->getRemainingTime() > $this->timeoutGracePeriod || $request->isIdempotent());
&& ($connectionUnlikelyToClose || $request->isIdempotent());
}

/** @inheritdoc */
private function request(Request $request, CancellationToken $cancellation): Promise
private function request(Request $request, CancellationToken $cancellation, Stream $stream): Promise
{
return call(function () use ($request, $cancellation) {
return call(function () use ($request, $cancellation, $stream) {
++$this->requestCounter;

if ($this->timeoutWatcher !== null) {
Expand All @@ -206,10 +210,17 @@ private function request(Request $request, CancellationToken $cancellation): Pro
$id = $combinedCancellation->subscribe([$this, 'close']);

try {
$request->setAttribute(HarAttributes::TIME_SEND, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startSendingRequest($request, $stream);
}

yield from $this->writeRequest($request, $protocolVersion, $combinedCancellation);
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
return yield from $this->readResponse($request, $cancellation, $combinedCancellation);

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $stream);
}

return yield from $this->readResponse($request, $cancellation, $combinedCancellation, $stream);
} finally {
$combinedCancellation->unsubscribe($id);
$cancellation->throwIfRequested();
Expand Down Expand Up @@ -246,15 +257,19 @@ private function buildRequest(Request $request): \Generator
* @param CancellationToken $originalCancellation
* @param CancellationToken $readingCancellation
*
* @param Stream $stream
*
* @return \Generator
* @throws CancelledException
* @throws HttpException
* @throws ParseException
* @throws SocketException
* @throws CancelledException
*/
private function readResponse(
Request $request,
CancellationToken $originalCancellation,
CancellationToken $readingCancellation
CancellationToken $readingCancellation,
Stream $stream
): \Generator {
$bodyEmitter = new Emitter;

Expand All @@ -277,7 +292,10 @@ private function readResponse(
try {
while (null !== $chunk = yield $this->socket->read()) {
if ($firstRead) {
$request->setAttribute(HarAttributes::TIME_RECEIVE, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

$firstRead = false;
}

Expand Down Expand Up @@ -332,10 +350,16 @@ private function readResponse(
}

$bodyCancellationSource = new CancellationTokenSource;
$bodyCancellationToken = new CombinedCancellationToken($readingCancellation, $bodyCancellationSource->getToken());
$bodyCancellationToken = new CombinedCancellationToken(
$readingCancellation,
$bodyCancellationSource->getToken()
);

$response->setBody(new ResponseBodyStream(new IteratorStream($bodyEmitter->iterate()), $bodyCancellationSource));
$response->setTrailers($trailersDeferred->promise());
$response->setBody(new ResponseBodyStream(
new IteratorStream($bodyEmitter->iterate()),
$bodyCancellationSource
));

// Read body async
asyncCall(function () use (
Expand Down Expand Up @@ -398,7 +422,9 @@ private function readResponse(

$this->busy = false;

$request->setAttribute(HarAttributes::TIME_COMPLETE, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeRequest($request);
}

$bodyEmitter->complete();
$trailersDeferred->resolve($trailers);
Expand Down Expand Up @@ -546,13 +572,16 @@ private function determineKeepAliveTimeout(Response $response): int
return 0;
}

if (\strcasecmp($responseConnHeader, 'keep-alive')) {
if (!\strcasecmp($responseConnHeader, 'close')) {
return 0;
}

$params = Http\createFieldValueComponentMap(Http\parseFieldValueComponents($response, 'keep-alive'));

$timeout = (int) ($params['timeout'] ?? $this->priorTimeout);
if (isset($params['timeout'])) {
$this->explicitTimeout = true;
}

return $this->priorTimeout = \min(\max(0, $timeout), self::MAX_KEEP_ALIVE_TIMEOUT);
}
Expand Down
53 changes: 35 additions & 18 deletions src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
use Amp\Emitter;
use Amp\Failure;
use Amp\Http\Client\Connection\Internal\Http2Stream;
use Amp\Http\Client\HarAttributes;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\ForbidCloning;
use Amp\Http\Client\Internal\ForbidSerialization;
Expand All @@ -36,7 +35,6 @@
use League\Uri;
use function Amp\asyncCall;
use function Amp\call;
use function Amp\getCurrentTime;

final class Http2Connection implements Connection
{
Expand Down Expand Up @@ -289,7 +287,7 @@ private function ping(): Promise
return $this->pongDeferred->promise();
}

private function request(Request $request, CancellationToken $token): Promise
private function request(Request $request, CancellationToken $token, Stream $applicationStream): Promise
{
$this->requestCount++;

Expand All @@ -312,6 +310,7 @@ private function request(Request $request, CancellationToken $token): Promise
$request->getBodySizeLimit()
);

$stream->applicationStream = $applicationStream;
$stream->request = $request;
$stream->cancellationToken = $token;

Expand All @@ -324,7 +323,7 @@ private function request(Request $request, CancellationToken $token): Promise
);
}

return call(function () use ($id, $request, $token): \Generator {
return call(function () use ($id, $request, $token, $applicationStream): \Generator {
$this->pendingRequests[$id] = $deferred = new Deferred;

if ($this->socket->isClosed()) {
Expand Down Expand Up @@ -381,7 +380,9 @@ private function request(Request $request, CancellationToken $token): Promise
":method" => [$request->getMethod()],
], $request->getHeaders());

$request->setAttribute(HarAttributes::TIME_SEND, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startSendingRequest($request, $applicationStream);
}

$headers = $this->table->encode($headers);

Expand All @@ -390,7 +391,9 @@ private function request(Request $request, CancellationToken $token): Promise
$chunk = yield $stream->read();

if (!isset($this->streams[$id]) || $token->isRequested()) {
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $applicationStream);
}

return yield $deferred->promise();
}
Expand All @@ -412,15 +415,19 @@ private function request(Request $request, CancellationToken $token): Promise
}

if ($chunk === null) {
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $applicationStream);
}

return yield $deferred->promise();
}

$buffer = $chunk;
while (null !== $chunk = yield $stream->read()) {
if (!isset($this->streams[$id]) || $token->isRequested()) {
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $applicationStream);
}

return yield $deferred->promise();
}
Expand All @@ -430,7 +437,9 @@ private function request(Request $request, CancellationToken $token): Promise
}

if (!isset($this->streams[$id]) || $token->isRequested()) {
$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $applicationStream);
}

return yield $deferred->promise();
}
Expand All @@ -439,7 +448,9 @@ private function request(Request $request, CancellationToken $token): Promise

yield $this->writeData($buffer, $id);

$request->setAttribute(HarAttributes::TIME_WAIT, getCurrentTime());
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startWaitingForResponse($request, $applicationStream);
}

return yield $deferred->promise();
} catch (\Throwable $exception) {
Expand Down Expand Up @@ -697,7 +708,7 @@ private function parser(): \Generator
$stream->serverWindow -= $length;
$stream->received += $length;

if ($stream->received >= $stream->maxBodySize && ($flags & self::END_STREAM) === "\0") {
if ($stream->received >= $stream->bodySizeLimit && ($flags & self::END_STREAM) === "\0") {
throw new Http2StreamException("Max body size exceeded", $id, self::CANCEL);
}

Expand Down Expand Up @@ -739,7 +750,7 @@ private function parser(): \Generator
}

$increment = \min(
$stream->maxBodySize - $stream->received - $stream->serverWindow,
$stream->bodySizeLimit - $stream->received - $stream->serverWindow,
self::MAX_INCREMENT
);
if ($increment <= 0) {
Expand Down Expand Up @@ -772,7 +783,9 @@ private function parser(): \Generator

unset($this->bodyEmitters[$id], $this->trailerDeferreds[$id]);

$stream->request->setAttribute(HarAttributes::TIME_COMPLETE, getCurrentTime());
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->completeRequest($stream->request);
}

$this->setupPingIfIdle();

Expand Down Expand Up @@ -1368,7 +1381,9 @@ private function parser(): \Generator

unset($this->bodyEmitters[$id], $this->trailerDeferreds[$id]);

$stream->request->setAttribute(HarAttributes::TIME_COMPLETE, getCurrentTime());
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->completeRequest($stream->request);
}

$this->setupPingIfIdle();

Expand Down Expand Up @@ -1416,7 +1431,9 @@ private function parser(): \Generator
$deferred = $this->pendingRequests[$id];
unset($this->pendingRequests[$id]);

$stream->request->setAttribute(HarAttributes::TIME_RECEIVE, getCurrentTime());
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->applicationStream);
}

if ($stream->state & Http2Stream::REMOTE_CLOSED) {
$response = new Response(
Expand All @@ -1435,9 +1452,9 @@ private function parser(): \Generator
$this->trailerDeferreds[$id] = new Deferred;
$this->bodyEmitters[$id] = new Emitter;

if ($this->serverWindow <= $stream->maxBodySize >> 1) {
$increment = $stream->maxBodySize - $this->serverWindow;
$this->serverWindow = $stream->maxBodySize;
if ($this->serverWindow <= $stream->bodySizeLimit >> 1) {
$increment = $stream->bodySizeLimit - $this->serverWindow;
$this->serverWindow = $stream->bodySizeLimit;
$this->writeFrame(\pack("N", $increment), self::WINDOW_UPDATE, self::NOFLAG);
}

Expand Down
Loading

0 comments on commit d6be000

Please sign in to comment.