Skip to content

Commit

Permalink
Support disposal on observe()
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 17, 2016
1 parent b3baa0b commit cac1e4c
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 6 deletions.
19 changes: 15 additions & 4 deletions src/Observable/functions.php
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,19 @@ function merge(array $observables)
* invoked.
*
* @param callable(mixed ...$args) $emitter Function accepting a callback that periodically emits events.
* @param callable(callable $callback, \Exception $exception) $onDisposed Called if the observable is disposed.
* The callback passed to this function is the callable provided to the $emitter callable given to this
* function.
* @param int $index Position of callback function in emitter function argument list.
* @param mixed ...$args Other arguments to pass to emitter function.
*
* @return \Icicle\Observable\Observable
*/
function observe(callable $emitter, $index = 0 /* , ...$args */)
function observe(callable $emitter, callable $onDisposed = null, $index = 0 /* , ...$args */)
{
$args = array_slice(func_get_args(), 2);
$args = array_slice(func_get_args(), 3);

return new Emitter(function (callable $emit) use ($emitter, $index, $args) {
$emitter = function (callable $emit) use (&$callback, $emitter, $index, $args) {
$queue = new \SplQueue();

/** @var \Icicle\Awaitable\Delayed $delayed */
Expand Down Expand Up @@ -111,7 +114,15 @@ function observe(callable $emitter, $index = 0 /* , ...$args */)
yield $emit($queue->shift());
}
}
});
};

if (null !== $onDisposed) {
$onDisposed = function (\Exception $exception) use (&$callback, $onDisposed) {
$onDisposed($callback, $exception);
};
}

return new Emitter($emitter, $onDisposed);
}

/**
Expand Down
29 changes: 27 additions & 2 deletions tests/Observable/ObserveTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Icicle\Loop;
use Icicle\Loop\SelectLoop;
use Icicle\Observable;
use Icicle\Observable\Exception\DisposedException;
use Icicle\Tests\TestCase;

class ObserveTest extends TestCase
Expand Down Expand Up @@ -76,7 +77,7 @@ public function testFunctionRequiringOtherArguments()
$this->assertSame($value, $name);
};

$observable = Observable\observe($emitter, 1, $value);
$observable = Observable\observe($emitter, null, 1, $value);

$callback = $this->createCallback(3);
$callback->method('__invoke')
Expand All @@ -101,10 +102,34 @@ public function testFunctionRequiringOtherArguments()
*/
public function testTooFewArguments()
{
$observable = Observable\observe($this->createCallback(0), 1);
$observable = Observable\observe($this->createCallback(0), null, 1);

$awaitable = new Coroutine($observable->each($this->createCallback(0)));

$awaitable->wait();
}

/**
* @depends testBasicEmitter
*/
public function testDisposedFunction()
{
$emitter = function(callable $callback) {
$this->callback = $callback;
};

$callback = $this->createCallback(1);
$callback->method('__invoke')
->with($this->isType('callable'), $this->isInstanceOf(DisposedException::class));

$observable = Observable\observe($emitter, $callback);

$awaitable = new Coroutine($observable->each($this->createCallback(0)));

Loop\tick();

$observable->dispose();

Loop\run();
}
}

0 comments on commit cac1e4c

Please sign in to comment.