Skip to content

Commit

Permalink
Merge pull request #9 from amphp/message-input-stream
Browse files Browse the repository at this point in the history
Make Message accept an InputStream instead of an Iterator
  • Loading branch information
kelunik committed May 15, 2017
2 parents 4f4c9f1 + 71745c6 commit ad249a8
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 21 deletions.
32 changes: 32 additions & 0 deletions lib/IteratorStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?php

namespace Amp\ByteStream;

use Amp\Deferred;
use Amp\Iterator;
use Amp\Promise;

class IteratorStream implements InputStream {
private $iterator;

public function __construct(Iterator $iterator) {
$this->iterator = $iterator;
}

/** @inheritdoc */
public function read(): Promise {
$deferred = new Deferred;

$this->iterator->advance()->onResolve(function ($error, $hasNextElement) use ($deferred) {
if ($error) {
$deferred->fail($error);
} elseif ($hasNextElement) {
$deferred->resolve($this->iterator->getCurrent());
} else {
$deferred->resolve(null);
}
});

return $deferred->promise();
}
}
48 changes: 35 additions & 13 deletions lib/Message.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,30 @@

use Amp\Coroutine;
use Amp\Deferred;
use Amp\Iterator;
use Amp\Promise;
use Amp\Success;

/**
* Creates a buffered message from an Iterator. The message can be consumed in chunks using the read() API or it may be
* buffered and accessed in its entirety by waiting for the promise to resolve.
* Creates a buffered message from an InputStream. The message can be consumed in chunks using the read() API or it may
* be buffered and accessed in its entirety by waiting for the promise to resolve.
*
* Buffering Example:
*
* $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings.
* $stream = new Message($inputStream);
* $content = yield $stream;
*
* Streaming Example:
*
* $stream = new Message($iterator); // $iterator is an instance of \Amp\Iterator emitting only strings.
* $stream = new Message($inputStream);
*
* while (($chunk = yield $stream->read()) !== null) {
* // Immediately use $chunk, reducing memory consumption since the entire message is never buffered.
* }
*/
class Message implements InputStream, Promise {
/** @var InputStream */
private $source;

/** @var string */
private $buffer = "";

Expand All @@ -45,15 +47,15 @@ class Message implements InputStream, Promise {
private $complete = false;

/**
* @param \Amp\Iterator $iterator An iterator that only emits strings.
* @param InputStream $source An iterator that only emits strings.
*/
public function __construct(Iterator $iterator) {
$this->coroutine = new Coroutine($this->iterate($iterator));
public function __construct(InputStream $source) {
$this->source = $source;
}

private function iterate(Iterator $iterator): \Generator {
while (yield $iterator->advance()) {
$buffer = $this->buffer .= $iterator->getCurrent();
private function consume(): \Generator {
while (($chunk = yield $this->source->read()) !== null) {
$buffer = $this->buffer .= $chunk;

if ($buffer === "") {
continue; // Do not succeed reads with empty string.
Expand All @@ -62,12 +64,12 @@ private function iterate(Iterator $iterator): \Generator {
$this->pendingRead = null;
$this->buffer = "";
$deferred->resolve($buffer);
$buffer = ""; // Destroy last emitted chunk to free memory.
} elseif (!$this->buffering) {
$buffer = ""; // Destroy last emitted chunk to free memory.
$this->backpressure = new Deferred;
yield $this->backpressure->promise();
}

$buffer = ""; // Destroy last emitted chunk to free memory.
}

$this->complete = true;
Expand All @@ -87,6 +89,10 @@ public function read(): Promise {
throw new PendingReadError;
}

if ($this->coroutine === null) {
$this->coroutine = new Coroutine($this->consume());
}

if ($this->buffer !== "") {
$buffer = $this->buffer;
$this->buffer = "";
Expand Down Expand Up @@ -114,6 +120,10 @@ public function read(): Promise {
public function onResolve(callable $onResolved) {
$this->buffering = true;

if ($this->coroutine === null) {
$this->coroutine = new Coroutine($this->consume());
}

if ($this->backpressure) {
$backpressure = $this->backpressure;
$this->backpressure = null;
Expand All @@ -122,4 +132,16 @@ public function onResolve(callable $onResolved) {

$this->coroutine->onResolve($onResolved);
}

/**
* Exposes the source input stream.
*
* This might be required to resolve a promise with an InputStream, because promises in Amp can't be resolved with
* other promises.
*
* @return InputStream
*/
public function getInputStream(): InputStream {
return $this->source;
}
}
17 changes: 9 additions & 8 deletions test/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\ByteStream\Test;

use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\Message;
use Amp\Emitter;
use Amp\Loop;
Expand All @@ -13,7 +14,7 @@ public function testBufferingAll() {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
Expand All @@ -30,7 +31,7 @@ public function testFullStreamConsumption() {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
Expand All @@ -55,7 +56,7 @@ public function testFastResolvingStream() {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
Expand All @@ -78,7 +79,7 @@ public function testFastResolvingStreamBufferingOnly() {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

foreach ($values as $value) {
$emitter->emit($value);
Expand All @@ -95,7 +96,7 @@ public function testPartialStreamConsumption() {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

$emitter->emit($values[0]);

Expand All @@ -119,7 +120,7 @@ public function testFailingStream() {
$value = "abc";

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

$emitter->emit($value);
$emitter->fail($exception);
Expand All @@ -138,7 +139,7 @@ public function testEmptyStream() {
Loop::run(function () {
$emitter = new Emitter;
$emitter->complete();
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

$this->assertNull(yield $stream->read());
});
Expand All @@ -149,7 +150,7 @@ public function testReadAfterCompletion() {
$value = "abc";

$emitter = new Emitter;
$stream = new Message($emitter->iterate());
$stream = new Message(new IteratorStream($emitter->iterate()));

$emitter->emit($value);
$emitter->complete();
Expand Down

0 comments on commit ad249a8

Please sign in to comment.