Skip to content

Commit

Permalink
Use internal class for upgraded stream writing
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 25, 2019
1 parent 81f0b92 commit 3de3281
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/Connection/Http1Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ private function readResponse(
goto parseChunk;
}

if ($status >= Http\Status::OK && $status < Http\Status::MULTIPLE_CHOICES && $request->getMethod() === 'CONNECT') {
if ($status >= 200 && $status < 300 && $request->getMethod() === 'CONNECT') {
$trailersDeferred->resolve($trailers);
return $this->handleUpgradeResponse($request, $response, $parser->getBuffer());
}
Expand Down
22 changes: 12 additions & 10 deletions src/Connection/Http2Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -1480,7 +1480,7 @@ static function () {
} else {
$this->bodyEmitters[$id] = new Emitter;

if ($status >= Status::OK && $status < Status::MULTIPLE_CHOICES && $stream->request->getMethod() === 'CONNECT') {
if ($status >= 200 && $status < 300 && $stream->request->getMethod() === 'CONNECT') {
if (($onUpgrade = $stream->request->getUpgradeHandler()) === null) {
throw new Http2StreamException('CONNECT or upgrade request made without upgrade handler callback', $id, self::CANCEL);
}
Expand All @@ -1498,17 +1498,19 @@ static function () {

$socket = new UpgradedStream(
new IteratorStream($this->bodyEmitters[$id]->iterate()),
function (string $data, bool $final) use ($id): Promise {
if (!isset($this->streams[$id])) {
return new Failure(new SocketException('Stream closed'));
}
new Internal\Http2UpgradeOutputStream(
function (string $data, bool $final) use ($id): Promise {
if (!isset($this->streams[$id])) {
return new Failure(new SocketException('Stream closed'));
}

if ($final) {
$this->streams[$id]->state |= Http2Stream::LOCAL_CLOSED;
}
if ($final) {
$this->streams[$id]->state |= Http2Stream::LOCAL_CLOSED;
}

return $this->writeData($data, $id);
},
return $this->writeData($data, $id);
}
),
$this->socket->getLocalAddress(),
$this->socket->getRemoteAddress()
);
Expand Down
45 changes: 45 additions & 0 deletions src/Connection/Internal/Http2UpgradeOutputStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<?php

namespace Amp\Http\Client\Connection\Internal;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\StreamException;
use Amp\Http\Client\HttpException;
use Amp\Promise;
use function Amp\call;

class Http2UpgradeOutputStream implements OutputStream
{
private $write;

public function __construct(callable $write)
{
$this->write = $write;
}

public function write(string $data): Promise
{
return $this->send($data, false);
}

public function end(string $finalData = ""): Promise
{
return $this->send($finalData, true);
}

private function send(string $data, bool $final): Promise
{
return call(function () use ($data, $final): \Generator {
if ($this->write === null) {
throw new ClosedException('The socket is no longer writable');
}

try {
return yield call($this->write, $data, $final);
} catch (HttpException $exception) {
throw new StreamException('An error occurred while writing to the tunnelled socket', 0, $exception);
}
});
}
}
33 changes: 12 additions & 21 deletions src/Connection/UpgradedStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@
namespace Amp\Http\Client\Connection;

use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\Failure;
use Amp\Http\Client\HttpException;
use Amp\Http\Client\Internal\ForbidCloning;
use Amp\Http\Client\Internal\ForbidSerialization;
use Amp\Promise;
use Amp\Socket\Socket;
use Amp\Socket\SocketAddress;
use Amp\Socket\SocketException;
use function Amp\call;

final class UpgradedStream implements Socket
{
Expand All @@ -23,7 +22,7 @@ final class UpgradedStream implements Socket
private $localAddress;
private $remoteAddress;

public function __construct(InputStream $read, callable $write, SocketAddress $local, SocketAddress $remote)
public function __construct(InputStream $read, OutputStream $write, SocketAddress $local, SocketAddress $remote)
{
$this->read = $read;
$this->write = $write;
Expand All @@ -43,7 +42,7 @@ public function read(): Promise
public function close(): void
{
if ($this->write !== null) {
($this->write)('', true);
$this->write->end();
}

$this->read = null;
Expand All @@ -57,28 +56,20 @@ public function __destruct()

public function write(string $data): Promise
{
return $this->send($data, false);
}

if ($this->write === null) {
return new Failure(new SocketException('The socket is no longer writable'));
}

public function send(string $data, bool $final): Promise
{
return call(function () use ($data, $final): \Generator {
if ($this->write === null) {
throw new SocketException('The socket is no longer writable');
}

try {
return yield call($this->write, $data, $final);
} catch (HttpException $exception) {
throw new SocketException('An error occurred while writing to the tunnelled socket', 0, $exception);
}
});
return $this->write->write($data);
}

public function end(string $finalData = ""): Promise
{
return $this->send($finalData, true);
if ($this->write === null) {
return new Failure(new SocketException('The socket is no longer writable'));
}

return $this->write->end($finalData);
}

public function reference(): void
Expand Down

0 comments on commit 3de3281

Please sign in to comment.