Skip to content

Commit

Permalink
Merge branch 'v1.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 19, 2016
2 parents 4222472 + 81efaab commit f684727
Show file tree
Hide file tree
Showing 15 changed files with 812 additions and 66 deletions.
7 changes: 6 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
# Change log
All notable changes to this project will be documented in this file. This project adheres to [Semantic Versioning](http://semver.org/).

## [0.9.4] - 2016-01-16
## [0.9.4] - 2016-01-18
### Added
- Added `of()`, `fail()`, `concat()`, `zip()`, and `range()` functions to the `Icicle\Observable` namespace. See each function for more information.
- Added `reduce()` function to `Icicle\Observable\Observable` that executes an accumulator function on each emitted value, returning an observable that emits the current accumulated value after each invokation of the accumulator.

### Changed
- Passing `null` (now the default argument) to `Icicle\Loop\Loop::maxQueueDepth()` or `Icicle\Loop\maxQueueDepth()` will return the current max queue depth without modifying it.
- `Icicle\Observable\Emitter` can delegate to another instance of `Icicle\Observable\Observable` by emitting the observable. Values from the emitted observable will then be emitted as though they were emitted by the delegating observable. The `$emit` callable will resolve with the return value of the emitted observable.
- `Icicle\Observable\Emitter` now allows multiple coroutines to be created from the `$emit` callable simultaneously. This makes no difference for emitters using `yield` with `$emit`, but simplifies implementations using `$emit` as part of a callback that may be called before the previous value has finished emitting. See `Icicle\Observable\merge()` for an example of a function that uses `$emit` as a callback.
- If an awaitable emitted from `Icicle\Observable\Emitter` is rejected, the observable will fail with the exception used to reject the awaitable.
- `Icicle\Observable\observe()` now takes a callable `$onDisposed` argument that is invoked with the callable passed to the emitting function if the observable is disposed. This function can be used to remove the callable from the event emitter.
Expand Down
4 changes: 3 additions & 1 deletion src/Awaitable/Exception/CircularResolutionError.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,6 @@

namespace Icicle\Awaitable\Exception;

class CircularResolutionError extends \Exception implements Error {}
use Icicle\Exception\InvalidArgumentError;

class CircularResolutionError extends InvalidArgumentError implements Error {}
39 changes: 34 additions & 5 deletions src/Observable/Emitter.php
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ public function each(callable $onNext = null): \Generator
*/
public function map(callable $onNext, callable $onComplete = null): Observable
{
return new self(function (callable $emit) use ($onNext, $onComplete) {
return new self(function (callable $emit) use ($onNext, $onComplete): \Generator {
$iterator = $this->getIterator();
while (yield from $iterator->isValid()) {
$result = $onNext($iterator->getCurrent());
Expand Down Expand Up @@ -208,7 +208,7 @@ public function map(callable $onNext, callable $onComplete = null): Observable
*/
public function filter(callable $callback): Observable
{
return new self(function (callable $emit) use ($callback) {
return new self(function (callable $emit) use ($callback): \Generator {
$iterator = $this->getIterator();
while (yield from $iterator->isValid()) {
$value = $iterator->getCurrent();
Expand All @@ -229,6 +229,35 @@ public function filter(callable $callback): Observable
});
}

/**
* {@inheritdoc}
*/
public function reduce(callable $accumulator, $seed = null): Observable
{
return new self(function (callable $emit) use ($accumulator, $seed): \Generator {
$iterator = $this->getIterator();
if ($seed instanceof Awaitable) {
$carry = yield $seed;
} else {
$carry = $seed;
}

while (yield from $iterator->isValid()) {
$carry = $accumulator($carry, $iterator->getCurrent());

if ($carry instanceof Generator) {
$carry = yield from $carry;
} elseif ($carry instanceof Awaitable) {
$carry = yield $carry;
}

yield from $emit($carry);
}

return $carry;
});
}

/**
* {@inheritdoc}
*/
Expand All @@ -238,7 +267,7 @@ public function throttle(float $time): Observable
return $this->skip(0);
}

return new self(function (callable $emit) use ($time) {
return new self(function (callable $emit) use ($time): \Generator {
$iterator = $this->getIterator();
$start = microtime(true) - $time;

Expand Down Expand Up @@ -297,7 +326,7 @@ public function splat(callable $onNext, callable $onComplete = null): Observable
*/
public function take(int $count): Observable
{
return new self(function (callable $emit) use ($count) {
return new self(function (callable $emit) use ($count): \Generator {
if (0 > $count) {
throw new InvalidArgumentError('The number of values to take must be non-negative.');
}
Expand All @@ -316,7 +345,7 @@ public function take(int $count): Observable
*/
public function skip(int $count): Observable
{
return new self(function (callable $emit) use ($count) {
return new self(function (callable $emit) use ($count): \Generator {
if (0 > $count) {
throw new InvalidArgumentError('The number of values to skip must be non-negative.');
}
Expand Down
14 changes: 14 additions & 0 deletions src/Observable/Exception/CircularEmitError.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php

/*
* This file is part of Icicle, a library for writing asynchronous code in PHP using coroutines built with awaitables.
*
* @copyright 2014-2015 Aaron Piotrowski. All rights reserved.
* @license MIT See the LICENSE file that was distributed with this source code for more information.
*/

namespace Icicle\Observable\Exception;

use Icicle\Exception\InvalidArgumentError;

class CircularEmitError extends InvalidArgumentError implements Error {}
51 changes: 39 additions & 12 deletions src/Observable/Internal/EmitQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace Icicle\Observable\Internal;

use Icicle\Awaitable\{Awaitable, Delayed};
use Icicle\Observable\Exception\{AutoDisposedException, CompletedError};
use Icicle\Observable\Exception\{AutoDisposedException, CircularEmitError, CompletedError};
use Icicle\Observable\Observable;

class EmitQueue
Expand Down Expand Up @@ -68,6 +68,7 @@ public function __construct(Observable $observable)
* @return \Generator
*
* @throws \Icicle\Observable\Exception\CompletedError
* @throws \Icicle\Observable\Exception\CircularEmitError
*/
public function push($value): \Generator
{
Expand All @@ -82,21 +83,25 @@ public function push($value): \Generator
$this->busy = true;

try {
if ($value instanceof Awaitable) {
$value = yield $value;
}
if ($value instanceof Observable) {
if ($value === $this->observable) {
throw new CircularEmitError('Cannot emit an observable within itself.');
}

if (null === $this->observable) {
throw new CompletedError();
}
$iterator = $value->getIterator();

while (yield from $iterator->isValid()) {
yield $this->emit($iterator->getCurrent());
}

$this->delayed->resolve($value);
$this->delayed = new Delayed();
return $iterator->getReturn();
}

$placeholder = $this->placeholder;
$this->placeholder = new Placeholder($this->delayed);
if ($value instanceof Awaitable) {
$value = yield $value;
}

yield $placeholder->wait();
yield $this->emit($value);
} catch (\Throwable $exception) {
$this->fail($exception);
throw $exception;
Expand All @@ -111,6 +116,28 @@ public function push($value): \Generator
return $value;
}

/**
* @param mixed $value Value to emit.
*
* @return \Icicle\Awaitable\Awaitable
*
* @throws \Icicle\Observable\Exception\CompletedError Thrown if the observable has completed.
*/
private function emit($value): Awaitable
{
if (null === $this->observable) {
throw new CompletedError();
}

$this->delayed->resolve($value);
$this->delayed = new Delayed();

$placeholder = $this->placeholder;
$this->placeholder = new Placeholder($this->delayed);

return $placeholder->wait();
}

/**
* Increments the number of listening iterators.
*/
Expand Down
14 changes: 13 additions & 1 deletion src/Observable/Observable.php
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ public function map(callable $onNext, callable $onComplete = null): Observable;
*/
public function filter(callable $callback): Observable;

/**
* Reduce function similar to array_reduce(), instead invoking the accumulator as values are emitted. The initial
* seed value may be any value or an awaitable. Each value returned from the accumulator is emitted from the
* returned observable. The observable returns the final value returned from the accumulator.
*
* @param callable $accumulator
* @param mixed $seed
*
* @return \Icicle\Observable\Observable
*/
public function reduce(callable $accumulator, $seed = null): Observable;

/**
* Throttles the observable to only emit a value every $time seconds.
*
Expand All @@ -77,7 +89,7 @@ public function throttle(float $time): Observable;
* This method is a modified form of map() that expects the observable to emit an array or Traversable that is
* used as arguments to the given callback function. The array is key sorted before being used as function
* arguments. If the observable does not emit an array or Traversable, the observable will error with an instance
* of Icicle\Observable\Exception\InvalidArgumentError.
* of Icicle\Exception\UnexpectedTypeError.
*
* @param callable(mixed ...$args): mixed $onNext
* @param callable(mixed ...$args): mixed|null $onComplete
Expand Down
6 changes: 3 additions & 3 deletions src/Observable/ObservableIterator.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ interface ObservableIterator
/**
* @coroutine
*
* Resolves with true if a new value is available by calling current() or false if the observable has completed.
* Calling current() will throw an exception if the observable completed. If an error occurs with the observable,
* Resolves with true if a new value is available by calling getCurrent() or false if the observable has completed.
* Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable,
* this coroutine will be rejected with the exception used to fail the observable.
*
* @return \Generator
Expand All @@ -32,7 +32,7 @@ public function isValid(): \Generator;
* @return mixed Value emitted from observable.
*
* @throws \Icicle\Observable\Exception\CompletedError If the observable has successfully completed.
* @throws \Icicle\Observable\Exception\UninitializedError If wait() was not called before calling this method.
* @throws \Icicle\Observable\Exception\UninitializedError If isValid() was not called before calling this method.
* @throws \Exception Exception used to fail the observable.
*/
public function getCurrent();
Expand Down

0 comments on commit f684727

Please sign in to comment.