Permalink
Browse files

Added RabbitMQ middleware & tests

  • Loading branch information...
1 parent ccbe8fc commit 067e3248131d9215a7cd109f1ef6607a218b4749 @mendezcode mendezcode committed Sep 5, 2012
Showing with 256 additions and 3 deletions.
  1. +4 −1 .travis.yml
  2. +4 −2 README.md
  3. +3 −0 dependencies.json
  4. +149 −0 middleware/mq.js
  5. +96 −0 test/middleware/mq.js
View
@@ -17,4 +17,7 @@ script:
- "NODE_ENV=travis make tests"
notifications:
- email: false
+ email: false
+
+services:
+ - rabbitmq
View
@@ -13,7 +13,7 @@ support might be added in the future.
- Fast Configuration with bootstrap file
- Multi Core Cluster support
- Easy Deployment with JSON or Command line
-- MongoDB, MySQL & Redis Development Stack
+- MongoDB, MySQL, Redis & RabbitMQ Development Stack
- Application Helpers
- Application Models supporting ORM & Relationships
- Database Drivers & Storages
@@ -71,6 +71,7 @@ Here's the Development Stack the framework provides, in a nutshell:
[Redis](https://github.com/derdesign/protos/blob/master/middleware/logger/transport-redis.js), [File](https://github.com/derdesign/protos/blob/master/middleware/logger/transport-file.js),
[Console](https://github.com/derdesign/protos/blob/master/middleware/logger/transport-console.js),
[JSON](https://github.com/derdesign/protos/blob/master/middleware/logger/transport-console.js)
+- **Message Queue**   _Use the mq middleware_
To install the driver & storage component dependencies, use the `protos install <component>` command. For a full list of components and
their dependencies, see the [dependencies.json](https://github.com/derdesign/protos/blob/master/dependencies.json) file.
@@ -94,7 +95,8 @@ The Application's functionality can be extended with the following (ready to use
- [session](http://derdesign.github.com/protos/middleware#session) &nbsp; *Full session support with Storages, guest sessions and regeneration*
- [shortcode](http://derdesign.github.com/protos/middleware#shortcode) &nbsp; *Allows custom content to be inserted into views using shortcodes*
- [socket_io](http://derdesign.github.com/protos/middleware#socket_io) &nbsp; *Socket.io Integration with applications*
-- [static_server](http://derdesign.github.com/protos/middleware#static_server) &nbsp; *Complete Static Server solution, supporting Ranges, Conditional GETs, etc.*
+- [static_server](http://derdesign.github.com/protos/middleware#static_server) &nbsp; *Complete Static Server solution, supporting Ranges, Conditional GETs, etc*
+- [mq](http://derdesign.github.com/protos/middleware#mq) &nbsp; *Message Queue solution using RabbitMQ*
To install the middleware dependencies, use the `protos install <middleware>` command. For a full list of components and
their dependencies, see the [dependencies.json](https://github.com/derdesign/protos/blob/master/dependencies.json) file.
View
@@ -24,6 +24,9 @@
"aws": {
"aws2js" : ">= 0.7.2"
},
+ "mq": {
+ "amqp" : ">= 0.1.3"
+ },
"bcrypt": {
"bcrypt" : ">= 0.7.1"
},
View
@@ -0,0 +1,149 @@
+
+/**
+ AMQP (Advanced Message Queue Protocol)
+
+ Provides support for RabbitMQ or any other server that implements the AMQP protocol.
+
+ » References:
+
+ https://github.com/postwait/node-amqp
+ http://www.rabbitmq.com/documentation.html
+ http://www.rabbitmq.com/man/rabbitmqctl.1.man.html
+ http://www.rabbitmq.com/tutorials/amqp-concepts.html
+
+ » Example:
+
+ app.use('mq', {
+ server: 'amqp://localhost:5672',
+ queues: ['my_queue'],
+ exchanges: {
+ alpha: {type: 'fanout'}
+ },
+ onInitialize: function(err, queues, exchanges) {
+ app.log("MQ Initialized...");
+ app.mq.queue('', function(err, q) {
+ var counter = 0;
+ q.bind(exchanges.alpha, '');
+ q.subscribe(function(msg) {
+ app.log("mq: " + msg.data.toString('utf8'));
+ });
+ setInterval(function() {
+ counter++;
+ exchanges.alpha.publish('', "Hello World! " + counter);
+ }, 1000);
+ });
+ }
+ });
+
+ */
+
+var app = protos.app,
+ Multi = protos.require('multi'),
+ amqp = protos.requireDependency('amqp', 'Message Queue Middleware', 'mq');
+
+function MessageQueue(config, middleware) {
+
+ var self = this;
+
+ // Set app instance
+ app[middleware] = this;
+
+ // Extend config
+ var config = protos.extend({
+ server: null,
+ defaultExchange: null,
+ queues: {},
+ exchanges: {},
+ onBeforeCreate: function(multi) { },
+ onInitialize: function(err, queues, exchanges) { }
+ }, config);
+
+ // Exit if config.server not available
+ if (!config.server) throw new Error("MQ: Server is required");
+
+ // Set queues
+ this.queues = {};
+
+ // Set random queues
+ this.randomQueues = [];
+
+ // Set exchanges
+ this.exchanges = {};
+
+ // Set connection
+ this.connection = amqp.createConnection({
+ url: config.server,
+ defaultExchangeName: config.defaultExchange
+ });
+
+ // Process queues & exchanges on ready
+ this.connection.on('ready', function() {
+
+ var multi = new Multi(self);
+ var qCount = 0;
+ var eCount = 0;
+
+ // Queues
+ if (config.queues instanceof Array) {
+ config.queues.forEach(function(q) {
+ qCount++;
+ multi.queue(q);
+ });
+ } else {
+ for (var q in config.queues) {
+ qCount++;
+ multi.queue(q, config.queues[q]);
+ }
+ }
+
+ // Exchanges
+ if (config.exchanges instanceof Array) {
+ config.exchanges.forEach(function(e) {
+ eCount++;
+ multi.exchange(e);
+ });
+ } else {
+ for (var e in config.exchanges) {
+ eCount++;
+ multi.exchange(e, config.exchanges[e]);
+ }
+ }
+
+ // Run config.onBeforeCreate
+ config.onBeforeCreate(multi);
+
+ // Run multi
+ multi.exec(function(err, results) {
+ app.emit('mq_init', err, self.queues, self.exchanges);
+ config.onInitialize.call(self, err, self.queues, self.exchanges);
+ });
+ });
+
+}
+
+MessageQueue.prototype.queue = function(name, options, callback) {
+ var self = this;
+ if (!callback) { callback = options; options = {}; }
+ if (name && name in this.queues) callback.call(this, null, this.queues[name]);
+ else {
+ this.connection.queue(name, options, function(queue) {
+ if (name) self.queues[name] = queue;
+ else self.randomQueues.push(queue);
+ callback.call(self, null, queue);
+ });
+ }
+}
+
+MessageQueue.prototype.exchange = function(name, options, callback) {
+ var self = this;
+ if (!callback) { callback = options; options = {}; }
+ if (name in this.exchanges) callback.call(this, null, this.exchanges[name]);
+ else {
+ this.connection.exchange(name, options, function(exchange) {
+ self.exchanges[name] = exchange;
+ callback.call(self, null, exchange);
+ });
+ }
+}
+
+module.exports = MessageQueue;
View
@@ -0,0 +1,96 @@
+
+var app = require('../fixtures/bootstrap');
+
+var vows = require('vows'),
+ assert = require('assert'),
+ EventEmitter = require('events').EventEmitter;
+
+var onBeforeCreateCheck;
+
+vows.describe('Message Queue (middleware)').addBatch({
+
+ '': {
+
+ topic: function() {
+
+ var promise = new EventEmitter();
+
+ app.use('mq', {
+ server: 'amqp://localhost:5672',
+ queues: ['my_queue', 'another_queue'],
+ exchanges: {
+ alpha: {type: 'fanout'},
+ beta: {type: 'topic'}
+ },
+ onBeforeCreate: function(multi) {
+ onBeforeCreateCheck =
+ (Object.keys(app.mq.queues).length === 0) // There should be no queues set
+ && (Object.keys(app.mq.exchanges).length === 0) // There should be no exchanges set
+ && (multi.constructor.name == 'Multi') // The parameter should contain a multi instance
+ && (multi.queue instanceof Function && multi.exchange instanceof Function); // Is the correct multi
+ },
+ onInitialize: function(err, queues, exchanges) {
+ app.log("MQ Initialized...");
+ app.mq.queue('', function(err, q) {
+ var counter = 0;
+ q.bind(exchanges.alpha, '');
+ q.subscribe(function(msg) {
+ promise.emit('success', msg.data.toString('utf8'));
+ });
+ var intID = setInterval(function() {
+ counter++;
+ exchanges.alpha.publish('', "Hello World! " + counter);
+ clearTimeout(intID);
+ }, 500);
+ });
+ }
+ });
+
+ return promise;
+
+ },
+
+ 'Passes Integrity Checks': function() {
+ assert.equal(app.mq.constructor.name, 'MessageQueue');
+ assert.equal(app.mq.connection.constructor.name, 'Connection');
+ },
+
+ 'Properly sets queues': function(msg) {
+ var queues = app.mq.queues;
+ assert.isTrue(typeof queues == 'object');
+ assert.deepEqual(Object.keys(queues), ['my_queue', 'another_queue']);
+ assert.isTrue(queues.my_queue.name == 'my_queue');
+ assert.isTrue(queues.my_queue.constructor.name == 'Queue');
+ assert.isTrue(queues.another_queue.name == 'another_queue');
+ assert.isTrue(queues.another_queue.constructor.name == 'Queue');
+ },
+
+ 'Properly sets random queues': function() {
+ assert.isArray(app.mq.randomQueues);
+ assert.isTrue(app.mq.randomQueues.length == 1);
+ assert.equal(app.mq.randomQueues[0].constructor.name, 'Queue');
+ },
+
+ 'Runs the onBeforeCreate callback': function() {
+ assert.isTrue(onBeforeCreateCheck);
+ },
+
+ 'Properly configures exchanges': function(msg) {
+ var exchanges = app.mq.exchanges;
+ assert.isTrue(typeof exchanges == 'object');
+ assert.deepEqual(Object.keys(exchanges), ['alpha', 'beta']);
+ assert.isTrue(exchanges.alpha.name == 'alpha');
+ assert.isTrue(exchanges.alpha.constructor.name == 'Exchange');
+ assert.isTrue(exchanges.alpha.options.type == 'fanout');
+ assert.isTrue(exchanges.beta.name == 'beta');
+ assert.isTrue(exchanges.beta.constructor.name == 'Exchange');
+ assert.isTrue(exchanges.beta.options.type == 'topic');
+ },
+
+ 'Messages are sent/received properly': function(msg) {
+ assert.strictEqual(msg, 'Hello World! 1');
+ }
+
+ }
+
+}).export(module);

0 comments on commit 067e324

Please sign in to comment.