Skip to content

Commit

Permalink
amphp#64 finished
Browse files Browse the repository at this point in the history
  • Loading branch information
Ekstazi committed Dec 14, 2018
1 parent 0e060da commit f17d827
Show file tree
Hide file tree
Showing 14 changed files with 238 additions and 207 deletions.
7 changes: 0 additions & 7 deletions lib/Context/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,4 @@ public function kill();
* @throws \Amp\Parallel\Sync\PanicError If the context throws an uncaught exception.
*/
public function join(): Promise;

/**
* Restarts the execution context.
* @param $force bool Whether to force restart or wait until finish first
* @return Promise<Context> new context instance
*/
public function restart($force = false): Promise;
}
32 changes: 3 additions & 29 deletions lib/Context/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ public function start(): Promise
try {
$pid = yield $this->process->start();

$this->channel = yield $this->createChannel();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));

$this->channel = yield $this->hub->accept($pid);

return $pid;
} catch (\Throwable $exception) {
Expand All @@ -197,16 +199,6 @@ public function start(): Promise
});
}

private function createChannel(): Promise
{
return call(function () {
$pid = $this->process->getPid();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));
return $this->hub->accept($pid);
});
}


/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -366,22 +358,4 @@ public function kill()
{
$this->process->kill();
}

/**
* {@inheritdoc}
*/
public function restart($force = false): Promise
{
return call(function () use ($force) {
if ($force) {
$this->kill();
} else {
yield $this->join();
}
$instance = clone $this;
$instance->process = yield $this->process->restart($force);
$instance->channel = yield $instance->createChannel();
return $instance;
});
}
}
17 changes: 0 additions & 17 deletions lib/Context/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -308,21 +308,4 @@ public function send($data): Promise
return $result;
});
}

/**
* {@inheritdoc}
*/
public function restart($force = false): Promise
{
return call(function () use ($force) {
if ($force) {
$this->kill();
} else {
yield $this->join();
}
$instance = new static($this->function, ...$this->args);
yield $instance->start();
return $instance;
});
}
}
16 changes: 15 additions & 1 deletion lib/Worker/DefaultPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Parallel\Worker;

use function Amp\call;
use Amp\Parallel\Context\StatusError;
use Amp\Promise;

Expand Down Expand Up @@ -69,7 +70,9 @@ public function __construct(int $maxSize = self::DEFAULT_MAX_SIZE, WorkerFactory

$this->push = static function (Worker $worker) use ($workers, $idleWorkers, $busyQueue) {
\assert($workers->contains($worker), "The provided worker was not part of this queue");

if(!$workers->contains($worker)) {
$workers->attach($worker, 1);
}
if (($workers[$worker] -= 1) === 0) {
// Worker is completely idle, remove from busy queue and add to idle queue.
foreach ($busyQueue as $key => $busy) {
Expand Down Expand Up @@ -253,7 +256,18 @@ private function pull(): Worker
return $worker;
}

/**
* {@inheritdoc}
*/
public function restart($force = false): Promise
{
return call(function () use($force) {
if($force) {
$this->kill();
} else {
yield $this->shutdown();
}
return new static($this->maxSize, $this->factory);
});
}
}
6 changes: 5 additions & 1 deletion lib/Worker/Internal/PooledWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Amp\Parallel\Worker\Internal;

use function Amp\call;
use Amp\Parallel\Worker\Task;
use Amp\Parallel\Worker\Worker;
use Amp\Promise;
Expand Down Expand Up @@ -75,6 +76,9 @@ public function kill()

public function restart($force = false): Promise
{
return $this->worker->restart($force);
return call(function() use($force){
$worker = yield $this->worker->restart($force);
return new static($worker, $this->push);
});
}
}
35 changes: 7 additions & 28 deletions lib/Worker/Internal/WorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ public function start(): Promise
return call(function () {
$result = yield $this->process->start();

$stdout = $this->process->getStdout();
$stdout->unreference();

$this->redirectOutput();
$stderr = $this->process->getStderr();
$stderr->unreference();

ByteStream\pipe($stdout, ByteStream\getStdout());
ByteStream\pipe($stderr, ByteStream\getStderr());

return $result;
});
Expand All @@ -54,31 +60,4 @@ public function join(): Promise
{
return $this->process->join();
}

public function restart($force = false): Promise
{
return call(function () use ($force) {
if ($force) {
$this->kill();
} else {
yield $this->join();
}
$instance = clone $this;
$instance->process = yield $this->process->restart();
$instance->redirectOutput();
return $instance;
});
}

private function redirectOutput()
{
$stdout = $this->process->getStdout();
$stdout->unreference();

$stderr = $this->process->getStderr();
$stderr->unreference();

ByteStream\pipe($stdout, ByteStream\getStdout());
ByteStream\pipe($stderr, ByteStream\getStderr());
}
}
9 changes: 7 additions & 2 deletions lib/Worker/TaskWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,13 @@ public function restart($force = false): Promise
yield $this->shutdown();
}

