Skip to content

Commit

Permalink
Merge a694372 into be7b723
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Jan 17, 2020
2 parents be7b723 + a694372 commit 4d08dc8
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 38 deletions.
13 changes: 4 additions & 9 deletions src/Connection/Http1Connection.php
Expand Up @@ -279,18 +279,9 @@ private function readResponse(
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);

$start = getCurrentTime();
$firstRead = true;

try {
while (null !== $chunk = yield $this->socket->read()) {
if ($firstRead) {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

$firstRead = false;
}

parseChunk:
$response = $parser->parse($chunk);
if ($response === null) {
Expand Down Expand Up @@ -334,6 +325,10 @@ private function readResponse(
goto parseChunk;
}

foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($request, $stream);
}

if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') {
foreach ($request->getEventListeners() as $eventListener) {
yield $eventListener->completeReceivingResponse($request, $stream);
Expand Down
138 changes: 110 additions & 28 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Expand Up @@ -373,16 +373,6 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
return;
}

asyncCall(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->stream);
}
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL));
}
});

$response = new Response(
'2',
$status,
Expand All @@ -396,18 +386,45 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$onInformationalResponse = $stream->request->getInformationalResponseHandler();

if ($onInformationalResponse !== null) {
asyncCall(function () use ($onInformationalResponse, $response, $streamId) {
$stream->preResponseResolution = call(function () use (
$onInformationalResponse,
$response,
$stream,
$streamId
) {
yield $stream->preResponseResolution;

try {
yield call($onInformationalResponse, $response);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException('Informational response handler threw an exception', $streamId, self::CANCEL));
$this->handleStreamException(new Http2StreamException(
'Informational response handler threw an exception',
$streamId,
Http2Parser::CANCEL
));
}
});
}

return;
}

\assert($stream->preResponseResolution === null);

$stream->preResponseResolution = call(function () use ($stream, $streamId) {
try {
foreach ($stream->request->getEventListeners() as $eventListener) {
yield $eventListener->startReceivingResponse($stream->request, $stream->stream);
}
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));
}
});

$stream->body = new Emitter;
$stream->trailers = new Deferred;

Expand All @@ -429,6 +446,9 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
$stream->pendingResponse->resolve(call(static function () use ($response, $stream) {
yield $stream->requestBodyCompletion->promise();

yield $stream->preResponseResolution;
$stream->preResponseResolution = null;

$stream->pendingResponse = null;

return $response;
Expand Down Expand Up @@ -467,7 +487,12 @@ public function handleHeaders(int $streamId, array $pseudo, array $headers, bool
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});

Expand Down Expand Up @@ -570,7 +595,10 @@ public function handlePushPromise(int $parentId, int $streamId, array $pseudo, a
"query" => $query,
]);
} catch (\Exception $exception) {
$this->handleConnectionException(new Http2ConnectionException("Invalid push URI", Http2Parser::PROTOCOL_ERROR));
$this->handleConnectionException(new Http2ConnectionException(
"Invalid push URI",
Http2Parser::PROTOCOL_ERROR
));

return;
}
Expand Down Expand Up @@ -607,7 +635,11 @@ static function () {
$stream->requestBodyCompletion->resolve();

if ($parentStream->request->getPushHandler() === null) {
$this->handleStreamException(new Http2StreamException("Push promise refused", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Push promise refused",
$streamId,
Http2Parser::CANCEL
));

return;
}
Expand All @@ -626,7 +658,12 @@ static function () {
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
});

Expand Down Expand Up @@ -686,7 +723,10 @@ public function handleStreamException(Http2StreamException $exception): void

public function handleConnectionException(Http2ConnectionException $exception): void
{
$this->shutdown(null, new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception));
$this->shutdown(
null,
new ClientHttp2ConnectionException($exception->getMessage(), $exception->getCode(), $exception)
);
}

public function handleData(int $streamId, string $data): void
Expand Down Expand Up @@ -714,7 +754,11 @@ public function handleData(int $streamId, string $data): void
$stream->received += $length;

if ($stream->received >= $stream->request->getBodySizeLimit()) {
$this->handleStreamException(new Http2StreamException("Body size limit exceeded", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Body size limit exceeded",
$streamId,
Http2Parser::CANCEL
));

return;
}
Expand Down Expand Up @@ -789,7 +833,11 @@ public function handleStreamEnd(int $streamId): void

