Skip to content

Commit

Permalink
Merge pull request #1042 from OptimalBits/throw-error-if-invalid-proc…
Browse files Browse the repository at this point in the history
…essor-file

throw error if missing processor file
  • Loading branch information
manast committed Sep 2, 2018
2 parents 8ca2728 + 4f27b30 commit af1dd4c
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 48 deletions.
52 changes: 17 additions & 35 deletions lib/process/child-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,31 +17,29 @@ var ChildPool = function ChildPool() {

ChildPool.prototype.retain = function(processFile) {
var _this = this;
return checkProcessorFile(processFile).then(function() {
var child = _this.getFree(processFile).pop();
var child = _this.getFree(processFile).pop();

if (child) {
_this.retained[child.pid] = child;
return child;
}
if (child) {
_this.retained[child.pid] = child;
return Promise.resolve(child);
}

// if node process is running with --inspect, don't include that option
// when spawning the children
var execArgv = _.filter(process.execArgv, function(arg) {
return arg.indexOf('--inspect') === -1;
});
// if node process is running with --inspect, don't include that option
// when spawning the children
var execArgv = _.filter(process.execArgv, function(arg) {
return arg.indexOf('--inspect') === -1;
});

child = fork(path.join(__dirname, './master.js'), {
execArgv: execArgv
});
child.processFile = processFile;
child = fork(path.join(__dirname, './master.js'), {
execArgv: execArgv
});
child.processFile = processFile;

_this.retained[child.pid] = child;
_this.retained[child.pid] = child;

child.on('exit', _this.remove.bind(_this, child));
child.on('exit', _this.remove.bind(_this, child));

return initChild(child, processFile).return(child);
});
return initChild(child, processFile).return(child);
};

ChildPool.prototype.release = function(child) {
Expand Down Expand Up @@ -91,20 +89,4 @@ var initChild = function(child, processFile) {
});
};

var checkProcessorFile = function(processorFile) {
return new Promise(function(resolve, reject) {
fs.exists(processorFile, function(stats, err) {
if (err) {
reject(err);
} else {
if (stats) {
resolve();
} else {
reject(new Error('File does not exists'));
}
}
});
});
};

module.exports = ChildPool;
12 changes: 11 additions & 1 deletion lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ var EventEmitter = require('events');

var _ = require('lodash');

var fs = require('fs');
var path = require('path');
var util = require('util');
var url = require('url');
var Job = require('./job');
Expand Down Expand Up @@ -37,7 +39,7 @@ var commands = require('./commands/');
job -> wait -> active
\ ^ \
v | -- > failed
delayed
delayed
*/

/**
Expand Down Expand Up @@ -660,7 +662,15 @@ Queue.prototype.setHandler = function(name, handler) {
this.setWorkerName();

if (typeof handler === 'string') {
var processorFile =
handler + (path.extname(handler) === '.js' ? '' : '.js');

if (!fs.existsSync(processorFile)) {
throw new Error('File ' + processorFile + ' does not exist');
}

this.childPool = this.childPool || require('./process/child-pool')();

var sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
} else {
Expand Down
11 changes: 0 additions & 11 deletions test/test_child-pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,6 @@ describe('Child pool', function() {
pool.clean();
});

it('should raise an expection if invalid processor', function() {
return pool.retain('foobar').then(
function() {
throw new Error('Should raise an exception');
},
function(err) {
expect(err).to.be.instanceOf(Error);
}
);
});

it('should return same child if free', function() {
var processor = __dirname + '/fixtures/fixture_processor_bar.js';
var child;
Expand Down
11 changes: 10 additions & 1 deletion test/test_sandboxed_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,16 @@ describe('sandboxed process', function() {
queue.add({ foo: 'bar' });
});

it('should process and fail', function(done) {
it('should error if processor file is missing', function(done) {
try {
queue.process(__dirname + '/fixtures/missing_processor.js');
done(new Error('did not throw error'));
} catch (err) {
done();
}
});

it('should process and fail using callback', function(done) {
queue.process(__dirname + '/fixtures/fixture_processor_callback_fail.js');

queue.on('failed', function(job, err) {
Expand Down

0 comments on commit af1dd4c

Please sign in to comment.