Permalink
Browse files

Expending the test suite for the connection pool, it now correctly ha…

…ndles invalid

connection and gives errors.

It will also emit 'global' errors when a error occures on the connection.
  • Loading branch information...
1 parent d751d14 commit 6c7a8ce4df750f8081f16a08dcef35b78a0c4474 @3rd-Eden committed Jun 23, 2012
Showing with 170 additions and 6 deletions.
  1. +79 −4 lib/connectionpool.js
  2. +3 −0 test/common.js
  3. +88 −2 test/connectionpool.test.js
View
@@ -12,13 +12,23 @@ var EventEmitter = require('events').EventEmitter;
*/
function Manager(limit, builder) {
- this.limit = limit || 10; // defaults to 10 connections max
+ this.limit = +limit || 20; // defaults to 20 connections max
this.pool = [];
this.generator = null;
+ // some stats that can be used for metrics
+ this.metrics = {
+ allocations: 0
+ , releases: 0
+ };
+
if (builder) this.factory(builder);
+ EventEmitter.call(this);
}
+Manager.prototype = new EventEmitter();
+Manager.prototype.constructor = Manager;
+
/**
* Add a stream generator so we can generate streams for the pool.
*
@@ -35,6 +45,41 @@ Manager.prototype.factory = function factory(builder) {
};
/**
+ * Start listening to events that could influence the state of the connection.
+ *
+ * @param {net.Connection} net
+ * @api private
+ */
+
+Manager.prototype.listen = function listen(net) {
+ if (!net) return;
+
+ var self = this;
+
+ /**
+ * Simple helper function that allows us to automatically remove the
+ * connection from the pool when we are unable to connect using it.
+ *
+ * @param {Error} err optional error
+ * @api private
+ */
+
+ function regenerate(err) {
+ net.destroy();
+
+ self.remove(net);
+ net.removeListener('error', regenerate);
+ net.removeListener('end', regenerate);
+
+ if (err) self.emit('error', err);
+ }
+
+ // listen for events that would mess up the connection
+ net.on('error', regenerate)
+ .on('end', regenerate);
+};
+
+/**
* Allocate a new connection from the connection pool, this can be done async
* that's why we use a error first callback pattern.
*
@@ -45,12 +90,32 @@ Manager.prototype.factory = function factory(builder) {
Manager.prototype.allocate = function allocate(fn) {
if (!this.generator) return fn(new Error('Specify a stream #factory'));
+ /**
+ * Small helper function that allows us to correctly call the callback with
+ * the correct arguments when we generate a new connection as the connection
+ * should be emitting 'connect' befor we can use it. But it can also emit
+ * error if it fails to connect.
+ *
+ * @param {Error} err
+ * @api private
+ */
+
+ function either(err) {
+ fn(err, connection);
+
+ connection.removeListener('error', either);
+ connection.removeListener('connect', either);
+ }
+
var probabilities = []
, self = this
, total, i, probability, connection;
i = total = this.pool.length;
+ // increase the allocation metric
+ this.metrics.allocations++;
+
// check the current pool if we already have a few connections available, so
// we don't have to generate a new connection
while (i--) {
@@ -76,14 +141,21 @@ Manager.prototype.allocate = function allocate(fn) {
// equals the length..
if (this.generator.length === 0) {
connection = this.generator();
- if (connection) return fn(undefined, connection);
+
+ if (connection) {
+ this.listen(connection);
+ this.pool.push(connection);
+
+ return connection.on('error', either).on('connect', either);
+ }
} else {
return this.generator(function generate(err, connection) {
if (err) return fn(err);
if (!connection) return fn(new Error('The #factory failed to generate a stream'));
-
+ self.listen(connection);
self.pool.push(connection);
- return fn(undefined, connection);
+
+ return connection.on('error', either).on('connect', either);
});
}
}
@@ -156,6 +228,9 @@ Manager.prototype.release = function release(net) {
this.pool.splice(net, 1);
if (net.end) net.end();
+ // increase the releases metric
+ this.metrics.releases++;
+
return true;
};
View
@@ -3,13 +3,16 @@
/**
* Expose some globals which will be used during the test suite.
*/
+global.net = require('net');
global.chai = require('chai');
global.expect = global.chai.expect;
/**
* Awesome global hack to automatically increase numbers
*/
var testnumbers = 10000;
+
global.__defineGetter__('TESTNUMBER', function testnumber() {
return testnumbers++;
});
+
@@ -1,10 +1,96 @@
-/*globals expect */
+/*globals expect, TESTNUMBER, net */
describe('connectionpool', function () {
'use strict';
- var ConnectionPool = require('../lib/connectionpool');
+ var ConnectionPool = require('../lib/connectionpool')
+ , port = TESTNUMBER
+ , host = 'localhost'
+ , server;
+
+ before(function before(done) {
+ server = net.createServer(function create(socket) {
+ socket.write('<3');
+ });
+
+ server.listen(port, host, done);
+ });
it('should exported as a function', function () {
expect(ConnectionPool).to.be.a('function');
});
+
+ it('should be an instance of EventEmitter', function () {
+ expect(new ConnectionPool).to.be.instanceof(process.EventEmitter);
+ });
+
+ describe('initialize', function () {
+ it('should update the limit if its supplied', function () {
+ var pool = new ConnectionPool(100);
+
+ expect(pool.limit).to.eql(100);
+ });
+
+ it('should also update the generator if its supplied', function () {
+ function test() {}
+ var pool = new ConnectionPool(100, test);
+
+ expect(pool.generator).to.eql(test);
+ });
+ });
+
+ describe('#factory', function () {
+ it('should throw an error if the supplied factory is not a function', function () {
+ var pool = new ConnectionPool();
+
+ expect(pool.factory).to.throw(Error);
+ });
+
+ it('should not throw an error if a function is supplied', function () {
+ var pool = new ConnectionPool();
+
+ function test() {}
+ expect(pool.factory.bind(pool, test)).to.not.throw(Error);
+ });
+ });
+
+ describe('#allocate', function () {
+ it('should give an error when no #factory is specified', function (done) {
+ var pool = new ConnectionPool();
+
+ pool.allocate(function allocate(err, conn) {
+ expect(err).to.be.an.instanceof(Error);
+ expect(err.message).to.contain('#factory');
+
+ done();
+ });
+ });
+
+ it('should emit an error when we cannot establish a connection', function (done) {
+ var pool = new ConnectionPool()
+ , differentport = TESTNUMBER;
+
+ pool.once('error', function error(err) {
+ expect(err).to.be.an.instanceof(Error);
+ });
+
+ pool.factory(function factory() {
+ return net.connect(differentport, host);
+ });
+
+ // make sure the port is different
+ expect(differentport).to.not.eql(port);
+
+ pool.allocate(function allocate(err, connection) {
+ expect(err).to.be.an.instanceof(Error);
+
+ done();
+ });
+ });
+ });
+
+ after(function after(done) {
+ // in 0.7, we can supply the server with the done callback
+ server.once('close', done);
+ server.close();
+ });
});

0 comments on commit 6c7a8ce

Please sign in to comment.