Skip to content

Commit

Permalink
Merge b2e5230 into 5f3af98
Browse files Browse the repository at this point in the history
  • Loading branch information
elliotttf committed Apr 30, 2019
2 parents 5f3af98 + b2e5230 commit 8c15612
Show file tree
Hide file tree
Showing 6 changed files with 6,956 additions and 40 deletions.
48 changes: 34 additions & 14 deletions .travis.yml
Expand Up @@ -5,20 +5,40 @@ cache:
- node_modules
notifications:
email: false
node_js:
- '6'
- 'node'
before_install:
- npm i -g npm@^2.0.0
before_script:
- npm prune
script: npm run lint && npm t
after_success:
- npm run coveralls
- 'curl -Lo travis_after_all.py https://git.io/travis_after_all'
- python travis_after_all.py
- export $(cat .to_export_back) &> /dev/null
- npm run semantic-release
branches:
except:
- /^v\d+\.\d+\.\d+$/

stages:
- name: test
# require the event type to not be a semantic release tag.
if: NOT tag =~ ^v\d+\.\d+\.\d+$
- name: release
# require the branch name to be one of the environment branches.
if: branch = master AND fork = false AND type IN (push, api)

jobs:
include:
-
stage: test
node_js: lts/boron
install: npm i
script:
- yarn lint
- yarn test
after_success:
- yarn coveralls
-
stage: test
node_js: lts/carbon
install: npm i
script:
- yarn lint
- yarn test
after_success:
- yarn coveralls
-
stage: release
node_js: lts/carbon
sciprt:
- yarn semantic-release
9 changes: 9 additions & 0 deletions README.md
Expand Up @@ -55,6 +55,15 @@ else {

Returns a promise that resolves when the server is listening.

* `Adios.master.kill(pid)` - Method to kill a worker by process id
* `pid` - The process id to kill

* `Adios.master.term(pid)` - Method to terminate a worker by process id, this
will call the graceful shutdown defined by the worker.
* `pid` - The process id to terminate

Returns a promise that resolves when the worker is terminated.

* `Adios.child.init(cleanCb[, path])` - The initialize function for adios
children. Sets up a connection to the master. Note: there can be only one per
process and it must be running on a child process.
Expand Down
114 changes: 91 additions & 23 deletions lib/master.js
Expand Up @@ -8,6 +8,52 @@ let workerSockets = {};

const DEFAULT_PATH = require('../package.json').config.defaultPath;

/**
* Kill workers.
*
* @param {cluster.Worker} worker
* The worker to kill.
*
* @return {undefined}
*/
function killWorker(worker) {
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].end();
}
worker.kill();
}

/**
* Terminate workers gracefully and fall back to killing.
*
* @param {int} killTimeout
* The time in milliseconds before killing a child process.
* @param {cluster.Worker} worker
* The worker to terminate.
*
* @return {Promise}
* Resolves when the child process has been terminated or killed.
*/
function termWorker(killTimeout, worker) {
return new Promise((resolve) => {
const timeout = setTimeout(() => {
killWorker(worker);
resolve();
}, killTimeout);
worker.on('disconnect', () => {
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].end();
}
clearTimeout(timeout);
resolve();
});
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].write('SIGINT');
}
worker.disconnect();
});
}

/**
* Helper method to execute a callback on each worker.
*
Expand All @@ -34,26 +80,7 @@ function eachWorker(cb) {
* @return {undefined}
*/
function sigint(killTimeout) {
const shutdowns = eachWorker(worker => new Promise((resolve) => {
const timeout = setTimeout(() => {
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].end();
}
worker.kill();
resolve();
}, killTimeout);
worker.on('disconnect', () => {
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].end();
}
clearTimeout(timeout);
resolve();
});
if (workerSockets[worker.process.pid]) {
workerSockets[worker.process.pid].write('SIGINT');
}
worker.disconnect();
}));
const shutdowns = eachWorker(termWorker.bind(null, killTimeout));

