Skip to content

Commit

Permalink
Merge f9ba1ba into 0dff1fe
Browse files Browse the repository at this point in the history
  • Loading branch information
sebdesign committed Oct 6, 2017
2 parents 0dff1fe + f9ba1ba commit 020a9fa
Show file tree
Hide file tree
Showing 7 changed files with 415 additions and 12 deletions.
4 changes: 2 additions & 2 deletions lib/ZlibInputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public function read(): Promise {
}

if ($data === null) {
$decompressed = \inflate_add($this->resource, "", \ZLIB_FINISH);
$decompressed = @\inflate_add($this->resource, "", \ZLIB_FINISH);

if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
Expand All @@ -61,7 +61,7 @@ public function read(): Promise {
return $decompressed;
}

$decompressed = \inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);
$decompressed = @\inflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);

if ($decompressed === false) {
throw new StreamException("Failed adding data to deflate context");
Expand Down
12 changes: 5 additions & 7 deletions lib/ZlibOutputStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public function write(string $data): Promise {
throw new ClosedException("The stream has already been closed");
}

$compressed = \deflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);
$compressed = @\deflate_add($this->resource, $data, \ZLIB_SYNC_FLUSH);

if ($compressed === false) {
throw new StreamException("Failed adding data to deflate context");
Expand All @@ -61,17 +61,15 @@ public function end(string $finalData = ""): Promise {
throw new ClosedException("The stream has already been closed");
}

$compressed = \deflate_add($this->resource, $finalData, \ZLIB_FINISH);
$compressed = @\deflate_add($this->resource, $finalData, \ZLIB_FINISH);

if ($compressed === false) {
throw new StreamException("Failed adding data to deflate context");
}

$promise = $this->destination->write($compressed);
$promise->onResolve(function ($error) {
if ($error) {
$this->close();
}
$promise = $this->destination->end($compressed);
$promise->onResolve(function () {
$this->close();
});

return $promise;
Expand Down
93 changes: 93 additions & 0 deletions test/IteratorStreamTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
<?php

namespace Amp\ByteStream\Test;

use Amp\ByteStream\IteratorStream;
use Amp\ByteStream\StreamException;
use Amp\Emitter;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
use Amp\PHPUnit\TestException;

class IteratorStreamTest extends TestCase {
public function testReadIterator() {
Loop::run(function () {
$values = ["abc", "def", "ghi"];

$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());

foreach ($values as $value) {
$emitter->emit($value);
}

$emitter->complete();

$buffer = "";
while (($chunk = yield $stream->read()) !== null) {
$buffer .= $chunk;
}

$this->assertSame(\implode($values), $buffer);
$this->assertNull(yield $stream->read());
});
}

public function testFailingIterator() {
Loop::run(function () {
$exception = new TestException;
$value = "abc";

$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());

$emitter->emit($value);
$emitter->fail($exception);

$callable = $this->createCallback(1);

try {
while (($chunk = yield $stream->read()) !== null) {
$this->assertSame($value, $chunk);
}

$this->fail("No exception has been thrown");
} catch (TestException $reason) {
$this->assertSame($exception, $reason);
$callable(); // <-- ensure this point is reached
}
});
}

public function testThrowsOnNonStringIteration() {
$this->expectException(StreamException::class);
Loop::run(function () {
$value = 42;

$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());

$emitter->emit($value);

yield $stream->read();
});
}

public function testFailsAfterException() {
$this->expectException(StreamException::class);
Loop::run(function () {
$value = 42;

$emitter = new Emitter;
$stream = new IteratorStream($emitter->iterate());

$emitter->emit($value);

try {
yield $stream->read();
} catch (StreamException $e) {
yield $stream->read();
}
});
}
}
15 changes: 15 additions & 0 deletions test/MessageTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,21 @@ public function testEmptyStream() {
});
}

public function testEmptyStringStream() {
Loop::run(function () {
$value = "";

$emitter = new Emitter;
$stream = new Message(new IteratorStream($emitter->iterate()));

$emitter->emit($value);

$emitter->complete();

$this->assertSame("", yield $stream);
});
}

