Skip to content

Commit

Permalink
Write frames asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Oct 27, 2020
1 parent e3e45b3 commit 8f7f8c0
Showing 1 changed file with 53 additions and 12 deletions.
65 changes: 53 additions & 12 deletions src/Connection/Internal/Http2ConnectionProcessor.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
use Amp\Http\InvalidHeaderException;
use Amp\Http\Status;
use Amp\Loop;
use Amp\Pipeline;
use Amp\PipelineSource;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
Expand Down Expand Up @@ -104,10 +105,15 @@ final class Http2ConnectionProcessor implements Http2Processor

private int|null $shutdown = null;

private PipelineSource $frameQueueSource;
private Pipeline $frameQueue;

public function __construct(EncryptableSocket $socket)
{
$this->socket = $socket;
$this->hpack = new HPack;
$this->frameQueueSource = new PipelineSource;
$this->frameQueue = $this->frameQueueSource->pipe();
}

public function isInitialized(): bool
Expand Down Expand Up @@ -1070,14 +1076,13 @@ public function request(Request $request, CancellationToken $cancellationToken,
$lastChunk = \array_pop($split);

// Use async for each write to ensure no other frames are written to the connection.
async(fn() => $this->writeFrame(Http2Parser::HEADERS, Http2Parser::NO_FLAG, $streamId, $firstChunk));
$this->writeFrame(Http2Parser::HEADERS, Http2Parser::NO_FLAG, $streamId, $firstChunk);

foreach ($split as $headerChunk) {
async(fn() => $this->writeFrame(Http2Parser::CONTINUATION, Http2Parser::NO_FLAG, $streamId, $headerChunk));
$this->writeFrame(Http2Parser::CONTINUATION, Http2Parser::NO_FLAG, $streamId, $headerChunk);
}

// Use async for last write to keep ordering, but await completion of write.
await(async(fn() => $this->writeFrame(Http2Parser::CONTINUATION, $flag, $streamId, $lastChunk)));
$this->writeFrame(Http2Parser::CONTINUATION, $flag, $streamId, $lastChunk);
} else {
$this->writeFrame(Http2Parser::HEADERS, $flag, $streamId, $headers);
}
Expand Down Expand Up @@ -1181,6 +1186,23 @@ private function run(): void
self::DEFAULT_MAX_FRAME_SIZE
)
);
} catch (\Throwable $e) {
/**
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed" . ($this->shutdown !== null ? ' unexpectedly' : ''),
$this->shutdown ?? Http2Parser::GRACEFUL_SHUTDOWN
), 0);

$this->close();

return;
}

try {
defer(fn() => $this->runWriteThread());

$parser = (new Http2Parser($this))->parse();

Expand Down Expand Up @@ -1221,15 +1243,13 @@ private function writeFrame(
int $stream = 0,
string $data = ''
): void {
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));
if ($this->hasWriteError) {
return;
}

try {
$this->socket->write(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
} catch (\Throwable $e) {
$this->hasWriteError = true;
\assert(Http2Parser::logDebugFrame('send', $type, $flags, $stream, \strlen($data)));

throw $e;
}
$this->frameQueueSource->emit(\substr(\pack("NccN", \strlen($data), $type, $flags, $stream), 1) . $data);
}

private function applySetting(int $setting, int $value): void
Expand Down Expand Up @@ -1268,7 +1288,7 @@ private function applySetting(int $setting, int $value): void
try {
$this->writeBufferedData($stream);
} catch (\Throwable $exception) {
$this->close();
$this->shutdown(new SocketException('Failed to write to socket'));
}
}
});
Expand Down Expand Up @@ -1701,4 +1721,25 @@ private function createStreamInactivityWatcher(int $streamId, int $timeout): ?st

return $watcher;
}

private function runWriteThread(): void
{
try {
while (null !== $frame = $this->frameQueue->continue()) {
$this->socket->write($frame);
}
} catch (\Throwable $exception) {
$this->hasWriteError = true;

/**
* @psalm-suppress DeprecatedClass
* @noinspection PhpDeprecationInspection
*/
$this->shutdown(new ClientHttp2ConnectionException(
"The HTTP/2 connection closed unexpectedly: " . $exception->getMessage(),
Http2Parser::INTERNAL_ERROR,
$exception
), \max(0, $this->streamId));
}
}
}

0 comments on commit 8f7f8c0

Please sign in to comment.