Skip to content

Commit

Permalink
Properly handle half closed streams
Browse files Browse the repository at this point in the history
  • Loading branch information
bwoebi committed Feb 25, 2016
1 parent cfa7fd3 commit 2c5f341
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 43 deletions.
6 changes: 5 additions & 1 deletion lib/Client.php
Expand Up @@ -5,6 +5,10 @@
use Amp\Struct;

class Client {
const CLOSED_RD = 1;
const CLOSED_WR = 2;
const CLOSED_RDWR = 3;

use Struct;
public $id;
public $socket;
Expand All @@ -23,7 +27,7 @@ class Client {
public $bufferPromisor;
public $onWriteDrain;
public $shouldClose;
public $isDead;
public $isDead = 0;
public $isExported;
public $remainingKeepAlives;
public $pendingResponses = 0;
Expand Down
24 changes: 15 additions & 9 deletions lib/Http1Driver.php
Expand Up @@ -124,7 +124,7 @@ public function writer(InternalRequest $ireq): \Generator {
($this->responseWriter)($client);
$msgs = "";

if ($client->isDead) {
if ($client->isDead & Client::CLOSED_WR) {
if (!$client->bufferPromisor) {
$client->bufferPromisor = new Deferred;
$client->bufferPromisor->fail(new ClientException);
Expand All @@ -146,18 +146,22 @@ public function writer(InternalRequest $ireq): \Generator {
} while (($msgPart = yield) !== null);

$client->writeBuffer .= $msgs;

// parserEmitLock check is required to prevent recursive continutation of the parser
if ($client->requestParser && !$client->parserEmitLock) {
$client->requestParser->send('');
}
$client->parserEmitLock = false;

($this->responseWriter)($client, $final = true);

if ($client->isDead) {
if ($client->isDead & Client::CLOSED_WR) {
if (!$client->bufferPromisor) {
$client->bufferPromisor = new Deferred;
$client->bufferPromisor->fail(new ClientException);
}
} else {
if ($client->requestParser && !$client->parserEmitLock) {
$client->requestParser->send('');
}
$client->parserEmitLock = false;
} elseif (($client->isDead & Client::CLOSED_RD) && $client->bodyPromisors) {
array_pop($client->bodyPromisors)->fail(new ClientException); // just one element with Http1Driver
}
}

Expand Down Expand Up @@ -191,15 +195,17 @@ public function parser(Client $client): \Generator {
];

if ($client->parserEmitLock) {
$client->parserEmitLock = false;
do {
if (\strlen($buffer) > $maxHeaderSize + $maxBodySize) {
\Amp\disable($client->readWatcher);
$client->parserEmitLock = false;
}

$buffer .= yield;
} while ($client->parserEmitLock);
\Amp\enable($client->readWatcher);
if (!($client->isDead & Client::CLOSED_RD)) {
\Amp\enable($client->readWatcher);
}
}

$client->parserEmitLock = true;
Expand Down
4 changes: 2 additions & 2 deletions lib/Http2Driver.php
Expand Up @@ -259,7 +259,7 @@ public function writer(InternalRequest $ireq): \Generator {
$msgs = "";
}

if ($client->isDead) {
if ($client->isDead & Client::CLOSED_WR) {
if (!$client->bufferPromisor) {
$client->bufferPromisor = new Deferred;
$client->bufferPromisor->fail(new ClientException);
Expand All @@ -272,7 +272,7 @@ public function writer(InternalRequest $ireq): \Generator {

$this->writeData($client, $msgs, $id, true);

if ($client->isDead && !$client->bufferPromisor) {
if (($client->isDead & Client::CLOSED_WR) && !$client->bufferPromisor) {
$client->bufferPromisor = new Failure(new ClientException);
}

Expand Down
55 changes: 36 additions & 19 deletions lib/Server.php
Expand Up @@ -453,10 +453,9 @@ private function writeResponse(Client $client, $final = false) {
}

private function onResponseDataDone(Client $client) {
if ($client->shouldClose) {
if ($client->shouldClose || (--$client->pendingResponses == 0 && $client->isDead == Client::CLOSED_RD)) {
$this->close($client);
} elseif (!$client->isDead) {
$client->pendingResponses--;
} elseif (!($client->isDead & Client::CLOSED_RD)) {
$this->renewKeepAliveTimeout($client);
}
}
Expand All @@ -465,10 +464,20 @@ private function onWritable(string $watcherId, $socket, $client) {
$bytesWritten = @\fwrite($socket, $client->writeBuffer);
if ($bytesWritten === false) {
if (!\is_resource($socket) || @\feof($socket)) {
$client->isDead = true;
$this->close($client);
if ($client->isDead == Client::CLOSED_RD) {
$this->close($client);
} else {
$client->isDead = Client::CLOSED_WR;
$client->writeWatcher = null;
\Amp\cancel($watcherId);
}
}
} else {
$client->bufferSize -= $bytesWritten;
if ($client->bufferPromisor && $client->bufferSize <= $client->options->softStreamCap) {
$client->bufferPromisor->succeed();
$client->bufferPromisor = null;
}
if ($bytesWritten === \strlen($client->writeBuffer)) {
$client->writeBuffer = "";
\Amp\disable($watcherId);
Expand All @@ -479,11 +488,6 @@ private function onWritable(string $watcherId, $socket, $client) {
$client->writeBuffer = \substr($client->writeBuffer, $bytesWritten);
\Amp\enable($watcherId);
}
$client->bufferSize -= $bytesWritten;
if ($client->bufferPromisor && $client->bufferSize <= $client->options->softStreamCap) {
$client->bufferPromisor->succeed();
$client->bufferPromisor = null;
}
}
}

Expand All @@ -495,6 +499,7 @@ private function timeoutKeepAlives(int $now) {
if ($client->pendingResponses > \count($client->bodyPromisors)) {
$this->clearKeepAliveTimeout($client); // renewed when response ends
} else {
// timeouts are only active while Client is doing nothing (not sending nor receving) and no pending writes, hence we can just fully close here
$this->close($client);
}
} else {
Expand Down Expand Up @@ -522,8 +527,20 @@ private function onReadable(string $watcherId, $socket, $client) {
$data = @\fread($socket, $this->options->ioGranularity);
if ($data == "") {
if (!\is_resource($socket) || @\feof($socket)) {
$client->isDead = true;
$this->close($client);
if ($client->isDead == Client::CLOSED_WR || $client->pendingResponses == 0) {
$this->close($client);
} else {
$client->isDead = Client::CLOSED_RD;
\Amp\cancel($watcherId);
$client->readWatcher = null;
if ($client->bodyPromisors) {
$ex = new ClientException;
foreach ($client->bodyPromisors as $promisor) {
$promisor->fail($ex);
}
$client->bodyPromisors = [];
}
}
}
return;
}
Expand Down Expand Up @@ -793,7 +810,7 @@ private function initializeResponse(InternalRequest $ireq): Response {

private function onCoroutineAppResolve($error, $result, $ireq) {
if (empty($error)) {
if ($ireq->client->isExported || $ireq->client->isDead) {
if ($ireq->client->isExported || ($ireq->client->isDead & Client::CLOSED_WR)) {
return;
} elseif ($ireq->response->state() & Response::STARTED) {
$ireq->response->end();
Expand All @@ -814,7 +831,7 @@ private function onCoroutineAppResolve($error, $result, $ireq) {
private function onApplicationError(\Throwable $error, InternalRequest $ireq) {
$this->logger->error($error);

if ($ireq->client->isDead || $ireq->client->isExported) {
if (($ireq->client->isDead & Client::CLOSED_WR) || $ireq->client->isExported) {
// Responder actions may catch an initial ClientException and continue
// doing further work. If an error arises at this point our only option
// is to log the error (which we just did above).
Expand Down Expand Up @@ -875,10 +892,10 @@ private function tryErrorResponse(\Throwable $error, InternalRequest $ireq) {

private function close(Client $client) {
$this->clear($client);
if (!$client->isDead) {
@fclose($client->socket);
$client->isDead = true;
}
assert($client->isDead != Client::CLOSED_RDWR);
@fclose($client->socket);
$client->isDead = Client::CLOSED_RDWR;

$this->clientCount--;
$net = @\inet_pton($client->clientAddr);
if (isset($net[4])) {
Expand Down Expand Up @@ -912,7 +929,7 @@ private function clear(Client $client) {
}

private function export(Client $client): \Closure {
$client->isDead = true;
$client->isDead = Client::CLOSED_RDWR;
$client->isExported = true;
$this->clear($client);

Expand Down
40 changes: 28 additions & 12 deletions test/ServerTest.php
Expand Up @@ -15,6 +15,8 @@
use Aerys\VhostContainer;
use Amp\Socket as sock;

// @TODO test communication on half-closed streams (both ways) [also with yield message] (also with HTTP/1 pipelining...)

class ServerTest extends \PHPUnit_Framework_TestCase {
function tryRequest($emit, $responder, $middlewares = []) {
$gen = $this->tryIterativeRequest($responder, $middlewares);
Expand Down Expand Up @@ -360,21 +362,34 @@ function testEncryptedIO() {
\Amp\run(function() {
$deferred = new \Amp\Deferred;
list($address) = yield from $this->startServer(function (Client $client, $write) use ($deferred) {
$this->assertTrue($client->isEncrypted);
$this->assertNotTrue($client->isDead);

do {
$dump = ($dump ?? "") . yield;
} while (strlen($dump) <= 65537);
$this->assertEquals("1a", substr($dump, -2));
$client->writeBuffer = "b";
$write($client, true);

try {
$this->assertTrue($client->isEncrypted);
$this->assertEquals(0, $client->isDead);

do {
$dump = ($dump ?? "") . yield;
} while (strlen($dump) <= 65537);
$this->assertEquals("1a", substr($dump, -2));
$client->writeBuffer = "b";
$client->pendingResponses = 1;
$write($client, true);
yield;
} catch (\Throwable $e) {
$deferred->fail($e);
} finally {
$this->assertTrue($client->isDead);
$deferred->succeed();
if (isset($e)) {
return;
}
\Amp\immediately(function() use ($client, $deferred) {
try {
$this->assertEquals(Client::CLOSED_RDWR, $client->isDead);
} catch (\Throwable $e) {
$deferred->fail($e);
}
if (empty($e)) {
$deferred->succeed();
}
});
}
}, true);

Expand All @@ -384,6 +399,7 @@ function testEncryptedIO() {
yield $client->write("a");
$this->assertEquals("b", yield $client->read(1));
$client->close();

yield $deferred->promise();
\Amp\stop();
\Amp\reactor(\Amp\driver());
Expand Down

0 comments on commit 2c5f341

Please sign in to comment.