Skip to content

Commit

Permalink
Breaking Changes:
Browse files Browse the repository at this point in the history
 * ConductorWorker – master messaging callbacks are now of signature (err, data) instead of (data)
   * fireRequestCallback(name, err, data)
   * sendRequestToMaster(name, data, (err, data) => {...})
   * getNextJob((err, job) => {...})
   * lookup(name, key, (err, val) => {...})
   * setLookup(name, key, value, (err) => {...})

Other Changes:
 * Fixed: Conductor.start never fired the callback unless there was an error
 * Callback functions have been wrapped with promisify so they can be awaited
 * Updated docs
 * Removed yarn
 * Updated travis and dependencies
  • Loading branch information
kfitzgerald committed Mar 25, 2019
1 parent 35f85c4 commit 5f0dade
Show file tree
Hide file tree
Showing 17 changed files with 158 additions and 1,929 deletions.
22 changes: 22 additions & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,30 @@
"es6": true
},
"parserOptions": {
"ecmaVersion": 8,
"sourceType": "module"
},
"plugins": [
],
"extends": "eslint:recommended",
"globals": {
"require": true,
"module": true,
"describe": true,
"it": true,
"before": true,
"beforeEach": true,
"after": true,
"afterEach": true,
"Promise": true
},
"overrides": [
{
"files": ["docs/**"],
"rules": {
"no-console": "off",
"no-unused-vars": "off"
}
}
]
}
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ coverage
node_modules
.idea
*.tgz
.nyc_output
.nyc_output
package-lock.json
5 changes: 4 additions & 1 deletion .npmignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ test
*.tgz
coverage
.nyc_output
.idea
.idea
docs
.eslint*
.travis.yml
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
language: node_js
node_js:
- lts/*
- '6'
- '10'
- '8'
script:
- npm run report
after_script:
Expand Down
43 changes: 29 additions & 14 deletions Conductor.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
"use strict";

const Async = require('async');
const Os = require('os');
const OS = require('os');
const Cluster = require('cluster');
const EventEmitter = require('events').EventEmitter;
const ShortId = require('shortid');
const Util = require('util');

/**
* Distributes job processing across multiple processes
Expand All @@ -20,7 +21,7 @@ class Conductor extends EventEmitter {
options = options || {};

this.workerType = options.workerType || 'conductor_worker';
this.workerLimit = options.workerLimit || Math.min(4, Os.cpus().length);
this.workerLimit = options.workerLimit || Math.min(4, OS.cpus().length);
this.jobsQueue = options.jobsQueue || [];
this.logging = options.logging !== undefined ? options.logging : true;
this.processing = false;
Expand All @@ -32,6 +33,10 @@ class Conductor extends EventEmitter {
this.workerStatsReportCount = 0;
this.id = ShortId.generate();
this._workerIds = [];

// awaitable methods
this.start = Util.promisify(this.start.bind(this));
this.generateJobs = Util.promisify(this.generateJobs.bind(this));
}

//region Override These Methods
Expand Down Expand Up @@ -104,7 +109,6 @@ class Conductor extends EventEmitter {
* Broadcasts a message to all workers.
* @param commandName
* @param data
* @param callback
*/
broadcastMessageToWorkers(commandName, data) {
this._workerIds.forEach((id) => {
Expand Down Expand Up @@ -172,7 +176,7 @@ class Conductor extends EventEmitter {
(next) => this.generateJobs(next),

// Start workers
(next) => this._startWorkers()
(next) => { this._startWorkers(); next(); }

], callback);
} else {
Expand All @@ -187,7 +191,7 @@ class Conductor extends EventEmitter {

/**
* Gets the next job to process
* @return {T}
* @return {*}
*/
_getNextJob() {
return this.jobsQueue.shift();
Expand All @@ -197,14 +201,14 @@ class Conductor extends EventEmitter {
* Logs to the output if logging is enabled
*/
log() {
if (this.logging) console.log.apply(console, [].slice.call(arguments));
if (this.logging) console.log.apply(console, [].slice.call(arguments)); // eslint-disable-line no-console
}

/**
* Logs to the output if logging is enabled
*/
error() {
if (this.logging) console.error.apply(console, [].slice.call(arguments));
if (this.logging) console.error.apply(console, [].slice.call(arguments)); // eslint-disable-line no-console
}

/**
Expand All @@ -223,19 +227,30 @@ class Conductor extends EventEmitter {
// Message handler
Cluster.workers[id].on('message', (msg) => {
if (msg && msg.cmd) {

let job;
let lookupName;
let lookupKey;

let setLookupName;
let setLookupKey;
let setLookupValue;

let stats;

switch (msg.cmd) {
// Get task
case "getJob":
const job = this._getNextJob();
job = this._getNextJob();
this.log(`> Assigned worker ${id} job:`, job);
this.sendMessageToWorker(id, "getJob", job, msg.callback);
break;

// Check master for known value
case "lookup":

const lookupName = msg.data.name;
const lookupKey = msg.data.key;
lookupName = msg.data.name;
lookupKey = msg.data.key;

this.sendMessageToWorker(
id,
Expand All @@ -250,16 +265,16 @@ class Conductor extends EventEmitter {

// Set master known value
case "setLookup":
const setLookupName = msg.data.name;
const setLookupKey = msg.data.key;
const setLookupValue = msg.data.value;
setLookupName = msg.data.name;
setLookupKey = msg.data.key;
setLookupValue = msg.data.value;

this.setLookupValue(setLookupName, setLookupKey, setLookupValue);
this.sendMessageToWorker(id, "setLookup", null, msg.callback);
break;

case "stats":
const stats = msg.data;
stats = msg.data;
this._updateStatsForWorker(id, stats);
this.sendMessageToWorker(id, "stats", null, msg.callback);
break;
Expand Down
28 changes: 19 additions & 9 deletions ConductorWorker.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const Async = require('async');
const Cluster = require('cluster');
const EventEmitter = require('events').EventEmitter;
const ShortId = require('shortid');
const Util = require('util');

/**
* Conductor worker base class. Should be extended to be useful!
Expand Down Expand Up @@ -32,6 +33,14 @@ class ConductorWorker extends EventEmitter {
// Buckets
this._requestCallbacks = {};
this.logging = options.logging !== undefined ? options.logging : true;

// awaitable methods
this.start = Util.promisify(this.start.bind(this));
this.bindRouter = Util.promisify(this.bindRouter.bind(this));
this.setLookup = Util.promisify(this.setLookup.bind(this));
this.lookup = Util.promisify(this.lookup.bind(this));
this.getNextJob = Util.promisify(this.getNextJob.bind(this));
this.sendRequestToMaster = Util.promisify(this.sendRequestToMaster.bind(this));
}

//region Override These Methods
Expand Down Expand Up @@ -62,7 +71,7 @@ class ConductorWorker extends EventEmitter {

case "override-me":
this.log('got dummy message back');
this.fireRequestCallback(msg.callback, msg);
this.fireRequestCallback(msg.callback, null, msg);
return;
}
}
Expand All @@ -71,7 +80,7 @@ class ConductorWorker extends EventEmitter {
this.error('Unknown message from master! workerid:', this.workerId, msg);

/* istanbul ignore else: would cause a hang with no callback */
if (msg.callback) this.fireRequestCallback(msg.callback, msg);
if (msg.callback) this.fireRequestCallback(msg.callback, null, msg);
}

//endregion
Expand Down Expand Up @@ -115,7 +124,7 @@ class ConductorWorker extends EventEmitter {
* @param err
*/
crash(err) {
console.error('> !! Got error in processing job, workerId: ', this.workerId, err);
this.error('> !! Got error in processing job, workerId: ', this.workerId, err);
this.stopMonitoring();
this.rollStats();
process.exit(4);
Expand All @@ -140,14 +149,14 @@ class ConductorWorker extends EventEmitter {
* Logs to the output if logging is enabled
*/
log() {
if (this.logging) console.log.apply(console, [].slice.call(arguments));
if (this.logging) console.log.apply(console, [].slice.call(arguments)); // eslint-disable-line no-console
}

/**
* Logs to the output if logging is enabled
*/
error() {
if (this.logging) console.error.apply(console, [].slice.call(arguments));
if (this.logging) console.error.apply(console, [].slice.call(arguments)); // eslint-disable-line no-console
}

/**
Expand Down Expand Up @@ -218,7 +227,7 @@ class ConductorWorker extends EventEmitter {
// Process the job
this.processJob(job);

this.fireRequestCallback(msg.callback, job);
this.fireRequestCallback(msg.callback, null, job);
}

/**
Expand All @@ -230,7 +239,7 @@ class ConductorWorker extends EventEmitter {
const keyMapKey = msg.data.key;
const keyMapValue = msg.data.value;
const callbackName = msg.callback;
this.fireRequestCallback(callbackName, { name: keyMapName, key: keyMapKey, value: keyMapValue });
this.fireRequestCallback(callbackName, null, { name: keyMapName, key: keyMapKey, value: keyMapValue });
}

/**
Expand All @@ -245,11 +254,12 @@ class ConductorWorker extends EventEmitter {
/**
* Fires a callback, if it's present
* @param callbackName
* @param err
* @param data
*/
fireRequestCallback(callbackName, data) {
fireRequestCallback(callbackName, err, data) {
if (typeof this._requestCallbacks[callbackName] === "function") {
this._requestCallbacks[callbackName](data);
this._requestCallbacks[callbackName](err, data);
delete this._requestCallbacks[callbackName];
} else if (this._requestCallbacks[callbackName] === null) {
// No callback, ignore
Expand Down

0 comments on commit 5f0dade

Please sign in to comment.