Skip to content

Commit

Permalink
Start Emitter asynchronously; change each() return type to simply Awa…
Browse files Browse the repository at this point in the history
…itable

Also minor updates for changes in 0.9.x
  • Loading branch information
trowski committed Nov 24, 2015
1 parent 67d0bb1 commit 9095b65
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 115 deletions.
112 changes: 63 additions & 49 deletions src/Observable/Emitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
namespace Icicle\Observable;

use Exception;
use Icicle\Awaitable;
use Icicle\Awaitable\Awaitable;
use Icicle\Coroutine as CoroutineNS;
use Icicle\Coroutine\Coroutine;
use Icicle\Exception\UnexpectedTypeError;
use Icicle\Loop;
use Icicle\Observable\Exception\BusyError;
use Icicle\Observable\Exception\InvalidEmitterError;

Expand Down Expand Up @@ -59,63 +61,75 @@ public function __construct(callable $emitter, $hot = false)
*/
private function start()
{
$emitter = $this->emitter;
$this->emitter = null;

if ($this->queue->isComplete()) {
return;
}

/**
* Emits a value from the observable.
*
* @coroutine
*
* @param mixed $value If $value is an instance of \Icicle\Awaitable\Awaitable, the fulfillment value is used
* as the value to emit or the rejection reason is thrown from this coroutine. If $value is an instance of
* \Generator, it is used to create a coroutine which is then used as an awaitable.
*
* @return \Generator
*
* @resolve mixed The emitted value (the resolution value of $value)
*
* @throws \Icicle\Observable\Exception\CompletedError If the observable has been completed.
* @throws \Icicle\Observable\Exception\BusyError If the observable is still busy emitting a value.
* @throws \Icicle\Observable\Exception\DisposedException If no listeners remain on the observable.
*/
$emit = function ($value = null) {
if ($this->busy) {
throw new BusyError('Still busy emitting the last value. Wait until the $emit coroutine has resolved.');
}
Loop\queue(function () use ($emitter) { // Asynchronously start the observable.
/**
* Emits a value from the observable.
*
* @coroutine
*
* @param mixed $value If $value is an instance of \Icicle\Awaitable\Awaitable, the fulfillment value is
* used as the value to emit or the rejection reason is thrown from this coroutine. If $value is an
* instance of \Generator, it is used to create a coroutine which is then used as an awaitable.
*
* @return \Generator
*
* @resolve mixed The emitted value (the resolution value of $value)
*
* @throws \Icicle\Observable\Exception\CompletedError If the observable has been completed.
* @throws \Icicle\Observable\Exception\BusyError If the observable is still busy emitting a value.
* @throws \Icicle\Observable\Exception\DisposedException If no listeners remain on the observable.
*/
$emit = function ($value = null) {
if ($this->busy) {
throw new BusyError(
'Still busy emitting the last value. Wait until the $emit coroutine has resolved.'
);
}

$this->busy = true;
$this->busy = true;

try {
yield $this->queue->push(yield $value); // Get resolution value of $value.
} finally {
$this->busy = false;
}
};
if ($value instanceof \Generator) {
$value = new Coroutine($value);
}

$emitter = $this->emitter;
$this->emitter = null;
if ($value instanceof Awaitable) {
$value = (yield $value); // Get resolution value of $value.
}

try {
$generator = $emitter($emit);
try {
yield $this->queue->push($value);
} finally {
$this->busy = false;
}
};

if (!$generator instanceof \Generator) {
throw new UnexpectedTypeError('Generator', $generator);
}
try {
$generator = $emitter($emit);

$this->coroutine = new Coroutine($generator);
$this->coroutine->done(
function () {
$this->queue->complete();
},
function (Exception $exception) {
$this->queue->fail($exception);
if (!$generator instanceof \Generator) {
throw new UnexpectedTypeError('Generator', $generator);
}
);
} catch (Exception $exception) {
$this->queue->fail(new InvalidEmitterError($emitter, $exception));
}

$this->coroutine = new Coroutine($generator);
$this->coroutine->done(
function () {
$this->queue->complete();
},
function (Exception $exception) {
$this->queue->fail($exception);
}
);
} catch (Exception $exception) {
$this->queue->fail(new InvalidEmitterError($emitter, $exception));
}
});
}

/**
Expand All @@ -131,7 +145,7 @@ public function dispose()
*/
public function getIterator()
{
if (null === $this->coroutine) {
if (null !== $this->emitter) {
$this->start();
}

Expand Down Expand Up @@ -215,7 +229,7 @@ public function throttle($time)
$diff = $time + $start - microtime(true);

if (0 < $diff) {
$value = (yield Awaitable\resolve($value)->delay($diff));
yield CoroutineNS\sleep($diff);
}

$start = microtime(true);
Expand Down
9 changes: 4 additions & 5 deletions src/Observable/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ public function getIterator();
public function dispose();

/**
* The given callable will be invoked each time a value is emitted from the observable. The returned coroutine
* will be fulfilled when the observable completes or rejected if an error occurs. The coroutine will also be
* rejected if $onNext throws an exception or returns a rejected promise. The coroutine may be paused to interrupt
* the observable, preventing further values from being emitted until the coroutine is resumed or cancelled.
* The given callable will be invoked each time a value is emitted from the observable. The returned awaitable
* will be fulfilled when the observable completes or rejected if an error occurs. The awaitable will also be
* rejected if $onNext throws an exception or returns a rejected awaitable.
*
* @param callable<(mixed $value)> $onNext
*
* @return \Icicle\Coroutine\Coroutine
* @return \Icicle\Awaitable\Awaitable
*/
public function each(callable $onNext = null);

Expand Down
4 changes: 2 additions & 2 deletions src/Observable/Postponed.php
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ public function complete()
/**
* Throws an error in the observable.
*
* @param mixed $reason
* @param \Exception $reason
*/
public function error($reason = null)
public function error(\Exception $reason = null)
{
$this->delayed->reject($reason);
}
Expand Down

0 comments on commit 9095b65

Please sign in to comment.