Skip to content

Commit

Permalink
Add support for switching protocol responses
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 12, 2019
1 parent 8f83ba8 commit f669532
Show file tree
Hide file tree
Showing 4 changed files with 190 additions and 7 deletions.
33 changes: 27 additions & 6 deletions src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
use Amp\Loop;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;
use Amp\Success;
Expand All @@ -50,7 +49,7 @@ final class Http1Connection implements Connection

private const PROTOCOL_VERSIONS = ['1.0', '1.1'];

/** @var Socket */
/** @var EncryptableSocket */
private $socket;

/** @var bool */
Expand All @@ -74,7 +73,7 @@ final class Http1Connection implements Connection
/** @var int */
private $estimatedClose;

public function __construct(Socket $socket, int $timeoutGracePeriod)
public function __construct(EncryptableSocket $socket, int $timeoutGracePeriod)
{
$this->socket = $socket;

Expand Down Expand Up @@ -102,15 +101,19 @@ public function onClose(callable $onClose): void
}

public function close(): Promise
{
$this->socket->close();
return $this->free();
}

private function free(): Promise
{
$this->estimatedClose = 0;

if ($this->timeoutWatcher !== null) {
Loop::cancel($this->timeoutWatcher);
}

$this->socket->close();

if ($this->onClose !== null) {
$onClose = $this->onClose;
$this->onClose = null;
Expand Down Expand Up @@ -273,7 +276,25 @@ private function readResponse(
continue;
}

if ($response->getStatus() < Http\Status::OK) {
$status = $response->getStatus();

if ($status === Http\Status::SWITCHING_PROTOCOLS) {
if (($onUpgrade = $request->getUpgradeHandler()) === null) {
throw new HttpException('Received Switching Protocols response without upgrade handler callback');
}

$socket = new UpgradedSocket($this->socket, $parser->getBuffer());

asyncCall($onUpgrade, $socket, clone $request, $response);

$this->free(); // Close this connection without closing socket.

$trailersDeferred->resolve($trailers);

return $response;
}

if ($status < Http\Status::OK) {
$chunk = $parser->getBuffer();
$parser = new Http1Parser($request, $bodyCallback, $trailersCallback);
goto parseChunk;
Expand Down
111 changes: 111 additions & 0 deletions src/Connection/UpgradedSocket.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php

namespace Amp\Http\Client\Connection;

use Amp\CancellationToken;
use Amp\Promise;
use Amp\Socket\EncryptableSocket;
use Amp\Socket\ResourceSocket;
use Amp\Socket\SocketAddress;
use Amp\Socket\TlsInfo;
use Amp\Success;

final class UpgradedSocket implements EncryptableSocket
{
/** @var ResourceSocket */
private $socket;

/** @var string|null */
private $buffer;

/**
* @param EncryptableSocket $socket
* @param string $buffer Remaining buffer previously read from the socket.
*/
public function __construct(EncryptableSocket $socket, string $buffer)
{
$this->socket = $socket;
$this->buffer = $buffer !== '' ? $buffer : null;
}

public function read(): Promise
{
if ($this->buffer !== null) {
$buffer = $this->buffer;
$this->buffer = null;
return new Success($buffer);
}

return $this->socket->read();
}

public function close(): void
{
$this->socket->close();
}

public function __destruct()
{
$this->close();
}

public function write(string $data): Promise
{
return $this->socket->write($data);
}

public function end(string $finalData = ""): Promise
{
return $this->socket->end($finalData);
}

public function reference(): void
{
$this->socket->reference();
}

public function unreference(): void
{
$this->socket->unreference();
}

public function isClosed(): bool
{
return $this->socket->isClosed();
}

public function getLocalAddress(): SocketAddress
{
return $this->socket->getLocalAddress();
}

public function getRemoteAddress(): SocketAddress
{
return $this->socket->getRemoteAddress();
}

public function getResource()
{
return $this->socket->getResource();
}

public function setupTls(?CancellationToken $token = null): Promise
{
return $this->socket->setupTls($token);
}

public function shutdownTls(?CancellationToken $token = null): Promise
{
return $this->socket->shutdownTls();
}

public function getTlsState(): int
{
return $this->socket->getTlsState();
}

public function getTlsInfo(): ?TlsInfo
{
return $this->socket->getTlsInfo();
}
}
20 changes: 19 additions & 1 deletion src/Request.php
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ private static function clone($value)
/** @var callable|null */
private $onPush;

/** @var callable|null */
private $onUpgrade;

/** @var mixed[] */
private $attributes = [];

Expand Down Expand Up @@ -247,7 +250,7 @@ public function setBody($body): void
*
* @param callable|null $onPush
*/
public function setPushHandler(?callable $onPush = null): void
public function setPushHandler(?callable $onPush): void
{
$this->onPush = $onPush;
}
Expand Down Expand Up @@ -284,6 +287,21 @@ public function getPushHandler(): ?callable
return $this->onPush;
}

/**
* Registers a callback invoked if a 101 response is returned to the request.
*
* @param callable|null $onUpgrade
*/
public function setUpgradeHandler(callable $onUpgrade): void
{
$this->onUpgrade = $onUpgrade;
}

public function getUpgradeHandler(): ?callable
{
return $this->onUpgrade;
}

/**
* @return int Timeout in milliseconds for the TCP connection.
*/
Expand Down
33 changes: 33 additions & 0 deletions test/Connection/Http1ConnectionTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,39 @@ public function test100Continue(): \Generator
$this->assertSame('Nothing to send', $response->getReason());
}

public function testUpgrade(): \Generator
{
[$server, $client] = Socket\createPair();

$connection = new Http1Connection($client, 5000);

$socketData = "Data that should be sent after the upgrade response";

$invoked = false;
$callback = function (Socket\EncryptableSocket $socket, Request $request, Response $response) use (&$invoked, $socketData) {
$invoked = true;
$this->assertSame(101, $response->getStatus());
$this->assertSame($socketData, yield $socket->read());
};

$request = new Request('http://httpbin.org/upgrade', 'GET');
$request->setHeader('connection', 'upgrade');
$request->setUpgradeHandler($callback);

/** @var Stream $stream */
$stream = yield $connection->getStream($request);

$server->write("HTTP/1.1 101 Switching Protocols\r\nConnection: Upgrade\r\n\r\n" . $socketData);

/** @var Response $response */
$response = yield $stream->request($request, new NullCancellationToken);

$this->assertTrue($invoked);
$this->assertSame(101, $response->getStatus());
$this->assertSame('Switching Protocols', $response->getReason());
$this->assertSame([], (yield $response->getTrailers())->getHeaders());
}

private function createSlowBody()
{
return new class implements RequestBody {
Expand Down

0 comments on commit f669532

Please sign in to comment.