Skip to content

Commit

Permalink
Readable stream encoding and decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
WyriHaximus committed Jul 8, 2017
1 parent a82c226 commit db79b83
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 0 deletions.
72 changes: 72 additions & 0 deletions src/ReadableStreamBase64Decode.php
@@ -0,0 +1,72 @@
<?php declare(strict_types=1);

namespace WyriHaximus\React\Stream\Base64;

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

final class ReadableStreamBase64Decode extends EventEmitter implements ReadableStreamInterface
{
/**
* @var WritableStreamInterface
*/
private $stream;

/**
* @var string
*/
private $buffer = '';

/**
* @param ReadableStreamInterface $stream
*/
public function __construct(ReadableStreamInterface $stream)
{
$this->stream = $stream;
$this->stream->on('data', function ($data) {
$this->buffer .= $data;
$this->emit('data', [$this->processBuffer()]);
});
$this->stream->once('close', function () {
$this->emit('data', [base64_decode($this->buffer, true)]);
$this->emit('close');
});
Util::forwardEvents($stream, $this, ['error', 'end']);
}

public function isReadable()
{
return $this->stream->isReadable();
}

public function pause()
{
return $this->stream->pause();
}

public function resume()
{
return $this->stream->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = [])
{
return $this->stream->pipe($dest, $options);
}

public function close()
{
$this->stream->close();
}

private function processBuffer(): string
{
$length = strlen($this->buffer);
$buffer = base64_decode(substr($this->buffer, 0, $length - $length % 4), true);
$this->buffer = substr($this->buffer, $length - $length % 4);

return $buffer;
}
}
72 changes: 72 additions & 0 deletions src/ReadableStreamBase64Encode.php
@@ -0,0 +1,72 @@
<?php declare(strict_types=1);

namespace WyriHaximus\React\Stream\Base64;

use Evenement\EventEmitter;
use React\Stream\ReadableStreamInterface;
use React\Stream\Util;
use React\Stream\WritableStreamInterface;

final class ReadableStreamBase64Encode extends EventEmitter implements ReadableStreamInterface
{
/**
* @var WritableStreamInterface
*/
private $stream;

/**
* @var string
*/
private $buffer = '';

/**
* @param ReadableStreamInterface $stream
*/
public function __construct(ReadableStreamInterface $stream)
{
$this->stream = $stream;
$this->stream->on('data', function ($data) {
$this->buffer .= $data;
$this->emit('data', [$this->processBuffer()]);
});
$this->stream->once('close', function () {
$this->emit('data', [base64_encode($this->buffer)]);
$this->emit('close');
});
Util::forwardEvents($stream, $this, ['error', 'end']);
}

public function isReadable()
{
return $this->stream->isReadable();
}

public function pause()
{
return $this->stream->pause();
}

public function resume()
{
return $this->stream->resume();
}

public function pipe(WritableStreamInterface $dest, array $options = [])
{
return $this->stream->pipe($dest, $options);
}

public function close()
{
$this->stream->close();
}

private function processBuffer(): string
{
$length = strlen($this->buffer);
$buffer = base64_encode(substr($this->buffer, 0, $length - $length % 3));
$this->buffer = substr($this->buffer, $length - $length % 3);

return $buffer;
}
}
33 changes: 33 additions & 0 deletions tests/ReadableStreamBase64DecodeTest.php
@@ -0,0 +1,33 @@
<?php declare(strict_types=1);

namespace WyriHaximus\React\Tests\Stream\Base64;

use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory;
use React\Stream\ThroughStream;
use WyriHaximus\React\Stream\Base64\ReadableStreamBase64Decode;
use function Clue\React\Block\await;
use function React\Promise\Stream\buffer;

final class ReadableStreamBase64DecodeTest extends TestCase
{
/**
* @dataProvider WyriHaximus\React\Tests\Stream\Base64\DataProvider::provideData
*/
public function testHash(string $data)
{
$loop = Factory::create();
$throughStream = new ThroughStream();
$stream = new ReadableStreamBase64Decode($throughStream);
$loop->futureTick(function () use ($throughStream, $data) {
$data = base64_encode($data);
$chunks = str_split($data);
$last = count($chunks) - 1;
for ($i = 0; $i < $last; $i++) {
$throughStream->write($chunks[$i]);
}
$throughStream->end($chunks[$last]);
});
self::assertSame($data, await(buffer($stream), $loop));
}
}
32 changes: 32 additions & 0 deletions tests/ReadableStreamBase64EncodeTest.php
@@ -0,0 +1,32 @@
<?php declare(strict_types=1);

namespace WyriHaximus\React\Tests\Stream\Base64;

use PHPUnit\Framework\TestCase;
use React\EventLoop\Factory;
use React\Stream\ThroughStream;
use WyriHaximus\React\Stream\Base64\ReadableStreamBase64Encode;
use function Clue\React\Block\await;
use function React\Promise\Stream\buffer;

final class ReadableStreamBase64EncodeTest extends TestCase
{
/**
* @dataProvider WyriHaximus\React\Tests\Stream\Base64\DataProvider::provideData
*/
public function testHash(string $data)
{
$loop = Factory::create();
$throughStream = new ThroughStream();
$stream = new ReadableStreamBase64Encode($throughStream);
$loop->futureTick(function () use ($throughStream, $data) {
$chunks = str_split($data);
$last = count($chunks) - 1;
for ($i = 0; $i < $last; $i++) {
$throughStream->write($chunks[$i]);
}
$throughStream->end($chunks[$last]);
});
self::assertSame(base64_encode($data), await(buffer($stream), $loop));
}
}

0 comments on commit db79b83

Please sign in to comment.