Skip to content

Commit

Permalink
Observable → Emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Jan 2, 2017
1 parent 7cd5053 commit b9c46d1
Show file tree
Hide file tree
Showing 19 changed files with 297 additions and 297 deletions.
38 changes: 38 additions & 0 deletions lib/Emanator.php
@@ -0,0 +1,38 @@
<?php

namespace Amp;

use Interop\Async\Loop;

final class Emanator implements Emitter {
use CallableMaker, Internal\Producer;

/**
* @param callable(callable(mixed $value): Promise $emit): \Generator $emitter
*
* @throws \TypeError Thrown if the callable does not return a Generator.
*/
public function __construct(callable $emitter) {
$result = $emitter($this->callableFromInstanceMethod("emit"));

if (!$result instanceof \Generator) {
throw new \TypeError("The callable did not return a Generator");
}

Loop::defer(function () use ($result) {
$coroutine = new Coroutine($result);
$coroutine->when(function ($exception, $value) {
if ($this->resolved) {
return;
}

if ($exception) {
$this->fail($exception);
return;
}

$this->resolve($value);
});
});
}
}
43 changes: 14 additions & 29 deletions lib/Emitter.php
Expand Up @@ -2,37 +2,22 @@

namespace Amp;

use Interop\Async\Loop;

final class Emitter implements Observable {
use CallableMaker, Internal\Producer;
use Interop\Async\Promise;

/**
* Represents a set of asynchronous values. An emitter is analogous to an asynchronous generator, yielding (emitting)
* values when they are available, returning a value (success value) when the emitter completes or throwing an
* exception (failure reason).
*/
interface Emitter extends Promise {
/**
* @param callable(callable(mixed $value): Promise $emit): \Generator $emitter
* Registers a callback to be invoked each time value is emitted from the emitter. If the function returns an
* promise, back-pressure is applied to the promise until the returned promise is resolved.
*
* Exceptions thrown from $onNext (or failures of promises returned from $onNext) will fail the returned
* Subscriber with the thrown exception.
*
* @throws \Error Thrown if the callable does not return a Generator.
* @param callable $onNext Function invoked each time a value is emitted from the emitter.
*/
public function __construct(callable $emitter) {
$result = $emitter($this->callableFromInstanceMethod("emit"));

if (!$result instanceof \Generator) {
throw new \Error("The callable did not return a Generator");
}

Loop::defer(function () use ($result) {
$coroutine = new Coroutine($result);
$coroutine->when(function ($exception, $value) {
if ($this->resolved) {
return;
}

if ($exception) {
$this->fail($exception);
return;
}

$this->resolve($value);
});
});
}
public function subscribe(callable $onNext);
}
4 changes: 2 additions & 2 deletions lib/Failure.php
Expand Up @@ -5,9 +5,9 @@
use Interop\Async\Promise\ErrorHandler;

/**
* Creates a failed observable using the given exception.
* Creates a failed emitter using the given exception.
*/
final class Failure implements Observable {
final class Failure implements Emitter {
/** @var \Throwable $exception */
private $exception;

Expand Down
Expand Up @@ -2,23 +2,23 @@

namespace Amp\Internal;

use Amp\Observable;
use Amp\Emitter;
use Interop\Async\Promise;

/**
* An observable that cannot externally emit values. Used by Postponed in development mode.
* An emitter that cannot externally emit values. Used by Postponed in development mode.
*
* @internal
*/
final class PrivateObservable implements Observable {
final class PrivateEmitter implements Emitter {
use Producer;

/**
* @param callable(callable $emit, callable $complete, callable $fail): void $emitter
*/
public function __construct(callable $emitter) {
/**
* Emits a value from the observable.
* Emits a value from the emitter.
*
* @param mixed $value
*
Expand All @@ -29,7 +29,7 @@ public function __construct(callable $emitter) {
};

/**
* Completes the observable with the given value.
* Completes the emitter with the given value.
*
* @param mixed $value
*/
Expand All @@ -38,7 +38,7 @@ public function __construct(callable $emitter) {
};

/**
* Fails the observable with the given exception.
* Fails the emitter with the given exception.
*
* @param \Throwable $reason
*/
Expand Down
16 changes: 8 additions & 8 deletions lib/Internal/Producer.php
Expand Up @@ -6,8 +6,8 @@
use Interop\Async\{ Promise, Promise\ErrorHandler };

/**
* Trait used by Observable implementations. Do not use this trait in your code, instead compose your class from one of
* the available classes implementing \Amp\Observable.
* Trait used by Emitter implementations. Do not use this trait in your code, instead compose your class from one of
* the available classes implementing \Amp\Emitter.
* Note that it is the responsibility of the user of this trait to ensure that subscribers have a chance to subscribe first
* before emitting values.
*
Expand All @@ -33,26 +33,26 @@ public function subscribe(callable $onNext) {
}

/**
* Emits a value from the observable. The returned promise is resolved with the emitted value once all subscribers
* Emits a value from the emitter. The returned promise is resolved with the emitted value once all subscribers
* have been invoked.
*
* @param mixed $value
*
* @return \Interop\Async\Promise
*
* @throws \Error If the observable has resolved.
* @throws \Error If the emitter has resolved.
*/
private function emit($value): Promise {
if ($this->resolved) {
throw new \Error("The observable has been resolved; cannot emit more values");
throw new \Error("The emitter has been resolved; cannot emit more values");
}

if ($value instanceof Promise) {
$deferred = new Deferred;
$value->when(function ($e, $v) use ($deferred) {
if ($this->resolved) {
$deferred->fail(
new \Error("The observable was resolved before the promise result could be emitted")
new \Error("The emitter was resolved before the promise result could be emitted")
);
return;
}
Expand Down Expand Up @@ -106,11 +106,11 @@ private function emit($value): Promise {


/**
* Resolves the observable with the given value.
* Resolves the emitter with the given value.
*
* @param mixed $value
*
* @throws \Error If the observable has already been resolved.
* @throws \Error If the emitter has already been resolved.
*/
private function resolve($value = null) {
$this->complete($value);
Expand Down
62 changes: 31 additions & 31 deletions lib/Observer.php → lib/Listener.php
Expand Up @@ -5,18 +5,18 @@
use Interop\Async\Promise;

/**
* Asynchronous iterator that can be used within a coroutine to iterate over the emitted values from an Observable.
* Asynchronous iterator that can be used within a coroutine to iterate over the emitted values from an Emitter.
*
* Example:
* $observer = new Observer($observable); // $observable is an instance of \Amp\Observable
* while (yield $observer->advance()) {
* $emitted = $observer->getCurrent();
* $listener = new Listener($emitter); // $emitter is an instance of \Amp\Emitter
* while (yield $listener->advance()) {
* $emitted = $listener->getCurrent();
* }
* $result = $observer->getResult();
* $result = $listener->getResult();
*/
class Observer {
/** @var \Amp\Observable */
private $observable;
class Listener {
/** @var \Amp\Emitter */
private $emitter;

/** @var mixed[] */
private $values = [];
Expand All @@ -40,17 +40,17 @@ class Observer {
private $exception;

/**
* @param \Amp\Observable $observable
* @param \Amp\Emitter $emitter
*/
public function __construct(Observable $observable) {
$this->observable = $observable;
public function __construct(Emitter $emitter) {
$this->emitter = $emitter;

$deferred = &$this->deferred;
$values = &$this->values;
$deferreds = &$this->deferreds;
$resolved = &$this->resolved;

$this->observable->subscribe(static function ($value) use (&$deferred, &$values, &$deferreds, &$resolved) {
$this->emitter->subscribe(static function ($value) use (&$deferred, &$values, &$deferreds, &$resolved) {
$values[] = $value;
$deferreds[] = $pressure = new Deferred;

Expand All @@ -70,7 +70,7 @@ public function __construct(Observable $observable) {
$result = &$this->result;
$error = &$this->exception;

$this->observable->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) {
$this->emitter->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) {
$resolved = true;

if ($exception) {
Expand All @@ -90,7 +90,7 @@ public function __construct(Observable $observable) {
}

/**
* Marks the observer as resolved to relieve back-pressure on the observable.
* Marks the listener as resolved to relieve back-pressure on the emitter.
*/
public function __destruct() {
$this->resolved = true;
Expand All @@ -101,15 +101,15 @@ public function __destruct() {
}

/**
* @return \Amp\Observable The observable being observed.
* @return \Amp\Emitter The emitter being observed.
*/
public function observe(): Observable {
return $this->observable;
public function listen(): Emitter {
return $this->emitter;
}

/**
* Succeeds with true if an emitted value is available by calling getCurrent() or false if the observable has
* resolved. If the observable fails, the returned promise will fail with the same exception.
* Succeeds with true if an emitted value is available by calling getCurrent() or false if the emitter has
* resolved. If the emitter fails, the returned promise will fail with the same exception.
*
* @return \Interop\Async\Promise<bool>
*/
Expand Down Expand Up @@ -141,15 +141,15 @@ public function advance(): Promise {
}

/**
* Gets the last emitted value or throws an exception if the observable has completed.
* Gets the last emitted value or throws an exception if the emitter has completed.
*
* @return mixed Value emitted from observable.
* @return mixed Value emitted from emitter.
*
* @throws \Error If the observable has resolved or advance() was not called before calling this method.
* @throws \Error If the emitter has resolved or advance() was not called before calling this method.
*/
public function getCurrent() {
if (empty($this->values) && $this->resolved) {
throw new \Error("The observable has resolved");
throw new \Error("The emitter has resolved");
}

if (!\array_key_exists($this->position, $this->values)) {
Expand All @@ -160,17 +160,17 @@ public function getCurrent() {
}

/**
* Gets the result of the observable or throws the failure reason. Also throws an exception if the observable has
* Gets the result of the emitter or throws the failure reason. Also throws an exception if the emitter has
* not completed.
*
* @return mixed Final return value of the observable.
* @return mixed Final return value of the emitter.
*
* @throws \Error If the observable has not completed.
* @throws \Throwable The exception used to fail the observable.
* @throws \Error If the emitter has not completed.
* @throws \Throwable The exception used to fail the emitter.
*/
public function getResult() {
if (!$this->resolved) {
throw new \Error("The observable has not resolved");
throw new \Error("The emitter has not resolved");
}

if ($this->exception) {
Expand All @@ -181,15 +181,15 @@ public function getResult() {
}

/**
* Returns an array of values that were not consumed by the Observer before the Observable completed.
* Returns an array of values that were not consumed by the Listener before the Emitter completed.
*
* @return array Unconsumed emitted values.
*
* @throws \Error If the observable has not completed.
* @throws \Error If the emitter has not completed.
*/
public function drain(): array {
if (!$this->resolved) {
throw new \Error("The observable has not resolved");
throw new \Error("The emitter has not resolved");
}

$values = $this->values;
Expand Down
23 changes: 0 additions & 23 deletions lib/Observable.php

This file was deleted.

0 comments on commit b9c46d1

Please sign in to comment.