/
poolr.js
82 lines (71 loc) · 2.26 KB
/
poolr.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
'use strict';
var util = require('util'),
events = require('events');
exports.version = '0.0.9';
var Poolr = function (poolSize, context) {
events.EventEmitter.call(this);
this.ctx = context;
this.poolSize = poolSize || 1;
this.queue = [];
this.runningJobs = 0;
this.throttled = false;
};
exports.createPool = module.exports.createPool = function (poolSize, context) {
return new Poolr(poolSize, context);
};
util.inherits(Poolr, events.EventEmitter);
Poolr.prototype._addTask = function (job, callback) {
// job must be a function expecting a callback with two parameters.
// callback is the callback to be used for that.
this.queue.push({'job' : job, 'callback' : callback});
return this.runNext();
};
Poolr.prototype.addTask = function () {
var args = Array.prototype.slice.call(arguments),
func = args.shift(),
originalCallback = (typeof(args[args.length - 1]) === 'function') ?
args.pop() : function () {};
return this._addTask(
function (callback) {
args.push(callback);
return func.apply(this.ctx, args);
}.bind(this),
originalCallback
);
};
Poolr.prototype.runNext = function () {
if (this.queue.length === 0) {
this.emit('last');
return;
}
if (this.runningJobs >= this.poolSize) {
// console.log('Resource pool full: ' + this.runningJobs);
if (! this.throttled) {
// only emit on first queued task:
this.emit('throttle');
this.throttled = true;
}
return false;
}
var job = this.queue.shift();
this.runningJobs ++;
if (this.throttled && this.runningJobs < this.poolSize) {
// in case anybody is interested: we have free slots again..
this.throttled = false;
this.emit('drain');
}
job.job(function () {
var args = Array.prototype.slice.call(arguments),
result;
this.runningJobs --;
process.nextTick(function () {
this.runNext();
}.bind(this));
result = job.callback.apply(null, args);
if (this.runningJobs < 1 && this.queue.length === 0) {
this.emit('idle');
}
return result;
}.bind(this));
return true;
};