Skip to content

Commit

Permalink
Add Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Nov 29, 2018
1 parent 5889f4e commit 41b18fe
Show file tree
Hide file tree
Showing 6 changed files with 508 additions and 0 deletions.
59 changes: 59 additions & 0 deletions lib/AsyncGenerator.php
@@ -0,0 +1,59 @@
<?php

namespace Amp;

final class AsyncGenerator implements Flow
{
/** @var \Amp\Flow */
private $flow;

/**
* @param callable(callable(mixed $value, mixed $key = null): Promise $yield): \Generator $callable
*
* @throws \Error Thrown if the callable does not return a Generator.
*/
public function __construct(callable $callable)
{
$generator = new class {
use Internal\Generator {
yield as public;
complete as public;
fail as public;
}
};

if (\PHP_VERSION_ID < 70100) {
$yield = static function ($value, $key = null) use ($generator): Promise {
return $generator->yield($value, $key);
};
} else {
$yield = \Closure::fromCallable([$generator, "yield"]);
}

$result = $callable($yield);

if (!$result instanceof \Generator) {
throw new \Error("The callable did not return a Generator");
}

$coroutine = new Coroutine($result);
$coroutine->onResolve(static function ($exception) use ($generator) {
if ($exception) {
$generator->fail($exception);
return;
}

$generator->complete();
});

$this->flow = $generator->iterate();
}

/**
* {@inheritdoc}
*/
public function continue(): Promise
{
return $this->flow->continue();
}
}
17 changes: 17 additions & 0 deletions lib/DisposedException.php
@@ -0,0 +1,17 @@
<?php

namespace Amp;

/**
* Will be thrown in case an operation is cancelled.
*
* @see CancellationToken
* @see CancellationTokenSource
*/
class DisposedException extends \Exception
{
public function __construct(\Throwable $previous = null)
{
parent::__construct("The flow has been disposed", 0, $previous);
}
}
20 changes: 20 additions & 0 deletions lib/Flow.php
@@ -0,0 +1,20 @@
<?php

namespace Amp;

/**
* Defines a flow, an asynchronous generator that yields key/value pairs as data is available.
*/
interface Flow
{
/**
* Succeeds with a [value, key] pair or null if no more values are available. If the flow fails, the returned promise
* will fail with the same exception.
*
* @return \Amp\Promise<[mixed $value, mixed $key]|null>
*
* @throws \Error If the prior promise returned from this method has not resolved.
* @throws \Throwable The exception used to fail the flow.
*/
public function continue(): Promise;
}
64 changes: 64 additions & 0 deletions lib/Generator.php
@@ -0,0 +1,64 @@
<?php

namespace Amp;

/**
* Generator is a container for a Flow that can yield values using the yield() method and completed using the
* complete() and fail() methods of this object. The contained Flow may be accessed using the iterate()
* method. This object should not be part of a public API, but used internally to create and yield values to a Flow.
*/
final class Generator
{
/** @var object Has public yield, complete, and fail methods. */
private $generator;

public function __construct()
{
$this->generator = new class {
use Internal\Generator {
yield as public;
complete as public;
fail as public;
}
};
}

/**
* @return \Amp\Flow
*/
public function iterate(): Flow
{
return $this->generator->flow();
}

/**
* Yields a value to the flow.
*
* @param mixed $value
* @param mixed $key Using null auto-generates an incremental integer key.
*
* @return \Amp\Promise
*/
public function yield($value, $key = null): Promise
{
return $this->generator->yield($value, $key);
}

/**
* Completes the flow.
*/
public function complete()
{
$this->generator->complete();
}

/**
* Fails the flow with the given reason.
*
* @param \Throwable $reason
*/
public function fail(\Throwable $reason)
{
$this->generator->fail($reason);
}
}
222 changes: 222 additions & 0 deletions lib/Internal/Generator.php
@@ -0,0 +1,222 @@
<?php

namespace Amp\Internal;

use Amp\Deferred;
use Amp\DisposedException;
use Amp\Failure;
use Amp\Flow;
use Amp\Promise;
use Amp\Success;

