Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
antonmedv committed Jul 22, 2020
1 parent 97347ba commit f512cfd
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 56 deletions.
26 changes: 7 additions & 19 deletions src/Console/WorkerCommand.php
Expand Up @@ -7,11 +7,11 @@

namespace Deployer\Console;

use Deployer\Collection\PersistentCollection;
use Deployer\Deployer;
use Deployer\Exception\Exception;
use Deployer\Exception\GracefulShutdownException;
use Deployer\Exception\NonFatalException;
use Deployer\Exception\RunException;
use Deployer\Executor\Worker;
use Deployer\Task\Context;
use Symfony\Component\Console\Input\InputArgument;
use Symfony\Component\Console\Input\InputInterface;
Expand Down Expand Up @@ -46,8 +46,8 @@ protected function execute(InputInterface $input, OutputInterface $output)
define('NO_ANSI', 'true');
}

$host = $this->deployer->hosts->get($input->getArgument('worker-host'));
$task = $this->deployer->tasks->get($input->getArgument('worker-task'));
$host = $this->deployer->hosts->get($input->getArgument('worker-host'));

$this->deployer->config->set('config_directory', $input->getArgument('config-directory'));
$host->getConfig()->load();
Expand All @@ -56,22 +56,10 @@ protected function execute(InputInterface $input, OutputInterface $output)
$this->deployer->config->set($name, $value);
}

