Skip to content

Commit

Permalink
Update observables with PHP 7 features
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Dec 1, 2015
1 parent 97f25ec commit 29a7882
Show file tree
Hide file tree
Showing 12 changed files with 184 additions and 204 deletions.
87 changes: 40 additions & 47 deletions src/Observable/Emitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,12 @@

namespace Icicle\Observable;

use Exception;
use Generator;
use Icicle\Coroutine as CoroutineNS;
use Icicle\Coroutine\Coroutine;
use Icicle\Exception\InvalidArgumentError;
use Icicle\Exception\UnexpectedTypeError;
use Icicle\Coroutine\{Coroutine, function sleep};
use Icicle\Exception\{InvalidArgumentError, UnexpectedTypeError};
use Icicle\Loop;
use Icicle\Observable\Exception\DisposedException;
use Icicle\Observable\Exception\InvalidEmitterError;
use Icicle\Observable\Exception\{DisposedException, InvalidEmitterError};
use Throwable;

class Emitter implements Observable
{
Expand Down Expand Up @@ -70,7 +67,7 @@ private function start()
* @throws \Icicle\Observable\Exception\CompletedError If the observable has been completed.
* @throws \Icicle\Observable\Exception\BusyError If the observable is still busy emitting a value.
*/
$emit = function ($value = null) {
$emit = function ($value = null): \Generator {
return $this->queue->push($value);
};

Expand All @@ -86,11 +83,11 @@ private function start()
function ($value) {
$this->queue->complete($value);
},
function (Exception $exception) {
function (Throwable $exception) {
$this->queue->fail($exception);
}
);
} catch (Exception $exception) {
} catch (Throwable $exception) {
$this->queue->fail(new InvalidEmitterError($emitter, $exception));
}
});
Expand All @@ -99,7 +96,7 @@ function (Exception $exception) {
/**
* {@inheritdoc}
*/
public function dispose(Exception $exception = null)
public function dispose(Throwable $exception = null)
{
if (null === $exception) {
$exception = new DisposedException('Observable disposed.');
Expand All @@ -117,7 +114,7 @@ public function dispose(Exception $exception = null)
/**
* {@inheritdoc}
*/
public function getIterator()
public function getIterator(): ObservableIterator
{
if (null !== $this->emitter) {
$this->start();
Expand All @@ -129,63 +126,61 @@ public function getIterator()
/**
* {@inheritdoc}
*/
public function each(callable $onNext = null)
public function each(callable $onNext = null): \Generator
{
$iterator = $this->getIterator();

while (yield $iterator->wait()) {
while (yield from $iterator->wait()) {
if (null !== $onNext) {
yield $onNext($iterator->getCurrent());
}
}

yield $iterator->getReturn();
return $iterator->getReturn();
}

/**
* {@inheritdoc}
*/
public function map(callable $onNext, callable $onComplete = null)
public function map(callable $onNext, callable $onComplete = null): Observable
{
return new self(function (callable $emit) use ($onNext, $onComplete) {
$iterator = $this->getIterator();
while (yield $iterator->wait()) {
yield $emit($onNext($iterator->getCurrent()));
while (yield from $iterator->wait()) {
yield from $emit($onNext($iterator->getCurrent()));
}

if (null === $onComplete) {
yield $iterator->getReturn();
return;
return $iterator->getReturn();
}

yield $onComplete($iterator->getReturn());
return yield $onComplete($iterator->getReturn());
});
}

/**
* {@inheritdoc}
*/
public function filter(callable $callback)
public function filter(callable $callback): Observable
{
return new self(function (callable $emit) use ($callback) {
$iterator = $this->getIterator();
while (yield $iterator->wait()) {
while (yield from $iterator->wait()) {
$value = $iterator->getCurrent();
if (yield $callback($value)) {
yield $emit($value);
yield from $emit($value);
}
}

yield $iterator->getReturn();
return $iterator->getReturn();
});
}

/**
* {@inheritdoc}
*/
public function throttle($time)
public function throttle(float $time): Observable
{
$time = (float) $time;
if (0 >= $time) {
return $this->skip(0);
}
Expand All @@ -194,28 +189,28 @@ public function throttle($time)
$iterator = $this->getIterator();
$start = microtime(true) - $time;

while (yield $iterator->wait()) {
while (yield from $iterator->wait()) {
$value = $iterator->getCurrent();

$diff = $time + $start - microtime(true);

if (0 < $diff) {
yield CoroutineNS\sleep($diff);
yield from sleep($diff);
}

$start = microtime(true);

yield $emit($value);
yield from $emit($value);
}

yield $iterator->getReturn();
return $iterator->getReturn();
});
}

/**
* {@inheritdoc}
*/
public function splat(callable $onNext, callable $onComplete = null)
public function splat(callable $onNext, callable $onComplete = null): Observable
{
$onNext = function ($values) use ($onNext) {
if ($values instanceof \Traversable) {
Expand All @@ -225,7 +220,7 @@ public function splat(callable $onNext, callable $onComplete = null)
}

ksort($values);
return call_user_func_array($onNext, $values);
return $onNext(...$values);
};

if (null !== $onComplete) {
Expand All @@ -237,7 +232,7 @@ public function splat(callable $onNext, callable $onComplete = null)
}

ksort($values);
return call_user_func_array($onComplete, $values);
return $onComplete(...$values);
};
}

Expand All @@ -247,56 +242,54 @@ public function splat(callable $onNext, callable $onComplete = null)
/**
* {@inheritdoc}
*/
public function take($count)
public function take(int $count): Observable
{
return new self(function (callable $emit) use ($count) {
$count = (int) $count;
if (0 > $count) {
throw new InvalidArgumentError('The number of values to take must be non-negative.');
}

$iterator = $this->getIterator();
for ($i = 0; $i < $count && (yield $iterator->wait()); ++$i) {
yield $emit($iterator->getCurrent());
for ($i = 0; $i < $count && yield from $iterator->wait(); ++$i) {
yield from $emit($iterator->getCurrent());
}

yield $i;
return $i;
});
}

/**
* {@inheritdoc}
*/
public function skip($count)
public function skip(int $count): Observable
{
return new self(function (callable $emit) use ($count) {
$count = (int) $count;
if (0 > $count) {
throw new InvalidArgumentError('The number of values to skip must be non-negative.');
}

$iterator = $this->getIterator();
for ($i = 0; $i < $count && (yield $iterator->wait()); ++$i);
while (yield $iterator->wait()) {
yield $emit($iterator->getCurrent());
for ($i = 0; $i < $count && yield from $iterator->wait(); ++$i);
while (yield from $iterator->wait()) {
yield from $emit($iterator->getCurrent());
}

yield $iterator->getReturn();
return $iterator->getReturn();
});
}

/**
* {@inheritdoc}
*/
public function isComplete()
public function isComplete(): bool
{
return $this->queue->isComplete();
}

/**
* {@inheritdoc}
*/
public function isFailed()
public function isFailed(): bool
{
return $this->queue->isFailed();
}
Expand Down
15 changes: 6 additions & 9 deletions src/Observable/EmitterIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@

namespace Icicle\Observable;

use Exception;
use Icicle\Observable\Exception\CompletedError;
use Icicle\Observable\Exception\IncompleteError;
use Icicle\Observable\Exception\UninitializedError;
use Icicle\Observable\Exception\{CompletedError, IncompleteError, UninitializedError};

class EmitterIterator implements ObservableIterator
{
Expand Down Expand Up @@ -60,7 +57,7 @@ public function __destruct()
/**
* {@inheritdoc}
*/
public function wait()
public function wait(): \Generator
{
while (null !== $this->awaitable) {
yield $this->awaitable; // Wait until last call has resolved.
Expand All @@ -72,15 +69,15 @@ public function wait()

try {
$this->placeholder = $this->queue->pull();
$this->current = (yield $this->awaitable = $this->placeholder->getAwaitable());
} catch (Exception $exception) {
$this->current = yield $this->awaitable = $this->placeholder->getAwaitable();
} catch (\Throwable $exception) {
$this->current = $exception;
throw $exception;
} finally {
$this->awaitable = null;
}

yield !$this->queue->isComplete();
return !$this->queue->isComplete();
}

/**
Expand Down Expand Up @@ -118,7 +115,7 @@ public function getReturn()
/**
* {@inheritdoc}
*/
public function isValid()
public function isValid(): bool
{
return !$this->queue->isComplete();
}
Expand Down
Loading

0 comments on commit 29a7882

Please sign in to comment.