Skip to content

Commit

Permalink
amphp#64 refactor context and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Ekstazi committed Dec 10, 2018
1 parent 80e0b01 commit 866fbf4
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 154 deletions.
2 changes: 1 addition & 1 deletion lib/Context/Context.php
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public function join(): Promise;
/**
* Restarts the execution context.
* @param $force bool Whether to force restart or wait until finish first
* @return Promise
* @return Promise<Context> new context instance
*/
public function restart($force = false): Promise;
}
19 changes: 15 additions & 4 deletions lib/Context/Process.php
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ public function start(): Promise
try {
$pid = yield $this->process->start();

yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));

$this->channel = yield $this->hub->accept($pid);
$this->channel = yield $this->createChannel();

return $pid;
} catch (\Throwable $exception) {
Expand All @@ -199,6 +197,16 @@ public function start(): Promise
});
}

private function createChannel(): Promise
{
return call(function () {
$pid = $this->process->getPid();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));
return $this->hub->accept($pid);
});
}


/**
* {@inheritdoc}
*/
Expand Down Expand Up @@ -370,7 +378,10 @@ public function restart($force = false): Promise
} else {
yield $this->join();
}
yield $this->start();
$instance = clone $this;
$instance->process = yield $this->process->restart($force);
$instance->channel = yield $instance->createChannel();
return $instance;
});
}
}
4 changes: 3 additions & 1 deletion lib/Context/Thread.php
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,9 @@ public function restart($force = false): Promise
} else {
yield $this->join();
}
yield $this->start();
$instance = new static($this->function, ...$this->args);
yield $instance->start();
return $instance;
});
}
}
9 changes: 3 additions & 6 deletions lib/Worker/DefaultPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,7 @@ private function pull(): Worker
return $worker;
}

public function restart($force = false): Promise
{

}


public function restart($force = false): Promise
{
}
}
10 changes: 4 additions & 6 deletions lib/Worker/Internal/PooledWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,8 @@ public function kill()
$this->worker->kill();
}

public function restart($force = false): Promise
{
return $this->worker->restart($force);
}


public function restart($force = false): Promise
{
return $this->worker->restart($force);
}
}
25 changes: 17 additions & 8 deletions lib/Worker/Internal/WorkerProcess.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,8 @@ public function start(): Promise
return call(function () {
$result = yield $this->process->start();

$stdout = $this->process->getStdout();
$stdout->unreference();

$stderr = $this->process->getStderr();
$stderr->unreference();

ByteStream\pipe($stdout, ByteStream\getStdout());
ByteStream\pipe($stderr, ByteStream\getStderr());
$this->redirectOutput();

return $result;
});
Expand All @@ -69,7 +63,22 @@ public function restart($force = false): Promise
} else {
yield $this->join();
}
return $this->start();
$instance = clone $this;
$instance->process = yield $this->process->restart();
$instance->redirectOutput();
return $instance;
});
}

private function redirectOutput()
{
$stdout = $this->process->getStdout();
$stdout->unreference();

$stderr = $this->process->getStderr();
$stderr->unreference();

ByteStream\pipe($stdout, ByteStream\getStdout());
ByteStream\pipe($stderr, ByteStream\getStderr());
}
}
22 changes: 11 additions & 11 deletions lib/Worker/TaskWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -151,15 +151,15 @@ public function kill()
$this->exitStatus = new Success(0);
}

public function restart($force = false): Promise
{
return call(function () use ($force){
if($force) {
$this->context->kill();
} else {
yield $this->shutdown();
}
yield $this->context->start();
});
}
public function restart($force = false): Promise
{
return call(function () use ($force) {
if ($force) {
$this->context->kill();
} else {
yield $this->shutdown();
}
yield $this->context->start();
});
}
}
12 changes: 6 additions & 6 deletions lib/Worker/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,10 @@ public function shutdown(): Promise;
*/
public function kill();

/**
* Restart worker
* @param bool $force Whether for cancel current task or wait it finished
* @return Promise
*/
public function restart($force = false): Promise;
/**
* Restart worker.
* @param bool $force Whether for cancel current task or wait it finished
* @return Promise
*/
public function restart($force = false): Promise;
}
72 changes: 25 additions & 47 deletions test/Context/AbstractContextTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,26 @@ public function testStartWhileRunningThrowsError()
});
}