/**
* Trait used by Iterator implementations. Do not use this trait in your code, instead compose your class from one of
* the available classes implementing \Amp\Iterator.
*
* @internal
*/
trait Generator
{
/** @var \Amp\Promise|null */
private $complete;

/** @var mixed[] */
private $values = [];

/** @var \Amp\Deferred[] */
private $backPressure = [];

/** @var \Amp\Deferred|null */
private $waiting;

/** @var bool */
private $disposed = false;

/** @var null|array */
private $resolutionTrace;

/** @var int. */
private $nextKey = 0;

/**
* Returns an flow instance that when destroyed fails further calls to yield() with an instance of \Amp\DisposedException.
*
* @return \Amp\Flow
*/
public function iterate(): Flow
{
$values = &$this->values;
$backPressure = &$this->backPressure;
$complete = &$this->complete;
$waiting = &$this->waiting;
$disposed = &$this->disposed;

return new class($values, $backPressure, $disposed, $waiting, $complete) implements Flow {
/** @var \Amp\Promise|null */
private $complete;

/** @var mixed[] */
private $values = [];

/** @var \Amp\Deferred[] */
private $backPressure = [];

/** @var \Amp\Deferred|null */
private $waiting;

/** @var bool */
private $disposed = false;

/** @var int */
private $position = -1;

public function __construct(
array &$values,
array &$backpressure,
bool &$disposed,
Promise &$waiting = null,
Promise &$complete = null
) {
$this->values = &$values;
$this->backPressure = &$backpressure;
$this->disposed = &$disposed;
$this->waiting = &$waiting;
$this->complete = &$complete;
}

public function __destruct()
{
if (!empty($this->backPressure)) {
for ($key = \key($this->backPressure); isset($this->backPressure[$key]); $key++) {
$deferred = $this->backPressure[$key];
unset($this->values[$key], $this->backPressure[$key]);
$deferred->resolve();
}
}

$this->disposed = true;
}

public function continue(): Promise
{
if ($this->waiting !== null) {
throw new \Error("The prior promise returned must resolve before invoking this method again");
}

if (isset($this->backPressure[$this->position])) {
$deferred = $this->backPressure[$this->position];
unset($this->backPressure[$this->position]);
$deferred->resolve();
}

unset($this->values[$this->position]);

++$this->position;

if (isset($this->values[$this->position])) {
return new Success($this->values[$this->position]);
}

if ($this->complete) {
return $this->complete;
}

$this->waiting = new Deferred;
return $this->waiting->promise();
}
};
}

/**
* Yields a value from the flow. The returned promise is resolved with the yielded value once all disposed
* have been invoked.
*
* @param mixed $value
* @param mixed $key Using null auto-generates an incremental integer key.
*
* @return \Amp\Promise
*
* @throws \Error If the iterator has completed.
*/
private function yield($value, $key = null): Promise
{
if ($this->complete) {
throw new \Error("Flows cannot yield values after calling complete");
}

if ($this->disposed) {
return new Failure(new DisposedException);
}

if ($key === null) {
$key = $this->nextKey++;
} elseif (\is_int($key) && $key > $this->nextKey) {
$this->nextKey = $key + 1;
}

$this->values[] = $yielded = [$value, $key];
$this->backPressure[] = $pressure = new Deferred;

if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve($yielded);
}

return $pressure->promise();
}

/**
* Completes the flow.
*
* @throws \Error If the flow has already been completed.
*/
private function complete()
{
if ($this->complete) {
$message = "Flow has already been completed";

if (isset($this->resolutionTrace)) {
$trace = formatStacktrace($this->resolutionTrace);
$message .= ". Previous completion trace:\n\n{$trace}\n\n";
} else {
// @codeCoverageIgnoreStart
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions "
. "for a stacktrace of the previous resolution.";
// @codeCoverageIgnoreEnd
}

throw new \Error($message);
}

\assert((function () {
$env = \getenv("AMP_DEBUG");
if (($env !== "0" && $env !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG)) {
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS);
\array_shift($trace); // remove current closure
$this->resolutionTrace = $trace;
}

return true;
})());

$this->complete = new Success;

if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve($this->complete);
}
}

private function fail(\Throwable $exception)
{
$this->complete = new Failure($exception);

if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve($this->complete);
}
}
}

0 comments on commit 41b18fe

Please sign in to comment.