Skip to content
This repository has been archived by the owner on Mar 4, 2022. It is now read-only.

Commit

Permalink
Add --setup feature, enhance tracker ps killing.
Browse files Browse the repository at this point in the history
Fixes #51

* Add --setup feature for task over lifetime of actions.
* Abort actions with error if setup fails.
* Move tracker to separate file.
* Have tracker kill entire process tree of tasks.
* Wait on tracker to kill everything before finishing tasks.
* Add convenience helper functions.
  • Loading branch information
ryan-roemer committed Dec 14, 2015
1 parent 60cba15 commit c250c0f
Show file tree
Hide file tree
Showing 7 changed files with 196 additions and 65 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ Flags:

* `--builderrc`: Path to builder config file (default: `.builderrc`)
* `--tries`: Number of times to attempt a task (default: `1`)
* `--setup`: Single task to run for the entirety of <action>.

##### builder concurrent

Expand All @@ -173,6 +174,7 @@ Flags:

* `--builderrc`: Path to builder config file (default: `.builderrc`)
* `--tries`: Number of times to attempt a task (default: `1`)
* `--setup`: Single task to run for the entirety of <action>.
* `--queue`: Number of concurrent processes to run (default: unlimited - `0|null`)
* `--[no-]buffer`: Buffer output until process end (default: `false`)

Expand Down Expand Up @@ -209,6 +211,7 @@ Flags:

* `--builderrc`: Path to builder config file (default: `.builderrc`)
* `--tries`: Number of times to attempt a task (default: `1`)
* `--setup`: Single task to run for the entirety of <action>.
* `--queue`: Number of concurrent processes to run (default: unlimited - `0|null`)
* `--[no-]buffer`: Buffer output until process end (default: `false`)
* `--envs-path`: Path to JSON env variable array file (default: `null`)
Expand Down
15 changes: 14 additions & 1 deletion bin/builder-core.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,18 @@ module.exports = function (callback) {
});

// Run the task
task.execute(callback);
var chalk = require("chalk");
var log = require("../lib/log");

log.info("builder-core:start:" + process.pid, "Started: " + chalk.gray(task));
task.execute(function (err) {
if (err) {
log.error("builder-core:end:" + process.pid,
"Ended with error: " + chalk.gray(task) + " - " + chalk.red(err.message.split("\n")[0]));
} else {
log.info("builder-core:end:" + process.pid, "Ended normally: " + chalk.gray(task));
}

callback(err);
});
};
10 changes: 9 additions & 1 deletion lib/args.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ var FLAG_BUFFER = {
types: [Boolean],
default: false
};
var FLAG_SETUP = {
desc: "Single task to run for the entirety of <action>.",
types: [String],
default: function (val) { return val || null; }
};

