Skip to content

Commit

Permalink
Periodically check Future to see if thread crashed
Browse files Browse the repository at this point in the history
  • Loading branch information
trowski committed Feb 18, 2019
1 parent 7fca17d commit 36d3a3d
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions lib/Context/Parallel.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use parallel\Runtime;
use parallel\TimeoutException as ParallelTimeoutException;
use function Amp\call;

/**
Expand All @@ -19,17 +20,27 @@
*/
final class Parallel implements Context
{
const EXIT_CHECK_FREQUENCY = 250;
const KEY_LENGTH = 32;

/** @var string|null */
private static $autoloadPath;

/** @var int Next ID to be used for IPC hub. */
private static $id = 1;
private static $nextId = 1;

/** @var array Array of [\parallel\Future, ChannelledSocket] pairs. */
private static $futures = [];

/** @var string|null */
private static $watcher;

/** @var Internal\ProcessHub */
private $hub;

/** @var int|null */
private $id;

/** @var Runtime|null */
private $runtime;

Expand All @@ -39,7 +50,7 @@ final class Parallel implements Context
/** @var string Script path. */
private $script;

/** @var mixed[] */
/** @var string[] */
private $args = [];

/** @var int */
Expand All @@ -48,9 +59,6 @@ final class Parallel implements Context
/** @var bool */
private $killed = false;

/** @var \parallel\Future|null */
private $future;

/**
* Checks if threading is enabled.
*
Expand Down Expand Up @@ -130,8 +138,8 @@ public function __construct($script)
public function __clone()
{
$this->runtime = null;
$this->future = null;
$this->channel = null;
$this->id = null;
$this->oid = 0;
$this->killed = false;
}
Expand Down Expand Up @@ -172,13 +180,28 @@ public function start(): Promise
throw new StatusError('The thread has already been started.');
}

if (self::$watcher === null) {
self::$watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () {
foreach (self::$futures as list($future, $channel)) {
try {
$future->value(0);
} catch (ParallelTimeoutException $exception) {
// Ignore timeout – that just means the thread is still running.
} catch (\Throwable $exception) {
$channel->close();
}
}
});
Loop::unreference(self::$watcher);
}

$this->oid = \getmypid();

$this->runtime = new Runtime(self::$autoloadPath);

$id = self::$id++;
$this->id = self::$nextId++;

$this->future = $this->runtime->run(static function (string $uri, string $key, string $path, array $argv): int {
$future = $this->runtime->run(static function (string $uri, string $key, string $path, array $argv): int {
\define("AMP_CONTEXT", "parallel");

if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
Expand All @@ -204,14 +227,15 @@ public function start(): Promise
return $result;
}, [
$this->hub->getUri(),
$this->hub->generateKey($id, self::KEY_LENGTH),
$this->hub->generateKey($this->id, self::KEY_LENGTH),
$this->script,
$this->args
]);

return call(function () use ($id) {
return call(function () use ($future) {
try {
$this->channel = yield $this->hub->accept($id);
$this->channel = $channel = yield $this->hub->accept($this->id);
self::$futures[$this->id] = [$future, $channel];
} catch (\Throwable $exception) {
$this->kill();
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
Expand Down Expand Up @@ -244,6 +268,8 @@ public function kill()
*/
private function close()
{
unset(self::$futures[$this->id]);

$this->runtime = null;

if ($this->channel !== null) {
Expand Down

0 comments on commit 36d3a3d

Please sign in to comment.