diff --git a/REFERENCE.md b/REFERENCE.md index a033dc459..e57adac50 100644 --- a/REFERENCE.md +++ b/REFERENCE.md @@ -96,6 +96,7 @@ interface AdvancedSettings { retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). + keepProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish } ``` diff --git a/lib/process/child-pool.js b/lib/process/child-pool.js index 122a03f0b..5b21c9e97 100644 --- a/lib/process/child-pool.js +++ b/lib/process/child-pool.js @@ -10,10 +10,17 @@ var ChildPool = function ChildPool() { return new ChildPool(); } + // Flag for keeping or killing processes after finished processing, default keep always + this.keepProcesses = true; + this.retained = {}; this.free = {}; }; +ChildPool.prototype.setKeepProcesses = function(keepProcesses) { + this.keepProcesses = keepProcesses; +}; + ChildPool.prototype.retain = function(processFile) { var _this = this; var child = _this.getFree(processFile).pop(); @@ -42,8 +49,12 @@ ChildPool.prototype.retain = function(processFile) { }; ChildPool.prototype.release = function(child) { - delete this.retained[child.pid]; - this.getFree(child.processFile).push(child); + if (this.keepProcesses === true) { + delete this.retained[child.pid]; + this.getFree(child.processFile).push(child); + } else { + this.kill(child); + } }; ChildPool.prototype.remove = function(child) { diff --git a/lib/queue.js b/lib/queue.js index dad3772a8..a03223d50 100755 --- a/lib/queue.js +++ b/lib/queue.js @@ -229,7 +229,8 @@ var Queue = function Queue(name, url, opts) { guardInterval: 5000, retryProcessDelay: 5000, drainDelay: 5, - backoffStrategies: {} + backoffStrategies: {}, + keepProcesses: true }); this.settings.lockRenewTime = @@ -250,6 +251,13 @@ var Queue = function Queue(name, url, opts) { this.processJob = this.processJob.bind(this); this.getJobFromId = Job.fromId.bind(null, this); + // Check settings to see if processes will be kept after finishing processing, default set to true + this.keepProcesses = + this.settings.keepProcesses !== undefined && + this.settings.keepProcesses === false + ? false + : true; + var keys = {}; _.each( [ @@ -672,6 +680,10 @@ Queue.prototype.start = function(concurrency) { }); }; +Queue.prototype.setKeepProcesses = function(keepProcesses) { + this.keepProcesses = keepProcesses; +}; + Queue.prototype.setHandler = function(name, handler) { if (!handler) { throw new Error('Cannot set an undefined handler'); @@ -691,6 +703,7 @@ Queue.prototype.setHandler = function(name, handler) { } this.childPool = this.childPool || require('./process/child-pool')(); + this.childPool.setKeepProcesses(this.keepProcesses); var sandbox = require('./process/sandbox'); this.handlers[name] = sandbox(handler, this.childPool).bind(this); diff --git a/test/test_child-pool.js b/test/test_child-pool.js index 5f54e8681..e78a1302c 100644 --- a/test/test_child-pool.js +++ b/test/test_child-pool.js @@ -142,4 +142,20 @@ describe('Child pool', function() { expect(children).to.include(child); }); }); + + it('should kill child after processing is finished and not retain it', function() { + var processor = __dirname + '/fixtures/fixture_processor_bar.js'; + var child; + + pool.setKeepProcesses(false); + + return pool.retain(processor).then(function(_child) { + expect(_child).to.be.ok; + child = _child; + pool.release(child); + + expect(pool.retained).to.be.empty; + expect(pool.free[processor]).to.be.empty; + }); + }); });