Skip to content

Commit

Permalink
Base implementation on iterators only
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 6, 2015
1 parent 9c80751 commit 2680258
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 214 deletions.
4 changes: 2 additions & 2 deletions examples/observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
yield $emit(10);
});

$observable->subscribe(
$observable->each(
function ($value) {
printf("Base: %d\n", $value);
},
Expand Down Expand Up @@ -51,7 +51,7 @@ function () {
});


$observable->subscribe(
$observable->each(
function ($value) {
printf("Filtered and mapped: %d\n", $value);
},
Expand Down
122 changes: 34 additions & 88 deletions src/Observable/Internal/ObserverQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@
namespace Icicle\Observable\Internal;

use Exception;
use Icicle\Loop;
use Icicle\Observable\Exception\BusyError;
use Icicle\Observable\Exception\CompletedException;
use Icicle\Promise;
use Icicle\Promise\Deferred;
use Icicle\Promise\PromiseInterface;

class ObserverQueue
Expand All @@ -23,71 +23,47 @@ class ObserverQueue
*/
private $complete = false;

/**
* @var \SplObjectStorage
*/
private $observers;

/**
* @var bool
*/
private $busy = false;

/**
* Initializes queue collection.
* @var \Icicle\Promise\PromiseInterface[]
*/
public function __construct()
{
$this->observers = new \SplObjectStorage();
}
private $promises = [];

/**
* @param \Icicle\Observable\Internal\Observer $observer
* @var \Icicle\Promise\Deferred
*/
public function subscribe(Observer $observer)
{
if ($this->complete) {
Loop\queue(function () use ($observer) {
if ($this->complete instanceof Exception) {
$result = $observer->onError($this->complete);
} else {
$result = $observer->onComplete();
}

if ($result instanceof PromiseInterface) {
$result->done();
}
});
return;
}
private $deferred;

$this->observers->attach($observer);
}
/**
* @var \Icicle\Promise\PromiseInterface
*/
private $promise;

/**
* @param \Icicle\Observable\Internal\Observer $observer
* Initializes queue collection.
*/
public function unsubscribe(Observer $observer)
public function __construct()
{
if (null === $this->observers) {
return;
}

$this->observers->detach($observer);
$this->deferred = new Deferred();
$this->promise = $this->deferred->getPromise()->uncancellable();
}

/**
* @param \Icicle\Observable\Internal\Observer $observer
* @param \Icicle\Promise\PromiseInterface|null $promise
*
* @return bool
* @return \Icicle\Promise\PromiseInterface
*/
public function isSubscribed(Observer $observer)
public function pull(PromiseInterface $promise)
{
if (null === $this->observers) {
return false;
if (!$this->complete) {
$this->promises[] = $promise;
}

return $this->observers->contains($observer);
return $this->promise;
}

/**
Expand Down Expand Up @@ -127,22 +103,20 @@ public function emit($value)
try {
$value = (yield $value); // Get resolution value of $value.

$results = [];
$promises = $this->promises;
$this->promises = [];

foreach ($this->observers as $observer) {
try {
$result = $observer->onNext($value);
} catch (Exception $exception) {
$result = Promise\reject($exception);
}
$this->deferred->resolve($value);
$this->deferred = new Deferred();
$this->promise = $this->deferred->getPromise()->uncancellable();

if ($result instanceof PromiseInterface) {
$results[] = $result;
$result->done();
}
}
$count = count($promises);

yield Promise\settle($results);
if (1 === $count) {
yield $promises[0];
} elseif (0 !== $count) {
yield Promise\all($promises);
}
} finally {
$this->busy = false;
}
Expand All @@ -168,22 +142,8 @@ public function complete()

$this->complete = true;

$results = [];

foreach ($this->observers as $observer) {
try {
$result = $observer->onComplete();
} catch (Exception $exception) {
$result = Promise\reject($exception);
}

if ($result instanceof PromiseInterface) {
$results[] = $result;
$result->done();
}
}

$this->observers = null;
$this->promises = [];
$this->deferred->resolve();
}

/**
Expand All @@ -207,21 +167,7 @@ public function error(Exception $exception)

$this->complete = true;

$results = [];

foreach ($this->observers as $observer) {
try {
$result = $observer->onError($exception);
} catch (Exception $exception) {
$result = Promise\reject($exception);
}

if ($result instanceof PromiseInterface) {
$results[] = $result;
$result->done();
}
}

$this->observers = null;
$this->promises = [];
$this->deferred->reject($exception);
}
}
77 changes: 41 additions & 36 deletions src/Observable/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@
namespace Icicle\Observable;

use Exception;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Coroutine\Coroutine;
use Icicle\Observable\Exception\ErrorException;
use Icicle\Promise\Promise;

class Observable implements ObservableInterface
{
Expand Down Expand Up @@ -63,7 +61,8 @@ public function __construct(callable $producer)
};

try {
Coroutine\create($producer, $emit)->done($complete, $error);
$coroutine = new Coroutine($producer($emit));
$coroutine->done($complete, $error);
} catch (Exception $exception) {
$error($exception);
}
Expand All @@ -74,27 +73,53 @@ public function __construct(callable $producer)
*/
public function getIterator()
{
return new ObservableIterator($this);
return new ObservableIterator($this->queue);
}

/**
* {@inheritdoc}
*/
public function subscribe(callable $onNext = null, callable $onError = null, callable $onComplete = null)
public function each(callable $onNext = null, callable $onError = null, callable $onComplete = null)
{
$observer = new Internal\Observer($onNext, $onError, $onComplete);

$this->queue->subscribe($observer);

return new Observer($this->queue, $observer);
return new Coroutine($this->subscribe($onNext, $onError, $onComplete));
}

/**
* {@inheritdoc}
* @coroutine
*
* @param callable|null $onNext
* @param callable|null $onError
* @param callable|null $onComplete
*
* @return \Generator
*/
public function isComplete()
private function subscribe(callable $onNext = null, callable $onError = null, callable $onComplete = null)
{
return $this->queue->isComplete();
$iterator = $this->getIterator();

do {
try {
$valid = (yield $iterator->wait());
} catch (Exception $exception) {
if (null === $onError) {
throw $exception;
}

yield $onError($exception);
return;
}

if (null !== $onNext && $valid) {
yield $onNext($iterator->getCurrent());
}
} while ($valid);

if (null === $onComplete) {
yield null;
return;
}

yield $onComplete();
}

/**
Expand Down Expand Up @@ -129,28 +154,8 @@ public function filter(callable $callback)
/**
* {@inheritdoc}
*/
public function each(callable $callback)
public function isComplete()
{
return new Promise(function (callable $resolve, callable $reject) use ($callback) {
$unsubscribe = function () use (&$observer) {
/** @var \Icicle\Observable\Observer $observer */
$observer->unsubscribe();
};

$observer = $this->subscribe(
function ($value) use ($unsubscribe, $reject, $callback) {
try {
$callback($value);
} catch (Exception $exception) {
$unsubscribe();
$reject($exception);
}
},
$reject,
$resolve
);

return $unsubscribe;
});
return $this->queue->isComplete();
}
}
19 changes: 6 additions & 13 deletions src/Observable/ObservableInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,9 @@ public function getIterator();
* @param callable|null $onError
* @param callable|null $onComplete
*
* @return \Icicle\Observable\ObserverInterface
* @return \Icicle\Coroutine\CoroutineInterface
*/
public function subscribe(callable $onNext = null, callable $onError = null, callable $onComplete = null);

/**
* Determines if the observable has completed (completed observables will no longer emit values). Returns true
* even if the observable completed due to an error.
*
* @return bool
*/
public function isComplete();
public function each(callable $onNext = null, callable $onError = null, callable $onComplete = null);

/**
* @param callable<(mixed $value): mixed)> $callback
Expand All @@ -50,9 +42,10 @@ public function map(callable $callback);
public function filter(callable $callback);

/**
* @param callable<(mixed $value): mixed)> $callback
* Determines if the observable has completed (completed observables will no longer emit values). Returns true
* even if the observable completed due to an error.
*
* @return \Icicle\Promise\PromiseInterface
* @return bool
*/
public function each(callable $callback);
public function isComplete();
}

0 comments on commit 2680258

Please sign in to comment.