From 866fbf494716af7dbefcd6c44ac185c1da7369b6 Mon Sep 17 00:00:00 2001 From: Max Furtuna Date: Mon, 10 Dec 2018 15:52:58 +0200 Subject: [PATCH] #64 refactor context and tests --- lib/Context/Context.php | 2 +- lib/Context/Process.php | 19 +++++-- lib/Context/Thread.php | 4 +- lib/Worker/DefaultPool.php | 9 ++-- lib/Worker/Internal/PooledWorker.php | 10 ++-- lib/Worker/Internal/WorkerProcess.php | 25 +++++++--- lib/Worker/TaskWorker.php | 22 ++++---- lib/Worker/Worker.php | 12 ++--- test/Context/AbstractContextTest.php | 72 ++++++++++----------------- test/Context/ProcessTest.php | 43 ++-------------- test/Worker/AbstractWorkerTest.php | 49 +++++++++--------- 11 files changed, 113 insertions(+), 154 deletions(-) diff --git a/lib/Context/Context.php b/lib/Context/Context.php index 2e4f9a36..c04d942f 100644 --- a/lib/Context/Context.php +++ b/lib/Context/Context.php @@ -35,7 +35,7 @@ public function join(): Promise; /** * Restarts the execution context. * @param $force bool Whether to force restart or wait until finish first - * @return Promise + * @return Promise new context instance */ public function restart($force = false): Promise; } diff --git a/lib/Context/Process.php b/lib/Context/Process.php index def1b6aa..d36b7142 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -187,9 +187,7 @@ public function start(): Promise try { $pid = yield $this->process->start(); - yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH)); - - $this->channel = yield $this->hub->accept($pid); + $this->channel = yield $this->createChannel(); return $pid; } catch (\Throwable $exception) { @@ -199,6 +197,16 @@ public function start(): Promise }); } + private function createChannel(): Promise + { + return call(function () { + $pid = $this->process->getPid(); + yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH)); + return $this->hub->accept($pid); + }); + } + + /** * {@inheritdoc} */ @@ -370,7 +378,10 @@ public function restart($force = false): Promise } else { yield $this->join(); } - yield $this->start(); + $instance = clone $this; + $instance->process = yield $this->process->restart($force); + $instance->channel = yield $instance->createChannel(); + return $instance; }); } } diff --git a/lib/Context/Thread.php b/lib/Context/Thread.php index 9077b5d1..af14ba5b 100644 --- a/lib/Context/Thread.php +++ b/lib/Context/Thread.php @@ -321,7 +321,9 @@ public function restart($force = false): Promise } else { yield $this->join(); } - yield $this->start(); + $instance = new static($this->function, ...$this->args); + yield $instance->start(); + return $instance; }); } } diff --git a/lib/Worker/DefaultPool.php b/lib/Worker/DefaultPool.php index 965eed3d..de8e7add 100644 --- a/lib/Worker/DefaultPool.php +++ b/lib/Worker/DefaultPool.php @@ -253,10 +253,7 @@ private function pull(): Worker return $worker; } - public function restart($force = false): Promise - { - - } - - + public function restart($force = false): Promise + { + } } diff --git a/lib/Worker/Internal/PooledWorker.php b/lib/Worker/Internal/PooledWorker.php index c5206c7f..9b92ee79 100644 --- a/lib/Worker/Internal/PooledWorker.php +++ b/lib/Worker/Internal/PooledWorker.php @@ -73,10 +73,8 @@ public function kill() $this->worker->kill(); } - public function restart($force = false): Promise - { - return $this->worker->restart($force); - } - - + public function restart($force = false): Promise + { + return $this->worker->restart($force); + } } diff --git a/lib/Worker/Internal/WorkerProcess.php b/lib/Worker/Internal/WorkerProcess.php index 3416605e..d9c3a435 100644 --- a/lib/Worker/Internal/WorkerProcess.php +++ b/lib/Worker/Internal/WorkerProcess.php @@ -38,14 +38,8 @@ public function start(): Promise return call(function () { $result = yield $this->process->start(); - $stdout = $this->process->getStdout(); - $stdout->unreference(); - $stderr = $this->process->getStderr(); - $stderr->unreference(); - - ByteStream\pipe($stdout, ByteStream\getStdout()); - ByteStream\pipe($stderr, ByteStream\getStderr()); + $this->redirectOutput(); return $result; }); @@ -69,7 +63,22 @@ public function restart($force = false): Promise } else { yield $this->join(); } - return $this->start(); + $instance = clone $this; + $instance->process = yield $this->process->restart(); + $instance->redirectOutput(); + return $instance; }); } + + private function redirectOutput() + { + $stdout = $this->process->getStdout(); + $stdout->unreference(); + + $stderr = $this->process->getStderr(); + $stderr->unreference(); + + ByteStream\pipe($stdout, ByteStream\getStdout()); + ByteStream\pipe($stderr, ByteStream\getStderr()); + } } diff --git a/lib/Worker/TaskWorker.php b/lib/Worker/TaskWorker.php index 7777b251..799185b3 100644 --- a/lib/Worker/TaskWorker.php +++ b/lib/Worker/TaskWorker.php @@ -151,15 +151,15 @@ public function kill() $this->exitStatus = new Success(0); } - public function restart($force = false): Promise - { - return call(function () use ($force){ - if($force) { - $this->context->kill(); - } else { - yield $this->shutdown(); - } - yield $this->context->start(); - }); - } + public function restart($force = false): Promise + { + return call(function () use ($force) { + if ($force) { + $this->context->kill(); + } else { + yield $this->shutdown(); + } + yield $this->context->start(); + }); + } } diff --git a/lib/Worker/Worker.php b/lib/Worker/Worker.php index 8e474cb9..9ccdb7c9 100644 --- a/lib/Worker/Worker.php +++ b/lib/Worker/Worker.php @@ -42,10 +42,10 @@ public function shutdown(): Promise; */ public function kill(); - /** - * Restart worker - * @param bool $force Whether for cancel current task or wait it finished - * @return Promise - */ - public function restart($force = false): Promise; + /** + * Restart worker. + * @param bool $force Whether for cancel current task or wait it finished + * @return Promise + */ + public function restart($force = false): Promise; } diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index a1917668..60e7b3e2 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -66,6 +66,26 @@ public function testStartWhileRunningThrowsError() }); } + /** + * @expectedException \Amp\Parallel\Context\StatusError + */ + public function testStartMultipleTimesThrowsError() + { + $this->assertRunTimeGreaterThan(function () { + Loop::run(function () { + $context = $this->createContext(function () { + \sleep(1); + }); + + yield $context->start(); + yield $context->join(); + + yield $context->start(); + yield $context->join(); + }); + }, 2000); + } + /** * @expectedException \Amp\Parallel\Sync\PanicError */ @@ -289,53 +309,11 @@ public function testExitingContextOnSend() }); } - public function testStartAfterJoin() - { - $this->assertRunTimeGreaterThan(function () { - Loop::run(function () { - $context = $this->createContext(function () { - \usleep(100000); - }); - for ($i=0; $i<=1;$i++) { - $this->assertFalse($context->isRunning()); - - yield $context->start(); - - $this->assertTrue($context->isRunning()); - - yield $context->join(); - $this->assertFalse($context->isRunning()); - } - }); - }, 200); - } - - public function testStartAfterKill() - { - $this->assertRunTimeLessThan(function () { - Loop::run(function () { - $context = $this->createContext(function () { - \usleep(100000); - }); - for ($i=0; $i<=1;$i++) { - $this->assertFalse($context->isRunning()); - - yield $context->start(); - - $this->assertTrue($context->isRunning()); - - $this->assertRunTimeLessThan([$context, 'kill'], 1000); - $this->assertFalse($context->isRunning()); - } - }); - }, 200); - } - public function testRestart() { $this->assertRunTimeGreaterThan(function () { Loop::run(function () { - $context = $this->createContext(function () { + $context = $original = $this->createContext(function () { \usleep(100000); }); $this->assertFalse($context->isRunning()); @@ -343,8 +321,8 @@ public function testRestart() for ($i = 0; $i <= 1; $i++) { $this->assertTrue($context->isRunning()); - - yield $context->restart(); + $context = yield $context->restart(); + $this->assertNotSame($context, $original); } }); }, 200); @@ -354,7 +332,7 @@ public function testForceRestart() { $this->assertRunTimeLessThan(function () { Loop::run(function () { - $context = $this->createContext(function () { + $context = $original = $this->createContext(function () { \usleep(100000); }); $this->assertFalse($context->isRunning()); @@ -363,7 +341,7 @@ public function testForceRestart() for ($i = 0; $i <= 1; $i++) { $this->assertTrue($context->isRunning()); - yield $context->restart(true); + $context = yield $context->restart(true); } }); }, 200); diff --git a/test/Context/ProcessTest.php b/test/Context/ProcessTest.php index 7baffd79..446300bf 100644 --- a/test/Context/ProcessTest.php +++ b/test/Context/ProcessTest.php @@ -85,56 +85,21 @@ public function testParseError() }); } - public function testStartAfterJoin() - { - $this->assertRunTimeGreaterThan(function () { - Loop::run(function () { - $context = new Process(__DIR__ . "/wait-process.php"); - for ($i=0; $i<=1; $i++) { - $this->assertFalse($context->isRunning()); - - yield $context->start(); - - $this->assertTrue($context->isRunning()); - - yield $context->join(); - $this->assertFalse($context->isRunning()); - } - }); - }, 2000); - } - - public function testStartAfterKill() - { - $this->assertRunTimeLessThan(function () { - Loop::run(function () { - $context = new Process(__DIR__ . "/wait-process.php"); - for ($i=0; $i<=1;$i++) { - $this->assertFalse($context->isRunning()); - - yield $context->start(); - - $this->assertTrue($context->isRunning()); - - $this->assertRunTimeLessThan([$context, 'kill'], 1000); - $this->assertFalse($context->isRunning()); - } - }); - }, 2000); - } public function testRestart() { $this->assertRunTimeGreaterThan(function () { Loop::run(function () { - $context = new Process(__DIR__ . "/wait-process.php"); + $context = $original = new Process(__DIR__ . "/wait-process.php"); $this->assertFalse($context->isRunning()); yield $context->start(); for ($i = 0; $i <= 1; $i++) { $this->assertTrue($context->isRunning()); - yield $context->restart(); + $context = yield $context->restart(); + + $this->assertNotSame($context, $original); } }); }, 2000); diff --git a/test/Worker/AbstractWorkerTest.php b/test/Worker/AbstractWorkerTest.php index 0c721d62..d18b1c06 100644 --- a/test/Worker/AbstractWorkerTest.php +++ b/test/Worker/AbstractWorkerTest.php @@ -264,30 +264,29 @@ public function run(Environment $environment) } - public function testRestart() - { - Loop::run(function () { - $worker = $this->createWorker(); - for($i=0; $i<=1; $i++) { - $returnValue = yield $worker->enqueue(new TestTask(42)); - $this->assertEquals(42, $returnValue); - - yield $worker->restart(); - } - }); - } - - public function testForceRestart() - { - Loop::run(function () { - $worker = $this->createWorker(); - for($i=0; $i<=1; $i++) { - $returnValue = yield $worker->enqueue(new TestTask(42)); - $this->assertEquals(42, $returnValue); - - yield $worker->restart(true); - } - }); - } + public function testRestart() + { + Loop::run(function () { + $worker = $this->createWorker(); + for ($i=0; $i<=1; $i++) { + $returnValue = yield $worker->enqueue(new TestTask(42)); + $this->assertEquals(42, $returnValue); + + yield $worker->restart(); + } + }); + } + public function testForceRestart() + { + Loop::run(function () { + $worker = $this->createWorker(); + for ($i=0; $i<=1; $i++) { + $returnValue = yield $worker->enqueue(new TestTask(42)); + $this->assertEquals(42, $returnValue); + + yield $worker->restart(true); + } + }); + } }