// Option flags.
var FLAGS = {
Expand All @@ -37,17 +42,20 @@ var FLAGS = {
},

run: {
tries: FLAG_TRIES
tries: FLAG_TRIES,
setup: FLAG_SETUP
},

concurrent: {
tries: FLAG_TRIES,
setup: FLAG_SETUP,
queue: FLAG_QUEUE,
buffer: FLAG_BUFFER
},

envs: {
tries: FLAG_TRIES,
setup: FLAG_SETUP,
queue: FLAG_QUEUE,
buffer: FLAG_BUFFER,
"envs-path": {
Expand Down
96 changes: 56 additions & 40 deletions lib/runner.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ var exec = require("child_process").exec;
var _ = require("lodash");
var async = require("async");
var chalk = require("chalk");
var log = require("../lib/log");
var log = require("./log");
var Tracker = require("./utils/tracker");

/**
* Run a single task.
Expand Down Expand Up @@ -102,45 +103,58 @@ var retry = function (cmd, shOpts, opts, callback) {
};

/**
* Multi-process tracker.
* Add and invoke a setup task if present in options.
*
* @returns {void}
*/
var Tracker = function Tracker() {
this.procs = [];
};

/**
* Add process and track close.
*
* @param {Object} proc Child process object
* @returns {Object} Child process object
* @param {String} setup Setup task
* @param {Object} shOpts Shell options
* @returns {Object} Process object or `null`.
*/
Tracker.prototype.add = function (proc) {
var self = this;
var addSetup = function (setup, shOpts) {
if (!setup) { return null; }

// Track.
self.procs.push(proc);

// Remove from tracked list when closed.
proc.on("close", function () {
self.procs = self.procs.filter(function (obj) {
return obj.pid !== proc.pid;
});
var done = _.once(function (code) {
var level = code === 0 ? "info" : "error";
log[level]("setup:end", "Setup command ended with code: " + code);
});

log.info("setup:start", "Starting setup task: " + setup);
var proc = run("builder run " + setup, shOpts, {}, done);
proc.on("exit", done);

return proc;
};

/**
* Terminate all open processes
* Wrap callback with setup termination behavior.
*
* @returns {void}
* - Create and invoke setup.
* - Early termination on setup error.
* - Ensure callback only called once.
*
* @param {Object} shOpts Shell options
* @param {Object} opts Runner options
* @param {Object} tracker Process tracker
* @param {Function} callback Callback `(err)`
* @returns {Function} Wrapped callback
*/
Tracker.prototype.kill = function () {
this.procs.forEach(function (proc) {
proc.kill();
var createFinish = function (shOpts, opts, tracker, callback) {
// Wrap callback
var finish = _.once(function (err) {
tracker.kill(function () {
callback(err);
});
});

// Add, invoke, and hook to final callback if setup dies early.
var setup = tracker.add(addSetup(opts.setup, shOpts));
if (setup) {
// If setup exit happens before normal termination, kill everything.
setup.on("exit", function (code) {
finish(new Error("Setup exited with code: " + code));
});
}

return finish;
};

/**
Expand All @@ -157,7 +171,11 @@ module.exports = {
* @returns {Object} Child process object
*/
run: function (cmd, shOpts, opts, callback) {
return retry(cmd, shOpts, opts, callback);
// Add + invoke setup (if any), bind tracker cleanup, and wrap callback.
var tracker = new Tracker();
var finish = createFinish(shOpts, opts, tracker, callback);

return retry(cmd, shOpts, opts, finish);
},

/**
Expand All @@ -170,21 +188,20 @@ module.exports = {
* @returns {void}
*/
concurrent: function (cmds, shOpts, opts, callback) {
// Add + invoke setup (if any), bind tracker cleanup, and wrap callback.
var tracker = new Tracker();
var queue = opts.queue;
var finish = createFinish(shOpts, opts, tracker, callback);

// Get mapper (queue vs. non-queue)
var queue = opts.queue;
var map = queue ?
async.mapLimit.bind(async, cmds, queue) :
async.map.bind(async, cmds);

log.info("concurrent", "Starting with queue size: " + chalk.magenta(queue || "unlimited"));
map(function (cmd, cb) {
retry(cmd, shOpts, _.extend({ tracker: tracker }, opts), cb);
}, function (err) {
tracker.kill();
callback(err);
});
}, finish);
},

/**
Expand All @@ -197,11 +214,13 @@ module.exports = {
* @returns {void}
*/
envs: function (cmd, shOpts, opts, callback) {
// Add + invoke setup (if any), bind tracker cleanup, and wrap callback.
var tracker = new Tracker();
var queue = opts.queue;
var taskEnvs = opts._envs;
var finish = createFinish(shOpts, opts, tracker, callback);

// Get mapper (queue vs. non-queue)
var queue = opts.queue;
var taskEnvs = opts._envs;
var map = queue ?
async.mapLimit.bind(async, taskEnvs, queue) :
async.map.bind(async, taskEnvs);
Expand All @@ -215,9 +234,6 @@ module.exports = {
log.info("envs", "Starting environment " + chalk.magenta(JSON.stringify(taskEnv)) +
" run for command: " + chalk.gray(cmd));
retry(cmd, taskShOpts, taskOpts, cb);
}, function (err) {
tracker.kill();
callback(err);
});
}, finish);
}
};
52 changes: 52 additions & 0 deletions lib/utils/tracker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"use strict";

var async = require("async");
var treeKill = require("tree-kill");

/**
* Multi-process tracker.
*
* @returns {void}
*/
var Tracker = module.exports = function Tracker() {
this.procs = [];
};

/**
* Add process and track close.
*
* @param {Object} proc Child process object
* @returns {Object} Child process object
*/
Tracker.prototype.add = function (proc) {
if (!proc) { return proc; }

var self = this;

// Track.
self.procs.push(proc);

// Remove from tracked list when closed.
proc.on("close", function () {
self.procs = self.procs.filter(function (obj) {
return obj.pid !== proc.pid;
});
});

return proc;
};

/**
* Terminate all open processes
*
* @param {Function} callback Called when kills are issued
* @returns {void}
*/
Tracker.prototype.kill = function (callback) {
if (this.procs.length === 0) { return callback(); }

async.map(this.procs, function (proc, cb) {
// Ignore errors: We want to kill as many procs as we can.
treeKill(proc.pid, "SIGTERM", function () { cb(); });
}, callback);
};
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
"chalk": "^1.1.1",
"js-yaml": "^3.4.3",
"lodash": "^3.10.1",
"nopt": "^3.0.6"
"nopt": "^3.0.6",
"tree-kill": "^1.0.0"
},
"devDependencies": {
"chai": "^3.4.1",
Expand Down
Loading

0 comments on commit c250c0f

Please sign in to comment.