-
-
Notifications
You must be signed in to change notification settings - Fork 255
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
508 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
<?php | ||
|
||
namespace Amp; | ||
|
||
final class AsyncGenerator implements Flow | ||
{ | ||
/** @var \Amp\Flow */ | ||
private $flow; | ||
|
||
/** | ||
* @param callable(callable(mixed $value, mixed $key = null): Promise $yield): \Generator $callable | ||
* | ||
* @throws \Error Thrown if the callable does not return a Generator. | ||
*/ | ||
public function __construct(callable $callable) | ||
{ | ||
$generator = new class { | ||
use Internal\Generator { | ||
yield as public; | ||
complete as public; | ||
fail as public; | ||
} | ||
}; | ||
|
||
if (\PHP_VERSION_ID < 70100) { | ||
$yield = static function ($value, $key = null) use ($generator): Promise { | ||
return $generator->yield($value, $key); | ||
}; | ||
} else { | ||
$yield = \Closure::fromCallable([$generator, "yield"]); | ||
} | ||
|
||
$result = $callable($yield); | ||
|
||
if (!$result instanceof \Generator) { | ||
throw new \Error("The callable did not return a Generator"); | ||
} | ||
|
||
$coroutine = new Coroutine($result); | ||
$coroutine->onResolve(static function ($exception) use ($generator) { | ||
if ($exception) { | ||
$generator->fail($exception); | ||
return; | ||
} | ||
|
||
$generator->complete(); | ||
}); | ||
|
||
$this->flow = $generator->iterate(); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function continue(): Promise | ||
{ | ||
return $this->flow->continue(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
<?php | ||
|
||
namespace Amp; | ||
|
||
/** | ||
* Will be thrown in case an operation is cancelled. | ||
* | ||
* @see CancellationToken | ||
* @see CancellationTokenSource | ||
*/ | ||
class DisposedException extends \Exception | ||
{ | ||
public function __construct(\Throwable $previous = null) | ||
{ | ||
parent::__construct("The flow has been disposed", 0, $previous); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
<?php | ||
|
||
namespace Amp; | ||
|
||
/** | ||
* Defines a flow, an asynchronous generator that yields key/value pairs as data is available. | ||
*/ | ||
interface Flow | ||
{ | ||
/** | ||
* Succeeds with a [value, key] pair or null if no more values are available. If the flow fails, the returned promise | ||
* will fail with the same exception. | ||
* | ||
* @return \Amp\Promise<[mixed $value, mixed $key]|null> | ||
* | ||
* @throws \Error If the prior promise returned from this method has not resolved. | ||
* @throws \Throwable The exception used to fail the flow. | ||
*/ | ||
public function continue(): Promise; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,64 @@ | ||
<?php | ||
|
||
namespace Amp; | ||
|
||
/** | ||
* Generator is a container for a Flow that can yield values using the yield() method and completed using the | ||
* complete() and fail() methods of this object. The contained Flow may be accessed using the iterate() | ||
* method. This object should not be part of a public API, but used internally to create and yield values to a Flow. | ||
*/ | ||
final class Generator | ||
{ | ||
/** @var object Has public yield, complete, and fail methods. */ | ||
private $generator; | ||
|
||
public function __construct() | ||
{ | ||
$this->generator = new class { | ||
use Internal\Generator { | ||
yield as public; | ||
complete as public; | ||
fail as public; | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* @return \Amp\Flow | ||
*/ | ||
public function iterate(): Flow | ||
{ | ||
return $this->generator->flow(); | ||
} | ||
|
||
/** | ||
* Yields a value to the flow. | ||
* | ||
* @param mixed $value | ||
* @param mixed $key Using null auto-generates an incremental integer key. | ||
* | ||
* @return \Amp\Promise | ||
*/ | ||
public function yield($value, $key = null): Promise | ||
{ | ||
return $this->generator->yield($value, $key); | ||
} | ||
|
||
/** | ||
* Completes the flow. | ||
*/ | ||
public function complete() | ||
{ | ||
$this->generator->complete(); | ||
} | ||
|
||
/** | ||
* Fails the flow with the given reason. | ||
* | ||
* @param \Throwable $reason | ||
*/ | ||
public function fail(\Throwable $reason) | ||
{ | ||
$this->generator->fail($reason); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
<?php | ||
|
||
namespace Amp\Internal; | ||
|
||
use Amp\Deferred; | ||
use Amp\DisposedException; | ||
use Amp\Failure; | ||
use Amp\Flow; | ||
use Amp\Promise; | ||
use Amp\Success; | ||
|
||
/** | ||
* Trait used by Iterator implementations. Do not use this trait in your code, instead compose your class from one of | ||
* the available classes implementing \Amp\Iterator. | ||
* | ||
* @internal | ||
*/ | ||
trait Generator | ||
{ | ||
/** @var \Amp\Promise|null */ | ||
private $complete; | ||
|
||
/** @var mixed[] */ | ||
private $values = []; | ||
|
||
/** @var \Amp\Deferred[] */ | ||
private $backPressure = []; | ||
|
||
/** @var \Amp\Deferred|null */ | ||
private $waiting; | ||
|
||
/** @var bool */ | ||
private $disposed = false; | ||
|
||
/** @var null|array */ | ||
private $resolutionTrace; | ||
|
||
/** @var int. */ | ||
private $nextKey = 0; | ||
|
||
/** | ||
* Returns an flow instance that when destroyed fails further calls to yield() with an instance of \Amp\DisposedException. | ||
* | ||
* @return \Amp\Flow | ||
*/ | ||
public function iterate(): Flow | ||
{ | ||
$values = &$this->values; | ||
$backPressure = &$this->backPressure; | ||
$complete = &$this->complete; | ||
$waiting = &$this->waiting; | ||
$disposed = &$this->disposed; | ||
|
||
return new class($values, $backPressure, $disposed, $waiting, $complete) implements Flow { | ||
/** @var \Amp\Promise|null */ | ||
private $complete; | ||
|
||
/** @var mixed[] */ | ||
private $values = []; | ||
|
||
/** @var \Amp\Deferred[] */ | ||
private $backPressure = []; | ||
|
||
/** @var \Amp\Deferred|null */ | ||
private $waiting; | ||
|
||
/** @var bool */ | ||
private $disposed = false; | ||
|
||
/** @var int */ | ||
private $position = -1; | ||
|
||
public function __construct( | ||
array &$values, | ||
array &$backpressure, | ||
bool &$disposed, | ||
Promise &$waiting = null, | ||
Promise &$complete = null | ||
) { | ||
$this->values = &$values; | ||
$this->backPressure = &$backpressure; | ||
$this->disposed = &$disposed; | ||
$this->waiting = &$waiting; | ||
$this->complete = &$complete; | ||
} | ||
|
||
public function __destruct() | ||
{ | ||
if (!empty($this->backPressure)) { | ||
for ($key = \key($this->backPressure); isset($this->backPressure[$key]); $key++) { | ||
$deferred = $this->backPressure[$key]; | ||
unset($this->values[$key], $this->backPressure[$key]); | ||
$deferred->resolve(); | ||
} | ||
} | ||
|
||
$this->disposed = true; | ||
} | ||
|
||
public function continue(): Promise | ||
{ | ||
if ($this->waiting !== null) { | ||
throw new \Error("The prior promise returned must resolve before invoking this method again"); | ||
} | ||
|
||
if (isset($this->backPressure[$this->position])) { | ||
$deferred = $this->backPressure[$this->position]; | ||
unset($this->backPressure[$this->position]); | ||
$deferred->resolve(); | ||
} | ||
|
||
unset($this->values[$this->position]); | ||
|
||
++$this->position; | ||
|
||
if (isset($this->values[$this->position])) { | ||
return new Success($this->values[$this->position]); | ||
} | ||
|
||
if ($this->complete) { | ||
return $this->complete; | ||
} | ||
|
||
$this->waiting = new Deferred; | ||
return $this->waiting->promise(); | ||
} | ||
}; | ||
} | ||
|
||
/** | ||
* Yields a value from the flow. The returned promise is resolved with the yielded value once all disposed | ||
* have been invoked. | ||
* | ||
* @param mixed $value | ||
* @param mixed $key Using null auto-generates an incremental integer key. | ||
* | ||
* @return \Amp\Promise | ||
* | ||
* @throws \Error If the iterator has completed. | ||
*/ | ||
private function yield($value, $key = null): Promise | ||
{ | ||
if ($this->complete) { | ||
throw new \Error("Flows cannot yield values after calling complete"); | ||
} | ||
|
||
if ($this->disposed) { | ||
return new Failure(new DisposedException); | ||
} | ||
|
||
if ($key === null) { | ||
$key = $this->nextKey++; | ||
} elseif (\is_int($key) && $key > $this->nextKey) { | ||
$this->nextKey = $key + 1; | ||
} | ||
|
||
$this->values[] = $yielded = [$value, $key]; | ||
$this->backPressure[] = $pressure = new Deferred; | ||
|
||
if ($this->waiting !== null) { | ||
$waiting = $this->waiting; | ||
$this->waiting = null; | ||
$waiting->resolve($yielded); | ||
} | ||
|
||
return $pressure->promise(); | ||
} | ||
|
||
/** | ||
* Completes the flow. | ||
* | ||
* @throws \Error If the flow has already been completed. | ||
*/ | ||
private function complete() | ||
{ | ||
if ($this->complete) { | ||
$message = "Flow has already been completed"; | ||
|
||
if (isset($this->resolutionTrace)) { | ||
$trace = formatStacktrace($this->resolutionTrace); | ||
$message .= ". Previous completion trace:\n\n{$trace}\n\n"; | ||
} else { | ||
// @codeCoverageIgnoreStart | ||
$message .= ", define environment variable AMP_DEBUG or const AMP_DEBUG = true and enable assertions " | ||
. "for a stacktrace of the previous resolution."; | ||
// @codeCoverageIgnoreEnd | ||
} | ||
|
||
throw new \Error($message); | ||
} | ||
|
||
\assert((function () { | ||
$env = \getenv("AMP_DEBUG"); | ||
if (($env !== "0" && $env !== "false") || (\defined("AMP_DEBUG") && \AMP_DEBUG)) { | ||
$trace = \debug_backtrace(\DEBUG_BACKTRACE_IGNORE_ARGS); | ||
\array_shift($trace); // remove current closure | ||
$this->resolutionTrace = $trace; | ||
} | ||
|
||
return true; | ||
})()); | ||
|
||
$this->complete = new Success; | ||
|
||
if ($this->waiting !== null) { | ||
$waiting = $this->waiting; | ||
$this->waiting = null; | ||
$waiting->resolve($this->complete); | ||
} | ||
} | ||
|
||
private function fail(\Throwable $exception) | ||
{ | ||
$this->complete = new Failure($exception); | ||
|
||
if ($this->waiting !== null) { | ||
$waiting = $this->waiting; | ||
$this->waiting = null; | ||
$waiting->resolve($this->complete); | ||
} | ||
} | ||
} |
Oops, something went wrong.