Skip to content

Commit

Permalink
Merge pull request #1 from deepfreeze/release0.2
Browse files Browse the repository at this point in the history
Release0.2
  • Loading branch information
icywolfy committed Oct 2, 2015
2 parents 2d70195 + fd5deed commit d0d79d5
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 12 deletions.
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"stream"
],
"require": {
"deepfreeze-spi/stream": "~0.1.0"
"deepfreeze-spi/stream": "~0.2.0"
},
"require-dev": {
"phpunit/phpunit": "~4.6.0",
Expand Down
94 changes: 90 additions & 4 deletions src/AppendStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
namespace DeepFreeze\IO\Stream;


use DeepFreeze\IO\Stream\Exception\NotSupportedException;
use DeepFreeze\IO\Stream\Exception\RuntimeException;
use DeepFreezeSpi\IO\Stream\Exception;
use DeepFreezeSpi\IO\Stream\StreamInterface;

class AppendStream extends StreamDecorator
class AppendStream implements StreamInterface
{
/**
* @var int
Expand All @@ -24,6 +26,10 @@ class AppendStream extends StreamDecorator
*/
private $streams = array();

/**
* @var int
*/
private $defaultCopyBufferSize = 65536;

public function __construct(array $streams) {
$this->setStreams($streams);
Expand Down Expand Up @@ -137,10 +143,13 @@ public function read($count = 0) {
if (empty($this->streams)) {
return null;
}
$buffer = '';
$bytesRemaining = $count;
$currentStreamIndex = $this->currentStreamIndex;
$streamCount = count($this->streams);
if ($currentStreamIndex >= $streamCount) {
return null;
}
$buffer = '';
$bytesRemaining = $count;
do {
$read = $this->streams[$currentStreamIndex]->read($bytesRemaining);
if (empty($read)) {
Expand All @@ -153,7 +162,7 @@ public function read($count = 0) {
}
$buffer .= $read;
$bytesRemaining = $count - strlen($buffer);
} while ($bytesRemaining > 0);
} while (($bytesRemaining > 0) && ($currentStreamIndex < $streamCount));
return $buffer;
}

Expand Down Expand Up @@ -348,4 +357,81 @@ private function seekFromEnd($position) {
// Position is before the first file:
throw new Exception\RuntimeException('Unable to seek before start position.');
}


public function flush() {
// No buffers to flush
}

public function write($data, $length = null) {
throw new NotSupportedException("Write is not supported on AppendStream.");
}


public function canTimeout() {
return false;
}


public function setPosition($position) {
$this->seek($position, self::SEEK_ORIGIN);
}


public function setReadTimeout($ms) {
throw new NotSupportedException("Timeouts are not supported on AppendStream.");
}


public function getReadTimeout() {
throw new NotSupportedException("Timeouts are not supported on AppendStream.");
}


public function setWriteTimeout($ms) {
throw new NotSupportedException("Timeouts are not supported on AppendStream.");
}


public function getWriteTimeout() {
throw new NotSupportedException("Timeouts are not supported on AppendStream.");
}


public function copyTo(StreamInterface $destination, $bufferSize = null) {
if (null === $destination) {
throw new Exception\InvalidArgumentException('destination');
}
if (!$this->canRead() && !$this->canWrite()) {
throw new Exception\ObjectDisposedException('this');
}
if (!$destination->canRead() && !$destination->canWrite()) {
throw new Exception\ObjectDisposedException('destination');
}
if (!$this->canRead()) {
throw new Exception\NotSupportedException();
}
if (!$destination->canWrite()) {
throw new Exception\NotSupportedException();
}

if (null !== $bufferSize && $bufferSize <= 0) {
throw new Exception\InvalidArgumentException('bufferSize',
$bufferSize,
'Parameter "bufferSize" must be greater than 0.');
}

// Copy in chunks
$bufferSize = $bufferSize ?: $this->defaultCopyBufferSize;
while (($data = $this->read($bufferSize)) !== null) {
$destination->write($data);
}
}


public function setLength($length) {
throw new NotSupportedException("Stream is not writable.");
}


}
2 changes: 1 addition & 1 deletion src/FileStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class FileStream extends StreamDecorator implements StreamInterface
);


public function __construct($path, $fileMode, $fileAccess) {
public function __construct($path, $fileAccess, $fileMode=self::MODE_OPEN) {
$this->init($path, $fileMode, $fileAccess);
}

Expand Down
232 changes: 232 additions & 0 deletions src/LazyLoadStream.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
<?php
namespace DeepFreeze\IO\Stream;

use DeepFreezeSpi\IO\Stream\StreamInterface;

/**
* Class LazyLoadStream
* This class overrides every function of StreamDecorator, so it doesn't inherit.
* @package DeepFreeze\IO\Stream
*/
class LazyLoadStream implements StreamInterface
{
/**
* @var StreamInterface
*/
protected $stream;


/**
* @var callable
*/
private $callback;

/**
* @var array
*/
private $callbackParams = array();
public function __construct($callback, $callbackParams = array()) {
$this->setCallback($callback, $callbackParams);
}


/**
* Return the created stream.
*
* As this is a pure decorator class, exposing the stream does not impart any risk of
* incoherent state.
*
* @return StreamInterface
*/
public function getStream() {
$this->initializeStream();
return $this->stream;
}


/**
* @return callable
*/
public function getCallback() {
return $this->callback;
}



/**
* @param callable $callback
*/
public function setCallback($callback, $callbackParams = null) {
// Validate the callback on use. The callback may not be valid at that time.
$this->callback = $callback;
if (null === $callbackParams) {
// Don't modify
return;
}
if (!is_array($callbackParams)) {
// Assume One Parameter callback.
$callbackParams = array($callbackParams);
}
$this->callbackParams = $callbackParams;
}


private function initializeStream() {
// If stream is already initialized
if (null !== $this->stream) {
return;
}
if (!is_callable($this->callback)) {
throw new Exception\RuntimeException('The provided callback is not valid.');
}
$stream = call_user_func_array($this->callback, $this->callbackParams);
if (!$stream instanceof StreamInterface) {
throw new Exception\RuntimeException('The callback did not return a valid StreamInterface instance.');
}
$this->stream = $stream;
}

/**
* @return bool
*/
public function canRead() {
$this->initializeStream();
return $this->stream->canRead();
}


/**
* @return bool
*/
public function canSeek() {
$this->initializeStream();
return $this->stream->canSeek();
}


/**
* @return bool
*/
public function canTimeout() {
$this->initializeStream();
return $this->stream->canTimeout();
}


/**
* @return bool
*/
public function canWrite() {
$this->initializeStream();
return $this->stream->canTimeout();
}


/**
* @return int
*/
public function getLength() {
$this->initializeStream();
return $this->stream->getLength();
}


/**
* @return int
*/
public function getPosition() {
$this->initializeStream();
return $this->stream->getPosition();
}


/**
* @param int $position
*/
public function setPosition($position) {
$this->initializeStream();
$this->stream->setPosition($position);
}


/**
* @param int $ms
*/
public function setReadTimeout($ms) {
$this->initializeStream();
$this->stream->setReadTimeout($ms);
}


/**
* @return int
*/
public function getReadTimeout() {
$this->initializeStream();
return $this->stream->getReadTimeout();
}


/**
* @param int $ms
*/
public function setWriteTimeout($ms) {
$this->initializeStream();
$this->stream->setWriteTimeout($ms);
}


/**
* @return int
*/
public function getWriteTimeout() {
$this->initializeStream();
return $this->stream->getWriteTimeout();
}


public function copyTo(StreamInterface $destination, $bufferSize = null) {
$this->initializeStream();
$this->stream->copyTo($destination, $bufferSize);
}


public function dispose() {
$this->initializeStream();
$this->stream->dispose();
}


public function flush() {
$this->initializeStream();
$this->stream->flush();
}


/**
* @param int $length
* @return string
*/
public function read($length = 0) {
$this->initializeStream();
return $this->stream->read($length);
}


public function seek($position, $whence = null) {
$this->initializeStream();
return $this->stream->seek($position, $whence);
}


public function setLength($length) {
$this->initializeStream();
$this->stream->setLength($length);
}


public function write($data, $length = null) {
$this->initializeStream();
$this->stream->write($data, $length);
}
}
Loading

0 comments on commit d0d79d5

Please sign in to comment.