Skip to content

Commit

Permalink
updated: pooling + fixed critical bug
Browse files Browse the repository at this point in the history
  • Loading branch information
petersirka committed Nov 25, 2014
1 parent 8e862ee commit 863dc7a
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 118 deletions.
5 changes: 1 addition & 4 deletions README.md
Expand Up @@ -47,7 +47,7 @@ var Firebird = require('node-firebird');
- `Firebird.attach(options, function(err, db))` attach a database
- `Firebird.create(options, function(err, db))` create a database
- `Firebird.attachOrCreate(options, function(err, db))` attach or create database
- `Firebird.pool(maxSockets, options, function(err, db)) -> return {Object}` create a connection pooling
- `Firebird.pool(max, options, function(err, db)) -> return {Object}` create a connection pooling

## Connection types

Expand Down Expand Up @@ -99,9 +99,6 @@ pool.get(function(err, db) {
});
});

// close all opened connections
pool.detach();

// Destroy pool
pool.destroy();
```
Expand Down
142 changes: 35 additions & 107 deletions lib/index.js
Expand Up @@ -7,7 +7,7 @@ var
BlrReader = serialize.BlrReader,
XdrWriter = serialize.XdrWriter,
BlrWriter = serialize.BlrWriter,
messages = require('./messages.js');
messages = require('./messages.js')

if (typeof(setImmediate) === 'undefined') {
global.setImmediate = function(cb) {
Expand Down Expand Up @@ -986,6 +986,11 @@ function doCallback(obj, callback) {
if (!callback)
return;

if (obj instanceof Error) {
callback(obj);
return;
}

if (isError(obj)) {
callback(new Error(obj.message));
return;
Expand Down Expand Up @@ -1395,73 +1400,26 @@ exports.attachOrCreate = function(options, callback) {

// Pooling
exports.pool = function(max, options, callback) {

var pool = new Pool();

var pool = new Pool(max, options);
options.isPool = true;

function create(max) {
exports.attach(options, function(err, db) {

if (err)
throw err;

max--;
db.pool = max;
pool.db.push(db);
if (max <= 0) {
pool.isReady = true;
pool.check();
if (callback)
callback(null, pool);
return;
}

create(max);
});
};

create(max);

return pool;
};

function poolEvents(db, pool) {
db.removeAllListeners('detach');
db.on('detach', function(is) {

if (!is)
return;

db.connection._queue = [];
db.connection._pending = [];
db.connection._isUsed = false;

setImmediate(function() {
pool.check();
});
});
}

/***************************************
*
* Simple Pooling
*
***************************************/

function Pool() {
this.db = [];
function Pool(max, options) {
this.db = 0;
this.max = max || 4;
this.pending = [];
this.isReady = false;
this.isDestroy = false;
this.options = options;
}

Pool.prototype.get = function(callback) {

var self = this;
if (self.isDestroy)
return self;

self.pending.push(callback);
self.check();
return self;
Expand All @@ -1470,55 +1428,30 @@ Pool.prototype.get = function(callback) {
Pool.prototype.check = function() {

var self = this;
if (self.db >= self.max)
return self;

for (var i = 0, length = self.db.length; i < length; i++) {

var db = self.db[i];
if (db.connection._isUsed)
continue;

var cb = self.pending.shift();
if (!cb)
continue;

poolEvents(db, self);
db.connection._isUsed = true;
cb(null, db);

setImmediate(function() {
self.check();
});
break;
}

return self;
};

Pool.prototype.detach = function() {

var self = this;
var count = self.db.length;
var cb = self.pending.shift();
if (!cb)
return self;

var fn = function() {
count--;
if (count > 0 || !self.isDestroy)
return;
self.db = null;
self.pending = null;
};
self.db++;
exports.attach(self.options, function(err, db) {
if (!err) {
db.on('detach', function() { self.db--; self.check(); });
} else
self.db--;
cb(err, db);
});

for (var i = 0; i < self.db.length; i++)
self.db[i].detach(fn, true);
setImmediate(function() {
self.check();
});

return self;
};

Pool.prototype.destroy = function() {
var self = this;
self.detach();
self.isDestroy = true;
return self;
};
Pool.prototype.destroy = function() {};

/***************************************
*
Expand Down Expand Up @@ -1910,21 +1843,13 @@ Connection.prototype.attach = function (options, callback, db) {
this._queueEvent(cb);
};

Connection.prototype.detach = function (callback, force) {
Connection.prototype.detach = function (callback) {

var self = this;

if (self._isClosed)
return;

if (self.options.isPool && !force) {
self._isUsed = false;
self._queue = [];
self._pending = [];
self.db.emit('detach', true);
return;
}

self._isUsed = false;
self._isDetach = true;

Expand Down Expand Up @@ -2464,7 +2389,7 @@ Connection.prototype.executeStatement = function(transaction, statement, params,
break;
default:
//throw new Error('Unexpected parametter: ' + JSON.stringify(params) + ' - ' + JSON.stringify(input));
ret[i] = new SQLVarNull();
ret[i] = value === null || value === undefined ? new SQLVarNull() : new SQLParamString(value.toString());
break;
}
done();
Expand All @@ -2484,8 +2409,11 @@ Connection.prototype.executeStatement = function(transaction, statement, params,
params = [];
}

if (params === undefined || params.length !== input.length)
throw new Error('Expected parameters: ' + input.length);
if (params === undefined || params.length !== input.length) {
self._pending.pop();
callback(new Error('Expected parameters: (params=' + params.length + ' vs. expected=' + input.length + ') - ' + statement.query));
return;
}

PrepareParams(params, input, function(prms) {

Expand Down
24 changes: 17 additions & 7 deletions test/run.js
Expand Up @@ -441,7 +441,7 @@ function test_pooling(next) {
pool.get(function(err, db) {
db.query('SELECT * FROM test WHERE id=1', function(err, results) {
setImmediate(function() {
assert.ok(db.pool === 1 && results.length === 1, 'pool selector 1');
assert.ok(results.length === 1, 'pool selector 1');
db.detach();
});
});
Expand All @@ -457,7 +457,7 @@ function test_pooling(next) {
query.push(function(next) {
pool.get(function(err, db) {
db.query('SELECT * FROM test WHERE id=2', function(err, results) {
assert.ok(db.pool === 0 && results.length === 1, 'pool selector 2');
assert.ok(results.length === 1, 'pool selector 2');
db.detach();
next();
});
Expand All @@ -468,7 +468,7 @@ function test_pooling(next) {
pool.get(function(err, db) {
db.query('SELECT * FROM test WHERE id=1', function(err, results) {
setImmediate(function() {
assert.ok(db.pool === 1 && results.length === 1, 'pool selector 3');
assert.ok(results.length === 1, 'pool selector 3');
db.detach();
});
next();
Expand All @@ -479,7 +479,17 @@ function test_pooling(next) {
query.push(function(next) {
pool.get(function(err, db) {
db.query('SELECT * FROM test WHERE id=2', function(err, results) {
assert.ok(db.pool === 0 && results.length === 1, 'pool selector 4');
assert.ok(results.length === 1, 'pool selector 4');
db.detach();
next();
});
});
});

query.push(function(next) {
pool.get(function(err, db) {
db.query('INSERT INTO test (ID) VALUES(?)', function(err, results) {
assert.ok(err, 'pool exception');
db.detach();
next();
});
Expand All @@ -488,10 +498,10 @@ function test_pooling(next) {

query.push(function(next) {
setTimeout(function() {
pool.destroy();
assert.ok(pool.db === 0, 'pool detach');
console.timeEnd(name);
}, 500);
next();
next();
}, 1000);
});

setTimeout(function() {
Expand Down

0 comments on commit 863dc7a

Please sign in to comment.