Skip to content

Commit

Permalink
fix(pool): added optional setting for killing processes and not retai…
Browse files Browse the repository at this point in the history
…ning them
  • Loading branch information
davis.jaunzems committed Jan 14, 2019
1 parent 632ad7c commit ac446e4
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 3 deletions.
1 change: 1 addition & 0 deletions REFERENCE.md
Expand Up @@ -96,6 +96,7 @@ interface AdvancedSettings {
retryProcessDelay: number = 5000; // delay before processing next job in case of internal error.
backoffStrategies: {}; // A set of custom backoff strategies keyed by name.
drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs).
keepProcesses: boolean = true; // Flag if true then child processes are kept for each processor, false kills child processes after finish
}
```
Expand Down
15 changes: 13 additions & 2 deletions lib/process/child-pool.js
Expand Up @@ -10,10 +10,17 @@ var ChildPool = function ChildPool() {
return new ChildPool();
}

// Flag for keeping or killing processes after finished processing, default keep always
this.keepProcesses = true;

this.retained = {};
this.free = {};
};

ChildPool.prototype.setKeepProcesses = function(keepProcesses) {
this.keepProcesses = keepProcesses;
};

ChildPool.prototype.retain = function(processFile) {
var _this = this;
var child = _this.getFree(processFile).pop();
Expand Down Expand Up @@ -42,8 +49,12 @@ ChildPool.prototype.retain = function(processFile) {
};

ChildPool.prototype.release = function(child) {
delete this.retained[child.pid];
this.getFree(child.processFile).push(child);
if (this.keepProcesses === true) {
delete this.retained[child.pid];
this.getFree(child.processFile).push(child);
} else {
this.kill(child);
}
};

ChildPool.prototype.remove = function(child) {
Expand Down
15 changes: 14 additions & 1 deletion lib/queue.js
Expand Up @@ -229,7 +229,8 @@ var Queue = function Queue(name, url, opts) {
guardInterval: 5000,
retryProcessDelay: 5000,
drainDelay: 5,
backoffStrategies: {}
backoffStrategies: {},
keepProcesses: true
});

this.settings.lockRenewTime =
Expand All @@ -250,6 +251,13 @@ var Queue = function Queue(name, url, opts) {
this.processJob = this.processJob.bind(this);
this.getJobFromId = Job.fromId.bind(null, this);

// Check settings to see if processes will be kept after finishing processing, default set to true
this.keepProcesses =
this.settings.keepProcesses !== undefined &&
this.settings.keepProcesses === false
? false
: true;

var keys = {};
_.each(
[
Expand Down Expand Up @@ -672,6 +680,10 @@ Queue.prototype.start = function(concurrency) {
});
};

Queue.prototype.setKeepProcesses = function(keepProcesses) {
this.keepProcesses = keepProcesses;
};

Queue.prototype.setHandler = function(name, handler) {
if (!handler) {
throw new Error('Cannot set an undefined handler');
Expand All @@ -691,6 +703,7 @@ Queue.prototype.setHandler = function(name, handler) {
}

this.childPool = this.childPool || require('./process/child-pool')();
this.childPool.setKeepProcesses(this.keepProcesses);

var sandbox = require('./process/sandbox');
this.handlers[name] = sandbox(handler, this.childPool).bind(this);
Expand Down
16 changes: 16 additions & 0 deletions test/test_child-pool.js
Expand Up @@ -142,4 +142,20 @@ describe('Child pool', function() {
expect(children).to.include(child);
});
});

it('should kill child after processing is finished and not retain it', function() {
var processor = __dirname + '/fixtures/fixture_processor_bar.js';
var child;

pool.setKeepProcesses(false);

return pool.retain(processor).then(function(_child) {
expect(_child).to.be.ok;
child = _child;
pool.release(child);

expect(pool.retained).to.be.empty;
expect(pool.free[processor]).to.be.empty;
});
});
});

0 comments on commit ac446e4

Please sign in to comment.