Promise.all(shutdowns)
.then(() => server.close(() => {
Expand All @@ -72,7 +99,7 @@ function sigint(killTimeout) {
* @return {undefined}
*/
function sigterm() {
eachWorker(worker => worker.kill());
eachWorker(killWorker);
server.close(() => {
server = null;
process.exit(0);
Expand Down Expand Up @@ -102,7 +129,7 @@ module.exports = {
throw new Error('Adios can only be initialized once per process');
}

const timeout = (config && config.timeout) || 10000;
this.timeout = (config && config.timeout) || 10000;

return new Promise((resolve) => {
server = net.createServer((c) => {
Expand All @@ -123,7 +150,7 @@ module.exports = {
});
server.listen(path, resolve);

process.on('SIGINT', () => sigint(timeout));
process.on('SIGINT', () => sigint(this.timeout));
process.on('SIGTERM', sigterm);
});
},
Expand Down Expand Up @@ -151,5 +178,46 @@ module.exports = {

return Promise.resolve();
},

/**
* Method to kill a worker by process id.
*
* @param {int} pid
* The process id to kill.
*
* @return {undefined}
*/
kill(pid) {
const worker = cluster.workers[
Object.keys(cluster.workers)
.find(k => cluster.workers[k].process.pid === parseInt(pid, 10))
];
if (!worker) {
throw new Error(`No worker found with pid: ${pid}`);
}

killWorker(worker);
},

/**
* Method to terminate a worker by process id, this will call the graceful
* shutdown defined by the worker.
*
* @param {int} pid
* The process id to terminate.
*
* @return {Promise<undefined>}
* Resolved when the worker is terminated.
*/
term(pid) {
const worker = cluster.workers[
Object.keys(cluster.workers)
.find(k => cluster.workers[k].process.pid === parseInt(pid, 10))
];
if (!worker) {
throw new Error(`No worker found with pid: ${pid}`);
}
return termWorker(this.timeout, worker);
},
};

6 changes: 3 additions & 3 deletions package.json
Expand Up @@ -7,7 +7,7 @@
"lint": "eslint .",
"coverage": "istanbul check-coverage --statements 100 --lines 100 --branches 81 --functions 100",
"coveralls": "cat ./coverage/lcov.info | coveralls",
"semantic-release": "semantic-release pre && npm publish && semantic-release post"
"semantic-release": "semantic-release"
},
"repository": {
"type": "git",
Expand All @@ -30,8 +30,8 @@
"ghooks": "^2.0.0",
"istanbul": "^0.4.2",
"nodeunit": "^0.11.1",
"sinon": "^3.1.0",
"semantic-release": "^6.3.6"
"semantic-release": "^15.13.3",
"sinon": "^3.1.0"
},
"config": {
"defaultPath": "/var/run/adios.sock",
Expand Down
44 changes: 44 additions & 0 deletions test/master.js
Expand Up @@ -193,4 +193,48 @@ module.exports = {
process.kill(process.pid, 'SIGTERM');
});
},
kill(test) {
test.expect(2);

test.throws(() => Adios.master.kill(999999));

class DummyWorker extends EventEmitter {
constructor() {
super();
this.process = { pid: 1 };
EventEmitter.call(this);
}

kill() {
test.ok(true, 'Kill called');
}
}
cluster.workers.foo = new DummyWorker();
Adios.master.kill(1);
test.done();
},
term(test) {
test.expect(2);

test.throws(() => Adios.master.term(999999));

class DummyWorker extends EventEmitter {
constructor() {
super();
this.process = { pid: 1 };
EventEmitter.call(this);
}

kill() {
}

disconnect() {
test.ok(true, 'Kill called');
this.emit('disconnect');
}
}
cluster.workers.foo = new DummyWorker();
Adios.master.term(1)
.then(() => test.done());
},
};

0 comments on commit 8c15612

Please sign in to comment.