return new Trailers([]);
} catch (\Throwable $e) {
$this->handleStreamException(new Http2StreamException("Event listener error", $streamId, Http2Parser::CANCEL));
$this->handleStreamException(new Http2StreamException(
"Event listener error",
$streamId,
Http2Parser::CANCEL
));

throw $e;
}
Expand Down Expand Up @@ -884,7 +932,12 @@ public function request(Request $request, CancellationToken $cancellationToken,
return;
}

$this->writeFrame(Http2Parser::RST_STREAM, Http2Parser::NO_FLAG, $streamId, \pack("N", Http2Parser::CANCEL));
$this->writeFrame(
Http2Parser::RST_STREAM,
Http2Parser::NO_FLAG,
$streamId,
\pack("N", Http2Parser::CANCEL)
);
$this->releaseStream($streamId, $exception);
};

Expand Down Expand Up @@ -1041,8 +1094,12 @@ private function run(): \Generator
}
}

private function writeFrame(int $type, int $flags = Http2Parser::NO_FLAG, int $stream = 0, string $data = ''): Promise
{
private function writeFrame(
int $type,
int $flags = Http2Parser::NO_FLAG,
int $stream = 0,
string $data = ''
): Promise {
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));

/** @noinspection PhpUnhandledExceptionInspection */
Expand Down Expand Up @@ -1155,9 +1212,19 @@ private function writeBufferedData(Http2Stream $stream): Promise
}

if ($stream->requestBodyComplete) {
$promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::END_STREAM, $stream->id, $stream->requestBodyBuffer);
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::END_STREAM,
$stream->id,
$stream->requestBodyBuffer
);
} else {
$promise = $this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, $stream->requestBodyBuffer);
$promise = $this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
$stream->requestBodyBuffer
);
}

$stream->requestBodyBuffer = "";
Expand All @@ -1180,7 +1247,12 @@ private function writeBufferedData(Http2Stream $stream): Promise
$this->clientWindow -= $windowSize;

for ($off = 0; $off < $end; $off += $this->frameSizeLimit) {
$this->writeFrame(Http2Parser::DATA, Http2Parser::NO_FLAG, $stream->id, \substr($data, $off, $this->frameSizeLimit));
$this->writeFrame(
Http2Parser::DATA,
Http2Parser::NO_FLAG,
$stream->id,
\substr($data, $off, $this->frameSizeLimit)
);
}

$promise = $this->writeFrame(
Expand Down Expand Up @@ -1332,7 +1404,7 @@ private function ping(): Promise
}

/**
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* @param int|null $lastId ID of last processed frame. Null to use the last opened frame ID or 0 if no
* streams have been opened.
* @param HttpException|null $reason
*
Expand All @@ -1347,7 +1419,12 @@ private function shutdown(?int $lastId = null, ?HttpException $reason = null): P
return call(function () use ($lastId, $reason) {
$code = $reason ? $reason->getCode() : Http2Parser::GRACEFUL_SHUTDOWN;
$lastId = $lastId ?? ($this->streamId > 0 ? $this->streamId : 0);
$goawayPromise = $this->writeFrame(Http2Parser::GOAWAY, Http2Parser::NO_FLAG, 0, \pack("NN", $lastId, $code));
$goawayPromise = $this->writeFrame(
Http2Parser::GOAWAY,
Http2Parser::NO_FLAG,
0,
\pack("NN", $lastId, $code)
);

if ($this->settings !== null) {
$settings = $this->settings;
Expand Down Expand Up @@ -1454,7 +1531,12 @@ private function increaseStreamWindow(Http2Stream $stream): void
}

if ($increase > 0) {
$this->writeFrame(Http2Parser::WINDOW_UPDATE, Http2Parser::NO_FLAG, $stream->id, \pack("N", self::WINDOW_INCREMENT));
$this->writeFrame(
Http2Parser::WINDOW_UPDATE,
Http2Parser::NO_FLAG,
$stream->id,
\pack("N", self::WINDOW_INCREMENT)
);
}
}
}
6 changes: 5 additions & 1 deletion src/Connection/Internal/Http2Stream.php
Expand Up @@ -10,6 +10,7 @@
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Http\Client\Request;
use Amp\Http\Client\Response;
use Amp\Promise;
use Amp\Struct;

/**
Expand All @@ -32,9 +33,12 @@ final class Http2Stream
/** @var Response|null */
public $response;

/** @var Deferred */
/** @var Deferred|null */
public $pendingResponse;

/** @var Promise|null */
public $preResponseResolution;

/** @var bool */
public $responsePending = true;

Expand Down

0 comments on commit 4d08dc8

Please sign in to comment.