public function testReadAfterCompletion() {
Loop::run(function () {
$value = "abc";
Expand Down
137 changes: 134 additions & 3 deletions test/ResourceStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,24 @@

namespace Amp\ByteStream\Test;

use Amp\ByteStream\ClosedException;
use Amp\ByteStream\PendingReadError;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\Loop;
use Amp\Success;
use PHPUnit\Framework\TestCase;

class ResourceStreamTest extends TestCase {
const LARGE_MESSAGE_SIZE = 1 << 20; // 1 MB

public function getStreamPair() {
public function getStreamPair($outputChunkSize = null, $inputChunkSize = ResourceInputStream::DEFAULT_CHUNK_SIZE) {
$domain = \stripos(PHP_OS, "win") === 0 ? STREAM_PF_INET : STREAM_PF_UNIX;
list($left, $right) = @\stream_socket_pair($domain, \STREAM_SOCK_STREAM, \STREAM_IPPROTO_IP);

$a = new ResourceOutputStream($left);
$b = new ResourceInputStream($right);
$a = new ResourceOutputStream($left, $outputChunkSize);
$b = new ResourceInputStream($right, $inputChunkSize);

return [$a, $b];
}
Expand Down Expand Up @@ -103,4 +106,132 @@ public function testThrowsOnExternallyShutdownStreamWithSmallPayloads() {
yield $lastWritePromise;
});
}

public function testThrowsOnCloseBeforeWritingComplete() {
$this->expectException(ClosedException::class);

Loop::run(function () {
list($a, $b) = $this->getStreamPair(4096);

$message = \str_repeat(".", 8192 /* default chunk size */);

$lastWritePromise = $a->end($message);

$a->close();

yield $lastWritePromise;
});
}

public function testThrowsOnStreamNotWritable() {
$this->expectException(StreamException::class);

Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$message = \str_repeat(".", 8192 /* default chunk size */);

$a->close();

$lastWritePromise = $a->end($message);

yield $lastWritePromise;
});
}

public function testThrowsOnReferencingClosedStream() {
$this->expectException(\Error::class);

Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$b->close();

$b->reference();
});
}

public function testThrowsOnUnreferencingClosedStream() {
$this->expectException(\Error::class);

Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$b->close();

$b->unreference();
});
}

public function testThrowsOnPendingRead() {
$this->expectException(PendingReadError::class);

Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$b->read();
$b->read();
});
}

public function testResolveSuccessOnClosedStream() {
Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$b->close();

$this->assertInstanceOf(Success::class, $b->read());
});
}

public function testChunkedPayload() {
Loop::run(function () {
list($a, $b) = $this->getStreamPair(4096);

$message = \str_repeat(".", 8192 /* default chunk size */);

\Amp\Promise\rethrow($a->end($message));

$received = "";
while (null !== $chunk = yield $b->read()) {
$received .= $chunk;
}

$this->assertSame($message, $received);
});
}

public function testEmptyPayload() {
Loop::run(function () {
list($a, $b) = $this->getStreamPair(4096);

$message = "";

\Amp\Promise\rethrow($a->end($message));

$received = "";
while (null !== $chunk = yield $b->read()) {
$received .= $chunk;
}

$this->assertSame($message, $received);
});
}

public function testCloseStreamAfterEndPayload() {
Loop::run(function () {
list($a, $b) = $this->getStreamPair();

$message = \str_repeat(".", 8192 /* default chunk size */);

\Amp\Promise\rethrow($a->end($message));

$received = "";
while (null !== $chunk = yield $b->read()) {
$received .= $chunk;
}

$this->assertSame($message, $received);
});
}
}
29 changes: 29 additions & 0 deletions test/ZlibInputStreamTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,38 @@ public function testRead() {
});
}

public function testGetEncoding() {
$gzStream = new ZlibInputStream(new InMemoryStream(""), \ZLIB_ENCODING_GZIP);

$this->assertSame(\ZLIB_ENCODING_GZIP, $gzStream->getEncoding());
}

public function testInvalidEncoding() {
$this->expectException(StreamException::class);

new ZlibInputStream(new InMemoryStream(""), 1337);
}

public function testGetOptions() {
$options = [
"level" => -1,
"memory" => 8,
"window" => 15,
"strategy" => \ZLIB_DEFAULT_STRATEGY,
];

$gzStream = new ZlibInputStream(new InMemoryStream(""), \ZLIB_ENCODING_GZIP, $options);

$this->assertSame($options, $gzStream->getOptions());
}

public function testInvalidStream() {
$this->expectException(StreamException::class);

Loop::run(function () {
$gzStream = new ZlibInputStream(new InMemoryStream("Invalid"), \ZLIB_ENCODING_GZIP);

yield $gzStream->read();
});
}
}
Loading

0 comments on commit 020a9fa

Please sign in to comment.