Skip to content

Commit

Permalink
merged in changes from tmcw
Browse files Browse the repository at this point in the history
  • Loading branch information
coopernurse committed May 23, 2011
2 parents 0c91eb2 + a3b65a3 commit e1235ef
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 21 deletions.
45 changes: 36 additions & 9 deletions lib/generic-pool.js
Expand Up @@ -117,7 +117,7 @@ exports.Pool = function (factory) {
* @param {Object} obj
* The acquired item to be destoyed.
*/
function destroy(obj) {
me.destroy = function(obj) {
count -= 1;
factory.destroy(obj);
}
Expand All @@ -144,7 +144,7 @@ exports.Pool = function (factory) {
} else {
// The client timed out, call it's destroyer.
log("removeIdle() destroying obj - now:" + now + " timeout:" + timeout);
destroy(availableObjects[i].obj);
me.destroy(availableObjects[i].obj);
}
}

Expand Down Expand Up @@ -173,6 +173,20 @@ exports.Pool = function (factory) {
}
}

/**
* Handle callbacks with either the [obj] or [err, obj] arguments in an
* adaptive manner. Uses the `cb.length` property to determine the number
* of arguments expected by `cb`.
*/
function adjustCallback(cb, err, obj) {
if (!cb) return;
if (cb.length <= 1) {
cb(obj);
} else {
cb(err, obj);
}
};

/**
* Try to get a new client to work, and clean up pool unused (idle) items.
*
Expand All @@ -186,23 +200,36 @@ exports.Pool = function (factory) {
function dispense() {
var obj = null,
objWithTimeout = null,
err = null,
waitingCount = waitingClients.size();
log("dispense() clients=" + waitingCount + " available=" + availableObjects.length);
if (waitingCount > 0) {
if (availableObjects.length > 0) {
log("dispense() - reusing obj");
objWithTimeout = availableObjects.shift();
waitingClients.dequeue()(objWithTimeout.obj);
adjustCallback(waitingClients.dequeue(), err, objWithTimeout.obj);
}
else if (count < factory.max) {
count += 1;
log("dispense() - creating obj - count=" + count);
factory.create(function (obj) {
factory.create(function () {
var cb = waitingClients.dequeue();
if (cb) {
cb(obj);
if (arguments.length > 1) {
err = arguments[0];
obj = arguments[1];
} else {
err = (arguments[0] instanceof Error) ? arguments[0] : null;
obj = (arguments[0] instanceof Error) ? null : arguments[0];
}
if (err) {
count -= 1;
adjustCallback(cb, err, obj);
} else {
me.release(obj);
if (cb) {
adjustCallback(cb, err, obj);
} else {
me.release(obj);
}
}
});
}
Expand Down Expand Up @@ -298,7 +325,7 @@ exports.Pool = function (factory) {
availableObjects = [];
var obj = willDie.shift();
while (obj != null) {
destroy(obj.obj);
me.destroy(obj.obj);
obj = willDie.shift();
}
if (callback) {
Expand All @@ -307,4 +334,4 @@ exports.Pool = function (factory) {
}

return me;
};
};
70 changes: 58 additions & 12 deletions test/generic-pool.test.js
@@ -1,5 +1,5 @@
var assert = require('assert');
var poolModule = require('generic-pool');
var poolModule = require('..');

module.exports = {

Expand All @@ -11,17 +11,17 @@ module.exports = {
var pool = poolModule.Pool({
name : 'test1',
create : function(callback) {
createCount++;
callback(createCount);
callback(null, { count: ++createCount });
},
destroy : function(client) { destroyCount++; },
max : 2,
idleTimeoutMillis : 100
});

for (var i = 0; i < 10; i++) {
pool.acquire(function(obj) {
return function() {
pool.acquire(function(err, obj) {
return function(err, obj) {
assert.equal(typeof obj.count, 'number');
setTimeout(function() {
borrowCount++;
pool.release(obj);
Expand Down Expand Up @@ -53,7 +53,7 @@ module.exports = {
});

for (i = 0; i < 10; i++) {
pool.acquire(function(obj) {
pool.acquire(function(err, obj) {
return function() {
setTimeout(function() {
var t = new Date().getTime();
Expand Down Expand Up @@ -90,17 +90,19 @@ module.exports = {

var pool = poolModule.Pool({
name : 'test3',
create : function(callback) { callback({ id : ++clientCount }); },
create : function(callback) { callback(null, { id : ++clientCount }); },
destroy : function(client) { destroyed.push(client.id); },
max : 2,
idleTimeoutMillis : 100
});

pool.acquire(function(client) {
pool.acquire(function(err, client) {
assert.equal(typeof client.id, 'number');
// should be removed second
setTimeout(function() { pool.release(client); }, 5);
});
pool.acquire(function(client) {
pool.acquire(function(err, client) {
assert.equal(typeof client.id, 'number');
// should be removed first
pool.release(client);
});
Expand All @@ -121,15 +123,16 @@ module.exports = {

var pool = poolModule.Pool({
name : 'test4',
create : function(callback) { callback({id: ++created}); },
create : function(callback) { callback(null, {id: ++created}); },
destroy : function(client) { destroyed += 1; },
max : 2,
idletimeoutMillis : 300000
});

for (var i = 0; i < count; i++) {
pool.acquire(function(client) {
pool.acquire(function(err, client) {
acquired += 1;
assert.equal(typeof client.id, 'number');
setTimeout(function() { pool.release(client); }, 250);
});
}
Expand All @@ -146,6 +149,49 @@ module.exports = {
assert.throws(function() {
pool.acquire(function(client) {});
}, Error);
},

'supports single arg callbacks' : function (beforeExit) {
var pool = poolModule.Pool({
name : 'test5',
create : function(callback) { callback({ id : 1 }); },
destroy : function(client) { destroyed.push(client.id); },
max : 2,
idleTimeoutMillis : 100
});

pool.acquire(function(client) {
assert.equal(client.id, 1);
});
},

'handle creation errors' : function (beforeExit) {
var created = 0;
var pool = poolModule.Pool({
name : 'test6',
create : function(callback) {
if (created < 5) {
callback(new Error('Error occurred.'));
} else {
callback({ id : created });
}
created++;
},
destroy : function(client) { },
max : 1,
idleTimeoutMillis : 1000
});
// ensure that creation errors do not populate the pool.
for (var i = 0; i < 5; i++) {
pool.acquire(function(err, client) {
assert.ok(err instanceof Error);
assert.ok(client === null);
});
}
pool.acquire(function(err, client) {
assert.ok(err === null);
assert.equal(typeof client.id, 'number');
});
}

};
};

0 comments on commit e1235ef

Please sign in to comment.