/**
* @expectedException \Amp\Parallel\Context\StatusError
*/
public function testStartMultipleTimesThrowsError()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
\sleep(1);
});

yield $context->start();
yield $context->join();

yield $context->start();
yield $context->join();
});
}, 2000);
}

/**
* @expectedException \Amp\Parallel\Sync\PanicError
*/
Expand Down Expand Up @@ -289,62 +309,20 @@ public function testExitingContextOnSend()
});
}

public function testStartAfterJoin()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(100000);
});
for ($i=0; $i<=1;$i++) {
$this->assertFalse($context->isRunning());

yield $context->start();

$this->assertTrue($context->isRunning());

yield $context->join();
$this->assertFalse($context->isRunning());
}
});
}, 200);
}

public function testStartAfterKill()
{
$this->assertRunTimeLessThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
\usleep(100000);
});
for ($i=0; $i<=1;$i++) {
$this->assertFalse($context->isRunning());

yield $context->start();

$this->assertTrue($context->isRunning());

$this->assertRunTimeLessThan([$context, 'kill'], 1000);
$this->assertFalse($context->isRunning());
}
});
}, 200);
}

public function testRestart()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
$context = $original = $this->createContext(function () {
\usleep(100000);
});
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

yield $context->restart();
$context = yield $context->restart();
$this->assertNotSame($context, $original);
}
});
}, 200);
Expand All @@ -354,7 +332,7 @@ public function testForceRestart()
{
$this->assertRunTimeLessThan(function () {
Loop::run(function () {
$context = $this->createContext(function () {
$context = $original = $this->createContext(function () {
\usleep(100000);
});
$this->assertFalse($context->isRunning());
Expand All @@ -363,7 +341,7 @@ public function testForceRestart()
for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

yield $context->restart(true);
$context = yield $context->restart(true);
}
});
}, 200);
Expand Down
43 changes: 4 additions & 39 deletions test/Context/ProcessTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,56 +85,21 @@ public function testParseError()
});
}

public function testStartAfterJoin()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = new Process(__DIR__ . "/wait-process.php");
for ($i=0; $i<=1; $i++) {
$this->assertFalse($context->isRunning());

yield $context->start();

$this->assertTrue($context->isRunning());

yield $context->join();
$this->assertFalse($context->isRunning());
}
});
}, 2000);
}

public function testStartAfterKill()
{
$this->assertRunTimeLessThan(function () {
Loop::run(function () {
$context = new Process(__DIR__ . "/wait-process.php");
for ($i=0; $i<=1;$i++) {
$this->assertFalse($context->isRunning());

yield $context->start();

$this->assertTrue($context->isRunning());

$this->assertRunTimeLessThan([$context, 'kill'], 1000);
$this->assertFalse($context->isRunning());
}
});
}, 2000);
}

public function testRestart()
{
$this->assertRunTimeGreaterThan(function () {
Loop::run(function () {
$context = new Process(__DIR__ . "/wait-process.php");
$context = $original = new Process(__DIR__ . "/wait-process.php");
$this->assertFalse($context->isRunning());
yield $context->start();

for ($i = 0; $i <= 1; $i++) {
$this->assertTrue($context->isRunning());

yield $context->restart();
$context = yield $context->restart();

$this->assertNotSame($context, $original);
}
});
}, 2000);
Expand Down
49 changes: 24 additions & 25 deletions test/Worker/AbstractWorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -264,30 +264,29 @@ public function run(Environment $environment)
}


public function testRestart()
{
Loop::run(function () {
$worker = $this->createWorker();
for($i=0; $i<=1; $i++) {
$returnValue = yield $worker->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);

yield $worker->restart();
}
});
}

public function testForceRestart()
{
Loop::run(function () {
$worker = $this->createWorker();
for($i=0; $i<=1; $i++) {
$returnValue = yield $worker->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);

yield $worker->restart(true);
}
});
}
public function testRestart()
{
Loop::run(function () {
$worker = $this->createWorker();
for ($i=0; $i<=1; $i++) {
$returnValue = yield $worker->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);

yield $worker->restart();
}
});
}

public function testForceRestart()
{
Loop::run(function () {
$worker = $this->createWorker();
for ($i=0; $i<=1; $i++) {
$returnValue = yield $worker->enqueue(new TestTask(42));
$this->assertEquals(42, $returnValue);

yield $worker->restart(true);
}
});
}
}

0 comments on commit 866fbf4

Please sign in to comment.