Skip to content

Commit

Permalink
Add Suspended
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Oct 31, 2015
1 parent b3450ae commit 458647e
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 4 deletions.
5 changes: 4 additions & 1 deletion src/Observable/Internal/ObserverQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,10 @@ public function subscribe(Observer $observer)
{
if (null === $this->observers) {
Loop\queue(function () use ($observer) {
$observer->onComplete();
$result = $observer->onComplete();
if ($result instanceof PromiseInterface) {
$result->done();
}
});
return;
}
Expand Down
5 changes: 2 additions & 3 deletions src/Observable/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Icicle\Observable;

use Icicle\Coroutine\Coroutine;
use Icicle\Coroutine;
use Icicle\Loop;
use Icicle\Promise\Promise;

Expand Down Expand Up @@ -61,8 +61,7 @@ public function __construct(callable $producer)
};

try {
$coroutine = new Coroutine($producer($emit));
$coroutine->done($complete, $error);
Coroutine\create($producer, $emit)->done($complete, $error);
} catch (\Exception $exception) {
$error($exception);
}
Expand Down
3 changes: 3 additions & 0 deletions src/Observable/ObservableIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ function () {
*/
public function __destruct()
{
if (null !== $this->ready) {
$this->ready->resolve();
}
$this->observer->unsubscribe();
}

Expand Down
18 changes: 18 additions & 0 deletions src/Observable/ObservationInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

/*
* This file is part of Icicle, a library for writing asynchronous code in PHP using promises and coroutines.
*
* @copyright 2014-2015 Aaron Piotrowski. All rights reserved.
* @license MIT See the LICENSE file that was distributed with this source code for more information.
*/

namespace Icicle\Observable;

interface ObservationInterface
{
/**
* @return \Icicle\Observable\ObservableInterface
*/
public function getObservable();
}
86 changes: 86 additions & 0 deletions src/Observable/Suspended.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php

/*
* This file is part of Icicle, a library for writing asynchronous code in PHP using promises and coroutines.
*
* @copyright 2014-2015 Aaron Piotrowski. All rights reserved.
* @license MIT See the LICENSE file that was distributed with this source code for more information.
*/

namespace Icicle\Observable;

use Icicle\Promise\Deferred;

class Suspended implements ObservationInterface
{
/**
* @var \Icicle\Observable\Observable
*/
private $observable;

/**
* @var callable
*/
private $emit;

/**
* @var \Icicle\Promise\Deferred
*/
private $deferred;

public function __construct()
{
$this->observable = new Observable(function (callable $emit) {
$this->emit = $emit;
$this->deferred = new Deferred();
yield $this->deferred->getPromise();
});
}

/**
* @return \Icicle\Observable\Observable
*/
public function getObservable()
{
return $this->observable;
}

/**
* @coroutine
*
* Emits a value on the observable.
*
* @param mixed $value
*
* @return \Generator
*
* @resolve mixed Resolution value of $value.
*/
public function emit($value = null)
{
$emit = $this->emit;
return $emit($value);
}

/**
* Completes the observable.
*/
public function complete()
{
$this->deferred->resolve();
}

/**
* Throws an error in the observable.
*
* @param mixed $reason
*/
public function error($reason = null)
{
if (!$reason instanceof \Exception) {
$reason = new Exception\ErrorException($reason);
}

$this->deferred->reject($reason);
}
}

0 comments on commit 458647e

Please sign in to comment.