$context = yield $this->context->restart($force);
return new static($context);
return $this->createInstance();
});
}

/**
* Create new instance of worker
* @return Worker
*/
abstract protected function createInstance(): Worker;
}
33 changes: 28 additions & 5 deletions lib/Worker/WorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,20 @@
final class WorkerProcess extends TaskWorker
{
const SCRIPT_PATH = __DIR__ . "/Internal/worker-process.php";
/**
* @var string
*/
private $envClassName;
/**
* @var array|mixed[]
*/
private $env;
/**
* @var null|string
*/
private $binary;

/**
/**
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
* @param mixed[] $env Array of environment variables to pass to the worker. Empty array inherits from the current
Expand All @@ -20,11 +32,22 @@ final class WorkerProcess extends TaskWorker
*/
public function __construct(string $envClassName = BasicEnvironment::class, array $env = [], string $binary = null)
{
$script = [
self::SCRIPT_PATH,
$this->envClassName = $envClassName;
$this->env = $env;
$this->binary = $binary;

$script = [
self::SCRIPT_PATH,
$envClassName,
];
];

parent::__construct(new Internal\WorkerProcess($script, $env, $binary));
}
}

protected function createInstance(): Worker
{
return new static($this->envClassName, $this->env, $this->binary);
}


}
42 changes: 28 additions & 14 deletions lib/Worker/WorkerThread.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,29 +11,43 @@
*/
final class WorkerThread extends TaskWorker
{
/**
/**
* @var string
*/
private $envClassName;

/**
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
*/
public function __construct(string $envClassName = BasicEnvironment::class)
{
$this->envClassName = $envClassName;

parent::__construct(new Thread(function (Channel $channel, string $className): Promise {
if (!\class_exists($className)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
}
if (!\class_exists($className)) {
throw new \Error(\sprintf("Invalid environment class name '%s'", $className));
}

if (!\is_subclass_of($className, Environment::class)) {
throw new \Error(\sprintf("The class '%s' does not implement '%s'", $className, Environment::class));
}
if (!\is_subclass_of($className, Environment::class)) {
throw new \Error(\sprintf("The class '%s' does not implement '%s'", $className, Environment::class));
}

$environment = new $className;
$environment = new $className;

if (!\defined("AMP_WORKER")) {
\define("AMP_WORKER", "amp-worker");
}
if (!\defined("AMP_WORKER")) {
\define("AMP_WORKER", "amp-worker");
}

$runner = new TaskRunner($channel, $environment);
return $runner->run();
$runner = new TaskRunner($channel, $environment);
return $runner->run();
}, $envClassName));
}
}

protected function createInstance(): Worker
{
return new static($this->envClassName);
}


}
38 changes: 0 additions & 38 deletions test/Context/AbstractContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -308,42 +308,4 @@ public function testExitingContextOnSend()
yield $context->send(\str_pad("", 1024 * 1024, "-"));
});
}

public function testRestart()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $original = $this->createContext(function () {
\usleep(100000);
});
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());
$context = yield $context->restart();
$this->assertNotSame($context, $original);
}
});
}, 200);
}

public function testForceRestart()
{
$this->assertRunTimeLessThan(function () {
Loop::run(function () {
$context = $original = $this->createContext(function () {
\usleep(100000);
});
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

$context = yield $context->restart(true);
}
});
}, 200);
}
}
37 changes: 0 additions & 37 deletions test/Context/ProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -84,41 +84,4 @@ public function testParseError()
\var_dump(yield $process->join());
});
}


public function testRestart()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $original = new Process(__DIR__ . "/wait-process.php");
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

$context = yield $context->restart();

$this->assertNotSame($context, $original);
}
});
}, 2000);
}

public function testForceRestart()
{
$this->assertRunTimeLessThan(function () {
Loop::run(function () {
$context = new Process(__DIR__ . "/wait-process.php");
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

yield $context->restart(true);
}
});
}, 2000);
}
}
Loading

0 comments on commit f17d827

Please sign in to comment.