try {
Exception::setTaskSourceLocation($task->getSourceLocation());
$worker = new Worker($this->deployer);
$exitCode = $worker->execute($task, $host);

$task->run(new Context($host, $input, $output));

if ($task->getName() !== 'connect') {
$this->deployer->messenger->endOnHost($host);
}
$host->getConfig()->save();
return 0;
} catch (GracefulShutdownException $e) {
$this->deployer->messenger->renderException($e, $host);
return GracefulShutdownException::EXIT_CODE;
} catch (\Throwable $e) {
$this->deployer->messenger->renderException($e, $host);
return 255;
}
$host->getConfig()->save();
return $exitCode;
}
}
6 changes: 3 additions & 3 deletions src/Deployer.php
Expand Up @@ -23,7 +23,7 @@
use Deployer\Console\TreeCommand;
use Deployer\Console\WorkerCommand;
use Deployer\Executor\Messenger;
use Deployer\Executor\ParallelExecutor;
use Deployer\Executor\Master;
use Deployer\Logger\Handler\FileHandler;
use Deployer\Logger\Handler\NullHandler;
use Deployer\Logger\Logger;
Expand Down Expand Up @@ -56,7 +56,7 @@
* @property ProcessRunner $processRunner
* @property Task\ScriptManager $scriptManager
* @property Selector $selector
* @property ParallelExecutor $executor
* @property Master $executor
* @property Messenger $messenger
* @property Messenger $logger
* @property Printer $pop
Expand Down Expand Up @@ -146,7 +146,7 @@ public function __construct(Application $console)
return new Messenger($c['input'], $c['output']);
};
$this['executor'] = function ($c) {
return new ParallelExecutor(
return new Master(
$c['input'],
$c['output'],
$c['messenger'],
Expand Down
41 changes: 26 additions & 15 deletions src/Executor/ParallelExecutor.php → src/Executor/Master.php
Expand Up @@ -9,17 +9,15 @@

use Deployer\Component\Ssh\Client;
use Deployer\Configuration\Configuration;
use Deployer\Exception\Exception;
use Deployer\Exception\GracefulShutdownException;
use Deployer\Console\WorkerCommand;
use Deployer\Deployer;
use Deployer\Host\Host;
use Deployer\Host\Localhost;
use Deployer\Selector\Selector;
use Deployer\Task\Context;
use Deployer\Task\Task;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Process\Process;
use function Deployer\Support\str_contains;

const FRAMES = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];

Expand All @@ -29,7 +27,7 @@ function spinner($message = '')
return " $frame $message\r";
}

class ParallelExecutor
class Master
{
private $input;
private $output;
Expand Down Expand Up @@ -69,7 +67,7 @@ private function connect(array $hosts)
if ($host instanceof Localhost) {
continue;
}
$process = $this->getProcess($host, new Task('connect'), true);
$process = $this->getProcess($host, new Task('connect'));
$process->start();

while ($process->isRunning()) {
Expand Down Expand Up @@ -130,7 +128,7 @@ public function run(array $tasks, array $hosts, $plan = null): int
continue;
}

$exitCode = $this->runTask($task, [$currentHost], true);
$exitCode = $this->runTask($task, [$currentHost]);
if ($exitCode !== 0) {
return $exitCode;
}
Expand All @@ -151,7 +149,7 @@ public function run(array $tasks, array $hosts, $plan = null): int
continue;
}

$exitCode = $this->runTask($task, $selectedHosts, false);
$exitCode = $this->runTask($task, $selectedHosts);
if ($exitCode !== 0) {
return $exitCode;
}
Expand All @@ -169,14 +167,29 @@ public function run(array $tasks, array $hosts, $plan = null): int
/**
* @param Task $task
* @param Host[] $hosts
* @param bool $tty
* @return int
*/
private function runTask(Task $task, array $hosts, bool $tty): int
private function runTask(Task $task, array $hosts): int
{
if (getenv('DEPLOYER_LOCAL_WORKER') === 'true') {
// This allows to code coverage all recipe,
// as well as speedup tests by not spawning
// lots of processes. Also there is a few tests
// what runs with workers for tests subprocess
// communications.
foreach ($hosts as $host) {
$worker = new Worker(Deployer::get());
$exitCode = $worker->execute($task, $host);
if ($exitCode !== 0) {
return $exitCode;
}
}
return 0;
}

$processes = [];
foreach ($hosts as $host) {
$processes[] = $this->getProcess($host, $task, $tty);
$processes[] = $this->getProcess($host, $task);
}

$callback = function (string $output) use (&$showSpinner) {
Expand All @@ -202,7 +215,7 @@ private function runTask(Task $task, array $hosts, bool $tty): int
return $this->cumulativeExitCode($processes);
}

protected function getProcess(Host $host, Task $task, bool $tty): Process
protected function getProcess(Host $host, Task $task): Process
{
$dep = PHP_BINARY . ' ' . DEPLOYER_BIN;
$configDirectory = $host->get('config_directory');
Expand All @@ -213,9 +226,7 @@ protected function getProcess(Host $host, Task $task, bool $tty): Process
$this->output->writeln("[{$host->getTag()}] $command");
}

$process = Process::fromShellCommandline($command);
$process->setTty($tty);
return $process;
return Process::fromShellCommandline($command);
}

/**
Expand Down
50 changes: 50 additions & 0 deletions src/Executor/Worker.php
@@ -0,0 +1,50 @@
<?php


namespace Deployer\Executor;


use Deployer\Deployer;
use Deployer\Exception\Exception;
use Deployer\Exception\GracefulShutdownException;
use Deployer\Exception\RunException;
use Deployer\Host\Host;
use Deployer\Task\Context;
use Deployer\Task\Task;
use Throwable;

class Worker
{
/**
* @var Deployer
*/
private $deployer;

public function __construct(Deployer $deployer)
{
$this->deployer = $deployer;
}

public function execute(Task $task, Host $host)
{
try {
Exception::setTaskSourceLocation($task->getSourceLocation());

$task->run(new Context($host, $this->deployer->input, $this->deployer->output));

if ($task->getName() !== 'connect') {
$this->deployer->messenger->endOnHost($host);
}
return 0;
} catch (Throwable $e) {
$this->deployer->messenger->renderException($e, $host);
if ($e instanceof GracefulShutdownException) {
return GracefulShutdownException::EXIT_CODE;
}
if ($e instanceof RunException) {
return $e->getExitCode();
}
return 255;
}
}
}
30 changes: 15 additions & 15 deletions src/functions.php
Expand Up @@ -187,52 +187,52 @@ function task($name, $body = null)
* Call that task before specified task runs.
*
* @param string $task The task before $that should be run.
* @param string|callable $todo The task to be run.
* @param string|callable $do The task to be run.
* @return T|void
*/
function before($task, $todo)
function before($task, $do)
{
if (is_callable($todo)) {
$newTask = task("before:$task", $todo);
if (is_callable($do)) {
$newTask = task("before:$task", $do);
before($task, "before:$task");
return $newTask;
}
task($task)->addBefore($todo);
task($task)->addBefore($do);
}

/**
* Call that task after specified task runs.
*
* @param string $task The task after $that should be run.
* @param string|callable $todo The task to be run.
* @param string|callable $do The task to be run.
* @return T|void
*/
function after($task, $todo)
function after($task, $do)
{
if (is_callable($todo)) {
$newTask = task("after:$task", $todo);
if (is_callable($do)) {
$newTask = task("after:$task", $do);
after($task, "after:$task");
return $newTask;
}
task($task)->addAfter($todo);
task($task)->addAfter($do);
}

/**
* Setup which task run on failure of first.
*
* @param string $task The task which need to fail so $that should be run.
* @param string $todo The task to be run.
* @param string $do The task to be run.
* @return T|void
*/
function fail($task, $todo)
function fail($task, $do)
{
if (is_callable($todo)) {
$newTask = task("fail:$task", $todo);
if (is_callable($do)) {
$newTask = task("fail:$task", $do);
fail($task, "fail:$task");
return $newTask;
}
$deployer = Deployer::get();
$deployer->fail->set($task, $todo);
$deployer->fail->set($task, $do);
}

/**
Expand Down
1 change: 1 addition & 0 deletions test/bootstrap.php
Expand Up @@ -18,5 +18,6 @@

require_once __DIR__ . '/recipe/AbstractTest.php';

putenv('DEPLOYER_LOCAL_WORKER=true');
define('DEPLOYER_BIN', __DIR__ . '/../bin/dep');
define('__TEMP_DIR__', sys_get_temp_dir() . '/deployer');
19 changes: 16 additions & 3 deletions test/recipe/DeployTest.php
Expand Up @@ -16,7 +16,11 @@ public function testDeploy()
$recipe = __DIR__ . '/deploy.php';
$deployer = $this->init($recipe);

$this->tester->run(['deploy', '-s' => 'all', '-f' => $recipe, '-l' => 1], [
$this->tester->run([
'deploy',
'-s' => 'all',
'-f' => $recipe
], [
'verbosity' => Output::VERBOSITY_NORMAL,
'interactive' => false,
]);
Expand All @@ -40,15 +44,24 @@ public function testDeploy()
}
}

public function testDeployParallel()
public function testWorker()
{
// Allow to start workers. Don't forget to disable it later.
putenv('DEPLOYER_LOCAL_WORKER=FALSE');

$recipe = __DIR__ . '/deploy.php';
$this->init($recipe);

$this->tester->run(['deploy', '-f' => $recipe, '-s' => 'all'], [
$this->tester->run([
'deploy',
'-f' => $recipe,
'-s' => 'all'
], [
'verbosity' => Output::VERBOSITY_NORMAL,
]);
self::assertEquals(0, $this->tester->getStatusCode(), $this->tester->getDisplay());

putenv('DEPLOYER_LOCAL_WORKER=true');
}

public function testDeploySelectHosts()
Expand Down
2 changes: 1 addition & 1 deletion test/recipe/deploy.php
Expand Up @@ -34,7 +34,7 @@
]);

task('fail', function () {
run('¯\_(ツ)_/¯');
run('false');
});

fail('deploy:fail', 'deploy:unlock');
Expand Down

0 comments on commit f512cfd

Please sign in to comment.