Skip to content

Commit

Permalink
Observables tweaks, bugfixes, and more tests
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 27, 2015
1 parent 95f81b8 commit 31ee0be
Show file tree
Hide file tree
Showing 11 changed files with 518 additions and 53 deletions.
44 changes: 27 additions & 17 deletions src/Observable/Emitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
namespace Icicle\Observable;

use Exception;
use Generator;
use Icicle\Awaitable\Awaitable;
use Icicle\Coroutine as CoroutineNS;
use Icicle\Coroutine\Coroutine;
use Icicle\Exception\InvalidArgumentError;
use Icicle\Exception\UnexpectedTypeError;
use Icicle\Loop;
use Icicle\Observable\Exception\BusyError;
Expand Down Expand Up @@ -85,6 +87,7 @@ private function start()
* @throws \Icicle\Observable\Exception\CompletedError If the observable has been completed.
* @throws \Icicle\Observable\Exception\BusyError If the observable is still busy emitting a value.
* @throws \Icicle\Observable\Exception\DisposedException If no listeners remain on the observable.
* @throws \Exception The custom exception used to dispose the observable.
*/
$emit = function ($value = null) {
if ($this->busy) {
Expand All @@ -95,7 +98,7 @@ private function start()

$this->busy = true;

if ($value instanceof \Generator) {
if ($value instanceof Generator) {
$value = new Coroutine($value);
}

Expand All @@ -113,7 +116,7 @@ private function start()
try {
$generator = $emitter($emit);

if (!$generator instanceof \Generator) {
if (!$generator instanceof Generator) {
throw new UnexpectedTypeError('Generator', $generator);
}

Expand All @@ -135,9 +138,9 @@ function (Exception $exception) {
/**
* {@inheritdoc}
*/
public function dispose()
public function dispose(Exception $exception = null)
{
$this->queue->dispose();
$this->queue->dispose($exception);
}

/**
Expand Down Expand Up @@ -173,7 +176,10 @@ private function subscribe(callable $onNext = null)

while (yield $iterator->wait()) {
if (null !== $onNext) {
yield $onNext($iterator->getCurrent());
$result = $onNext($iterator->getCurrent());
if ($result instanceof Generator || $result instanceof Awaitable) {
yield $result;
}
}
}

Expand Down Expand Up @@ -202,7 +208,11 @@ public function filter(callable $callback)
$iterator = $this->getIterator();
while (yield $iterator->wait()) {
$value = $iterator->getCurrent();
if (yield $callback($value)) {
$result = $callback($value);
if ($result instanceof Generator || $result instanceof Awaitable) {
$result = (yield $result);
}
if ($result) {
yield $emit($value);
}
}
Expand All @@ -215,7 +225,7 @@ public function filter(callable $callback)
public function throttle($time)
{
$time = (float) $time;
if (0 > $time) {
if (0 >= $time) {
return $this->skip(0);
}

Expand Down Expand Up @@ -261,12 +271,12 @@ public function splat(callable $callback)
*/
public function take($count)
{
$count = (int) $count;
if (0 > $count) {
$count = 0;
}

return new self(function (callable $emit) use ($count) {
$count = (int) $count;
if (0 > $count) {
throw new InvalidArgumentError('The number of values to take must be non-negative.');
}

$iterator = $this->getIterator();
for ($i = 0; $i < $count && (yield $iterator->wait()); ++$i) {
yield $emit($iterator->getCurrent());
Expand All @@ -279,12 +289,12 @@ public function take($count)
*/
public function skip($count)
{
$count = (int) $count;
if (0 > $count) {
$count = 0;
}

return new self(function (callable $emit) use ($count) {
$count = (int) $count;
if (0 > $count) {
throw new InvalidArgumentError('The number of values to skip must be non-negative.');
}

$iterator = $this->getIterator();
for ($i = 0; $i < $count && (yield $iterator->wait()); ++$i);
while (yield $iterator->wait()) {
Expand Down
15 changes: 5 additions & 10 deletions src/Observable/EmitterIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,19 +82,18 @@ public function wait()
$this->awaitable = null;
}

if ($this->queue->isComplete()) {
yield false;
return;
}

yield true;
yield !$this->queue->isComplete();
}

/**
* {@inheritdoc}
*/
public function getCurrent()
{
if (null === $this->placeholder || null !== $this->awaitable) {
throw new UninitializedError('wait() must be called before calling this method.');
}

if ($this->queue->isComplete()) {
if ($this->current instanceof Exception) {
throw $this->current;
Expand All @@ -103,10 +102,6 @@ public function getCurrent()
throw new CompletedError('The observable has completed and the iterator is invalid.');
}

if (null === $this->placeholder) {
throw new UninitializedError('wait() must be called before calling this method.');
}

return $this->current;
}

Expand Down
23 changes: 14 additions & 9 deletions src/Observable/Internal/EmitQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ class EmitQueue
*/
private $listeners = 0;

/**
* @var bool
*/
private $backPressure = true;

/**
* @param bool $backPressure
*/
Expand Down Expand Up @@ -74,12 +79,10 @@ public function push($value)
$placeholder = $this->placeholder;
$this->placeholder = new Placeholder($this->delayed, $this->backPressure);

if ($this->backPressure) {
yield $placeholder->wait();
yield $placeholder->wait();

if ($this->failed) {
yield $this->delayed; // Will throw failure reason (probably disposed).
}
if ($this->failed) {
yield $this->delayed; // Will throw failure reason (probably disposed).
}

yield $value;
Expand All @@ -99,7 +102,7 @@ public function increment()
public function decrement()
{
if (0 >= --$this->listeners && !$this->complete) {
$this->dispose();
$this->dispose(new DisposedException('All subscribers stopped listening for emitted values.'));
}
}

Expand Down Expand Up @@ -141,11 +144,13 @@ public function fail(\Exception $exception)
}

/**
* Marks the observable as disposed (failed with a DisposedException).
* Marks the observable as disposed (with a DisposedException instance if $exception is null).
*
* @param \Exception|null $exception
*/
public function dispose()
public function dispose(\Exception $exception = null)
{
$this->fail(new DisposedException('All subscribers stopped listening for emitted values.'));
$this->fail($exception ?: new DisposedException('Observable disposed.'));
}

/**
Expand Down
1 change: 1 addition & 0 deletions src/Observable/Internal/Placeholder.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use Icicle\Awaitable\Awaitable;
use Icicle\Awaitable\Delayed;
use Icicle\Loop;

final class Placeholder
{
Expand Down
14 changes: 8 additions & 6 deletions src/Observable/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,19 @@ interface Observable
public function getIterator();

/**
* Disposes of the observable, halting emission of values and failing the observable with an instance of
* DisposedException.
* Disposes of the observable, halting emission of values and failing the observable with the given exception.
* If no exception is given, an instance of \Icicle\Observable\Exception\DisposedException is used.
*
* @param \Exception|null $exception
*/
public function dispose();
public function dispose(\Exception $exception = null);

/**
* The given callable will be invoked each time a value is emitted from the observable. The returned awaitable
* will be fulfilled when the observable completes or rejected if an error occurs. The awaitable will also be
* rejected if $onNext throws an exception or returns a rejected awaitable.
*
* @param callable<(mixed $value)> $onNext
* @param callable<(mixed $value): \Generator|Awaitable|null> $onNext
*
* @return \Icicle\Awaitable\Awaitable
*/
Expand All @@ -37,7 +39,7 @@ public function each(callable $onNext = null);
* Each emitted value is passed to $callback. The value returned from $callback is then emitted from the returned
* observable.
*
* @param callable<(mixed $value): mixed> $callback
* @param callable<(mixed $value): \Generator|Awaitable|mixed> $callback
*
* @return \Icicle\Observable\Observable
*/
Expand All @@ -47,7 +49,7 @@ public function map(callable $callback);
* Filters the values emitted by the observable using $callback. If $callback returns true, the value is emitted
* from the returned observable. If $callback returns false, the value is ignored and not emitted.
*
* @param callable<(mixed $value): bool)> $callback
* @param callable<(mixed $value): \Generator|Awaitable|bool)> $callback
*
* @return \Icicle\Observable\Observable
*/
Expand Down
11 changes: 7 additions & 4 deletions src/Observable/ObservableIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@ interface ObservableIterator
/**
* @coroutine
*
* Returns a promise that is fulfilled with a boolean. If true, a new value is available by calling current().
* If false, the observable has completed an calling current() will throw an exception. If an error occurs with
* the observable, the exception will be used to reject the returned promise.
* Resolves with true if a new value is available by calling current() or false if the observable has completed.
* Calling current() will throw an exception if the observable completed. If an error occurs with the observable,
* this coroutine will be rejected with the exception used to fail the observable.
*
* @return \Generator
*
* @resolve bool
*
* @throws \Icicle\Observable\Exception\SynchronousIterationError Thrown if the last promise returned by this
* method is not resolved before calling the method again. Prevents synchronous iteration.
* @throws \Exception Exception used to fail the observable.
*/
public function wait();

/**
* @return mixed Value emitted from observable.
*
* @throws \Icicle\Observable\Exception\CompletedError
* @throws \Icicle\Observable\Exception\CompletedError If the observable has successfully completed.
* @throws \Icicle\Observable\Exception\UninitializedError If wait() was not called before calling this method.
* @throws \Exception Exception used to fail the observable.
*/
public function getCurrent();

Expand Down
21 changes: 17 additions & 4 deletions src/Observable/Postponed.php
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,23 @@ final class Postponed
*/
private $delayed;

/**
* @var \Icicle\Awaitable\Delayed|null
*/
private $started;

/**
* @param bool $hot
*/
public function __construct($hot = false)
{
$this->started = new Delayed();
$this->delayed = new Delayed();

$this->emitter = new Emitter(function (callable $emit) {
$this->emit = $emit;
$this->delayed = new Delayed();
$this->started->resolve($emit);
$this->started = null;

yield $this->delayed;
}, $hot);
}
Expand Down Expand Up @@ -67,8 +76,12 @@ public function getEmitter()
*/
public function emit($value = null)
{
if (null === $this->emit) {
$this->emit = (yield $this->started);
}

$emit = $this->emit;
return $emit($value);
yield $emit($value);
}

/**
Expand All @@ -84,7 +97,7 @@ public function complete()
*
* @param \Exception $reason
*/
public function error(\Exception $reason = null)
public function fail(\Exception $reason)
{
$this->delayed->reject($reason);
}
Expand Down
2 changes: 1 addition & 1 deletion src/Observable/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ function observe(callable $emitter, $index = 0 /* , ...$args */)
yield $emit($queue->shift());
}
}
});
}, true);
}

/**
Expand Down
Loading

0 comments on commit 31ee0be

Please sign in to comment.