Skip to content

Commit

Permalink
Add support for new Stream API
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed May 21, 2020
1 parent b867505 commit dc7e184
Show file tree
Hide file tree
Showing 4 changed files with 161 additions and 2 deletions.
2 changes: 1 addition & 1 deletion composer.json
Expand Up @@ -26,7 +26,7 @@
}
],
"require": {
"amphp/amp": "^2"
"amphp/amp": "dev-streams as 2.6"
},
"require-dev": {
"amphp/phpunit-util": "^1",
Expand Down
2 changes: 1 addition & 1 deletion lib/IteratorStream.php
Expand Up @@ -17,7 +17,7 @@ final class IteratorStream implements InputStream
private $pending = false;

/**
* @psam-param Iterator<string> $iterator
* @psalm-param Iterator<string> $iterator
*/
public function __construct(Iterator $iterator)
{
Expand Down
68 changes: 68 additions & 0 deletions lib/StreamConverter.php
@@ -0,0 +1,68 @@
<?php

namespace Amp\ByteStream;

use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
use Amp\Stream;

final class StreamConverter implements InputStream
{
/** @var Stream<string> */
private $stream;
/** @var \Throwable|null */
private $exception;
/** @var bool */
private $pending = false;

/**
* @psalm-param Stream<string> $iterator
*/
public function __construct(Stream $stream)
{
$this->stream = $stream;
}

/** @inheritdoc */
public function read(): Promise
{
if ($this->exception) {
return new Failure($this->exception);
}

if ($this->pending) {
throw new PendingReadError;
}

$this->pending = true;
/** @var Deferred<string|null> $deferred */
$deferred = new Deferred;

$this->stream->continue()->onResolve(function ($error, $chunk) use ($deferred) {
$this->pending = false;

if ($error) {
$this->exception = $error;
$deferred->fail($error);
} elseif ($chunk !== null) {
if (!\is_string($chunk)) {
$this->exception = new StreamException(\sprintf(
"Unexpected iterator value of type '%s', expected string",
\is_object($chunk) ? \get_class($chunk) : \gettype($chunk)
));

$deferred->fail($this->exception);

return;
}

$deferred->resolve($chunk);
} else {
$deferred->resolve();
}
});

return $deferred->promise();
}
}
91 changes: 91 additions & 0 deletions test/StreamConverterTest.php
@@ -0,0 +1,91 @@
<?php

namespace Amp\ByteStream\Test;

use Amp\ByteStream\StreamConverter;
use Amp\ByteStream\StreamException;
use Amp\PHPUnit\AsyncTestCase;
use Amp\PHPUnit\TestException;
use Amp\StreamSource;

class StreamConverterTest extends AsyncTestCase
{
public function testReadIterator()
{
$values = ["abc", "def", "ghi"];

$source = new StreamSource;
$stream = new StreamConverter($source->stream());

foreach ($values as $value) {
$source->yield($value);
}

$source->complete();

$buffer = "";
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
}

$this->assertSame(\implode($values), $buffer);
$this->assertNull(yield $stream->read());
}

public function testFailingIterator()
{
$exception = new TestException;
$value = "abc";

$source = new StreamSource;
$stream = new StreamConverter($source->stream());

$source->yield($value);
$source->fail($exception);

$callable = $this->createCallback(1);

try {
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
}

$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
}

public function testThrowsOnNonStringIteration()
{
$this->expectException(StreamException::class);

$value = 42;

$source = new StreamSource;
$stream = new StreamConverter($source->stream());

$source->yield($value);

yield $stream->read();
}

public function testFailsAfterException()
{
$this->expectException(StreamException::class);

$value = 42;

$source = new StreamSource;
$stream = new StreamConverter($source->stream());

$source->yield($value);

try {
yield $stream->read();
} catch (StreamException $e) {
yield $stream->read();
}
}
}

0 comments on commit dc7e184

Please sign in to comment.