Skip to content

Commit

Permalink
Merge 73b2dc5 into 9f60974
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Mar 12, 2018
2 parents 9f60974 + 73b2dc5 commit 7f92f6f
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 0 deletions.
2 changes: 2 additions & 0 deletions lib/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
* while (($chunk = yield $stream->read()) !== null) {
* // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
* }
*
* @deprecated Use Amp\ByteStream\Payload instead.
*/
class Message implements InputStream, Promise {
/** @var InputStream */
Expand Down
86 changes: 86 additions & 0 deletions lib/Payload.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

namespace Amp\ByteStream;

use Amp\Coroutine;
use Amp\Promise;
use function Amp\call;

/**
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
* be buffered and accessed in its entirety by calling buffer(). Once buffering is requested through buffer(), the
* stream cannot be read in chunks. On destruct any remaining data is read from the InputStream given to this class.
*/
class Payload implements InputStream {
/** @var InputStream */
private $stream;

/** @var \Amp\Promise|null */
private $promise;

/** @var \Amp\Promise|null */
private $lastRead;

/**
* @param \Amp\ByteStream\InputStream $stream
*/
public function __construct(InputStream $stream) {
$this->stream = $stream;
}

public function __destruct() {
if (!$this->promise) {
Promise\rethrow(new Coroutine($this->consume()));
}
}

private function consume(): \Generator {
try {
if ($this->lastRead && null === yield $this->lastRead) {
return;
}

while (null !== yield $this->stream->read()) {
// Discard unread bytes from message.
}
} catch (\Throwable $exception) {
// If exception is thrown here the connection closed anyway.
}
}

/**
* @inheritdoc
*
* @throws \Error If a buffered message was requested by calling buffer().
*/
final public function read(): Promise {
if ($this->promise) {
throw new \Error("Cannot stream message data once a buffered message has been requested");
}

return $this->lastRead = $this->stream->read();
}

/**
* Buffers the entire message and resolves the returned promise then.
*
* @return Promise<string> Resolves with the entire message contents.
*/
final public function buffer(): Promise {
if ($this->promise) {
return $this->promise;
}

return $this->promise = call(function () {
$buffer = '';
if ($this->lastRead && null === yield $this->lastRead) {
return $buffer;
}

while (null !== $chunk = yield $this->stream->read()) {
$buffer .= $chunk;
}
return $buffer;
});
}
}
255 changes: 255 additions & 0 deletions test/PayloadTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
<?php

namespace Amp\ByteStream\Test;

use Amp\ByteStream\InMemoryStream;
use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\Payload;
use Amp\ByteStream\PendingReadError;
use Amp\Emitter;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\TestException;

class PayloadTest extends TestCase {
public function testBufferingAll() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
}

$emitter->complete();

$this->assertSame(\implode($values), yield $stream->buffer());
});
}

public function testFullStreamConsumption() {
Loop::run(function () use (&$invoked) {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
}

Loop::delay(5, function () use ($emitter) {
$emitter->complete();
});

$buffer = "";
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
}

$this->assertSame(\implode($values), $buffer);
$this->assertSame("", yield $stream->buffer());
});
}

public function testFastResolvingStream() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
}

$emitter->complete();

$emitted = [];
while (($chunk = yield $stream->read()) !== null) {
$emitted[] = $chunk;
}

$this->assertSame($values, $emitted);
$this->assertSame("", yield $stream->buffer());
});
}

public function testFastResolvingStreamBufferingOnly() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
}

$emitter->complete();

$this->assertSame(\implode($values), yield $stream->buffer());
});
}

public function testPartialStreamConsumption() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

$emitter->emit($values[0]);

$chunk = yield $stream->read();

$this->assertSame(\array_shift($values), $chunk);

foreach ($values as $value) {
$emitter->emit($value);
}

$emitter->complete();

$this->assertSame(\implode($values), yield $stream->buffer());
});
}

public function testFailingStream() {
Loop::run(function () {
$exception = new TestException;
$value = "abc";

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

$emitter->emit($value);
$emitter->fail($exception);

$callable = $this->createCallback(1);

try {
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
}

$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
});
}

public function testFailingStreamWithPendingRead() {
Loop::run(function () {
$exception = new TestException;
$value = "abc";

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

$readPromise = $stream->read();
$emitter->fail($exception);

$callable = $this->createCallback(1);

try {
yield $readPromise;

$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
});
}

public function testEmptyStream() {
Loop::run(function () {
$emitter = new Emitter;
$emitter->complete();
$stream = new Payload(new IteratorStream($emitter->iterate()));

$this->assertNull(yield $stream->read());
});
}

public function testEmptyStringStream() {
Loop::run(function () {
$value = "";

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

$emitter->emit($value);

$emitter->complete();

$this->assertSame("", yield $stream->buffer());
});
}

public function testReadAfterCompletion() {
Loop::run(function () {
$value = "abc";

$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

$emitter->emit($value);
$emitter->complete();

$this->assertSame($value, yield $stream->read());
$this->assertNull(yield $stream->read());
});
}

public function testPendingRead() {
Loop::run(function () {
$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));

Loop::delay(0, function () use ($emitter) {
$emitter->emit("test");
});

$this->assertSame("test", yield $stream->read());
});
}

public function testPendingReadError() {
Loop::run(function () {
$emitter = new Emitter;
$stream = new Payload(new IteratorStream($emitter->iterate()));
$stream->read();

$this->expectException(PendingReadError::class);

$stream->read();
});
}

public function testReadAfterBuffer() {
Loop::run(function () {
$stream = new Payload(new InMemoryStream("test"));
$stream->buffer();

$this->expectException(\Error::class);
$this->expectExceptionMessage("Cannot stream message data once a buffered message has been requested");

yield $stream->read();
});
}

public function testFurtherCallsToBufferReturnSameData() {
Loop::run(function () {
$data = "test";
$stream = new Payload(new InMemoryStream($data));
$this->assertSame($data, yield $stream->buffer());
$this->assertSame($data, yield $stream->buffer());
});
}
}

0 comments on commit 7f92f6f

Please sign in to comment.