Skip to content

Commit

Permalink
Fail streaming messages correctly on read failures
Browse files Browse the repository at this point in the history
Fixes #18.
  • Loading branch information
kelunik committed Jun 28, 2017
1 parent 704adf7 commit 4777508
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
13 changes: 13 additions & 0 deletions lib/Message.php
Expand Up @@ -4,6 +4,7 @@

use Amp\Coroutine;
use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
use Amp\Success;

Expand Down Expand Up @@ -49,6 +50,9 @@ class Message implements InputStream, Promise {
/** @var bool True if the iterator has completed. */
private $complete = false;

/** @var \Throwable Used to fail future reads on failure. */
private $error;

/**
* @param InputStream $source An iterator that only emits strings.
*/
Expand Down Expand Up @@ -95,6 +99,15 @@ final public function read(): Promise {

if ($this->coroutine === null) {
$this->coroutine = new Coroutine($this->consume());
$this->coroutine->onResolve(function ($error) {
if ($error) {
$this->error = $error;
}
});
}

if ($this->error) {
return new Failure($this->error);
}

if ($this->buffer !== "") {
Expand Down
10 changes: 8 additions & 2 deletions test/MessageTest.php
Expand Up @@ -9,6 +9,7 @@
use Amp\Emitter;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\TestException;

class MessageTest extends TestCase {
public function testBufferingAll() {
Expand Down Expand Up @@ -118,7 +119,7 @@ public function testPartialStreamConsumption() {

public function testFailingStream() {
Loop::run(function () {
$exception = new \Exception;
$exception = new TestException;
$value = "abc";

$emitter = new Emitter;
Expand All @@ -127,12 +128,17 @@ public function testFailingStream() {
$emitter->emit($value);
$emitter->fail($exception);

$callable = $this->createCallback(1);

try {
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
}
} catch (\Exception $reason) {

$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
});
}
Expand Down

0 comments on commit 4777508

Please sign in to comment.