diff --git a/.gitignore b/.gitignore index 2fcbb03..4ecb5ef 100644 --- a/.gitignore +++ b/.gitignore @@ -36,3 +36,5 @@ jspm_packages # Optional REPL history .node_repl_history + +local.js \ No newline at end of file diff --git a/.travis.yml b/.travis.yml index 3fb1b13..33e6a43 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,12 @@ language: node_js +sudo: required +addons: + hosts: + - rabbit +services: +- rabbitmq +env: +- NODE_CONFIG_DIR='./test/integration/config' node_js: - "6" - "6.1" diff --git a/README.md b/README.md index 1b8d61b..2ef86c4 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,97 @@ +[![Build Status](https://travis-ci.org/GannettDigital/fudd.svg?branch=master)](https://travis-ci.org/GannettDigital/fudd) +[![Coverage Status](https://coveralls.io/repos/github/GannettDigital/fudd/badge.svg?branch=master)](https://coveralls.io/github/GannettDigital/fudd?branch=master) + # fudd simple rabbit mq infrastructure setup/teardown utility + +## Installation +``` +npm install fudd``` + +## Test + +```npm run test``` +This will run both unit tests & integration tests. To successfully run integration tests, you must have a rabbitmq instance +available & configured. See Configuration for examples + +## Coverage + +```npm run cover-html``` + +## Usage + +``` +var fudd = require('fudd'); + +fudd.setup(config, function(err){ + if(err) throw err; + + // do your thing with rabbitmq +}); + +fudd.teardown(config, function(err){ + if(err) throw err; + + // all things torn +}); +``` + +## Configuration + +Configuration should look like the following: +```javascript +{ + cluster: { + port: 5672, + vhost: '/', + login: 'guest', + heartbeat: 10, + password: 'guest', + host: 'rabbit' + }, + exchanges: [ + { + name: 'fanout.fx', + type: 'fanout', + options: {} + }, + { + name: 'topic.tx', + type: 'topic', + options: {} + } + ], + queues: [ + { + name: 'queue1', + options: {durable: true} + }, + { + name: 'queue2', + options: {durable: false} + } + ], + + bindings: [ + { + bindingType: 'queue', + from: 'fanout.fx', + to: 'queue1', + bindingKeys: ['#'] + }, + { + bindingType: 'exchange', + from: 'fanout.fx', + to: 'topic.tx', + bindingKeys: ['#'], + options: {} + }, + { + bindingType: 'queue', + from: 'topic.tx', + to: 'queue2', + bindingKeys: ['#.topic1', '#.topic2'] + } + ] +} +``` \ No newline at end of file diff --git a/index.js b/index.js new file mode 100644 index 0000000..418e013 --- /dev/null +++ b/index.js @@ -0,0 +1 @@ +module.exports = require('./lib/fudd.js'); \ No newline at end of file diff --git a/lib/amqp-config-utils.js b/lib/amqp-config-utils.js new file mode 100644 index 0000000..80fcd87 --- /dev/null +++ b/lib/amqp-config-utils.js @@ -0,0 +1,15 @@ +'use strict'; + +var format = require('util').format; + +module.exports = function formatAmqpUrl(urlComponents) { + return format( + 'amqp://%s:%s@%s:%s/%s?heartbeat=%s', + urlComponents.login, + urlComponents.password, + urlComponents.host, + urlComponents.port, + encodeURIComponent(urlComponents.vhost), + urlComponents.heartbeat + ); +}; diff --git a/lib/fudd.js b/lib/fudd.js index 85d2ae0..25c9217 100644 --- a/lib/fudd.js +++ b/lib/fudd.js @@ -1,19 +1,107 @@ 'use strict'; +var amqp = require('amqplib/callback_api'); +var formatAmqpUrl = require('./amqp-config-utils.js'); +var series = require('palinode').series; +var mapEach = require('palinode').mapEach; -function Fudd() { +var Fudd = { + setup: function(config, finalCallback) { -} + var establishChannel = [Fudd._connect.bind(null, config), Fudd._createChannel]; -Fudd.prototype.init = function(config) { - return true; -}; + series(establishChannel, function(error, connection, channel) { + if (error) return finalCallback(error); + var infrastructure = []; -Fudd.prototype.setup = function() { + config.exchanges.reduce(function(series, exchangeDefinition) { + series.push(Fudd._createExchange.bind(null, channel, exchangeDefinition)); + return series; + }, infrastructure); -}; + config.queues.reduce(function(series, queueDefinition) { + series.push(Fudd._createQueue.bind(null, channel, queueDefinition)); + return series; + }, infrastructure); + + config.bindings.reduce(function(series, bindingDefinition) { + series.push(Fudd._createBindings.bind(null, channel, bindingDefinition)); + return series; + }, infrastructure); + + series(infrastructure, function(error) { + if (error) return finalCallback(error); + Fudd._disconnect(connection, finalCallback); + }); + }); + }, + teardown: function(config, finalCallback) { + var establishChannel = [Fudd._connect.bind(null, config), Fudd._createChannel]; + + series(establishChannel, function(error, connection, channel) { + if (error) return finalCallback(error); + var infrastructure = []; + + config.exchanges.reduce(function(series, exchangeDefinition) { + infrastructure.push(Fudd._deleteExchange.bind(null, channel, exchangeDefinition)); + }, infrastructure); + + config.queues.reduce(function(series, queueDefinition) { + series.push(Fudd._deleteQueue.bind(null, channel, queueDefinition)); + return series; + }, infrastructure); + + series(infrastructure, function(error) { + if (error) return finalCallback(error); + Fudd._disconnect(connection, finalCallback); + }); + }); + }, + _createExchange: function(channel, exchangeDefinition, callback) { + channel.assertExchange(exchangeDefinition.name, exchangeDefinition.type, exchangeDefinition.options, function(err) { + callback(err); + }); + }, + _createQueue: function(channel, queueDefinition, callback) { + channel.assertQueue(queueDefinition.name, queueDefinition.options, function(err) { + callback(err); + }); + }, + _createBindings: function(channel, bindingDefinition, callback) { + var boundCreateBinding = Fudd._createBinding.bind(null, channel, bindingDefinition); + mapEach(bindingDefinition.bindingKeys, boundCreateBinding, function(error) { + callback(error); + }); + }, + _createBinding: function(channel, bindingDefinition, bindingKey, callback) { + if (bindingDefinition.bindingType === 'queue') { + channel.bindQueue(bindingDefinition.to, bindingDefinition.from, bindingKey, bindingDefinition.options, callback); -Fudd.prototype.tearDown = function() { + } else if (bindingDefinition.bindingType === 'exchange') { + channel.bindExchange(bindingDefinition.to, bindingDefinition.from, bindingKey, bindingDefinition.options, callback); + } else { + callback(new Error('unsupported binding type: ' + bindingDefinition.bindingType)); + } + }, + _deleteExchange: function(channel, exchangeDefinition, callback) { + channel.deleteExchange(exchangeDefinition.name, {}, function(error) { callback(error);}); + }, + _deleteQueue: function(channel, queueDefinition, callback) { + channel.deleteQueue(queueDefinition.name, {}, function(error) { callback(error);}); + }, + _connect: function(config, callback) { + var url = formatAmqpUrl(config.cluster); + amqp.connect(url, callback); + }, + _createChannel: function(connection, callback) { + connection.createChannel(function(error, channel) { + if (error) return callback(error); + callback(null, connection, channel); + }); + }, + _disconnect: function(connection, callback) { + connection.close(callback); + } }; -module.exports = Fudd; +module.exports = Object.create(Fudd); diff --git a/package.json b/package.json index b156fa4..ea1b010 100644 --- a/package.json +++ b/package.json @@ -15,18 +15,22 @@ "homepage": "https://github.com/GannettDigital/fudd#readme", "devDependencies": { "chai": "3.5.0", + "config": "^1.21.0", "coveralls": "2.11.9", "istanbul": "0.4.3", "jscs": "^3.0.4", "mocha": "2.5.3", - "mockery": "1.7.0" + "mockery": "1.7.0", + "sinon": "1.17.4" }, "dependencies": { - "amqplib": "0.4.2" + "amqplib": "0.4.2", + "palinode": "0.0.3" }, "scripts": { - "test": "mocha --recursive test/unit", + "test": "mocha --recursive test/unit --recursive test/integration", "lint": "./node_modules/.bin/jscs ./lib/ ./test", + "fix-lint": "npm run lint -- -x", "cover": "node ./node_modules/.bin/istanbul cover ./node_modules/mocha/bin/_mocha --report lcovonly -- -R spec test/unit --recursive", "coveralls": "npm run cover && ./node_modules/coveralls/bin/coveralls.js < coverage/lcov.info" } diff --git a/test/integration/config/default.js b/test/integration/config/default.js new file mode 100644 index 0000000..2d54edd --- /dev/null +++ b/test/integration/config/default.js @@ -0,0 +1,55 @@ +'use strict'; +module.exports = { + cluster: { + port: 5672, + vhost: '/', + login: 'guest', + heartbeat: 10, + password: 'guest', + host: 'rabbit' + }, + exchanges: [ + { + name: 'fanout.fx', + type: 'fanout', + options: {} + }, + { + name: 'topic.tx', + type: 'topic', + options: {} + } + ], + queues: [ + { + name: 'queue1', + options: {durable: true} + }, + { + name: 'queue2', + options: {durable: false} + } + ], + + bindings: [ + { + bindingType: 'queue', + from: 'fanout.fx', + to: 'queue1', + bindingKeys: ['#'] + }, + { + bindingType: 'exchange', + from: 'fanout.fx', + to: 'topic.tx', + bindingKeys: ['#'], + options: {} + }, + { + bindingType: 'queue', + from: 'topic.tx', + to: 'queue2', + bindingKeys: ['#.topic1', '#.topic2'] + } + ] +}; diff --git a/test/integration/fudd.js b/test/integration/fudd.js new file mode 100644 index 0000000..b32f99b --- /dev/null +++ b/test/integration/fudd.js @@ -0,0 +1,91 @@ +'use strict'; + +var expect = require('chai').expect; +var fudd = require('../../index.js'); +var config = require('config'); +var series = require('palinode').series; + +describe('fudd integration test', function() { + this.timeout(5000); + + var verificationChannel; + var verificationConnection; + var messageCounter = 0; + + it('should setup the infrastructure without error', function(done) { + fudd.setup(config, done); + }); + + config.exchanges.forEach(function(exchangeDefinition) { + it('should verify the exchange ' + exchangeDefinition.name + ' exists', function(done) { + var establishChannel = [fudd._connect.bind(null, config), fudd._createChannel]; + series(establishChannel, function(err, connection, channel) { + if (err) return done(err); + + channel.checkExchange(exchangeDefinition.name, function(err) { + if (err) return done(err); + + fudd._disconnect(connection, done); + }); + + }); + }); + }); + + config.queues.forEach(function(queueDefinition) { + it('should verify the queue ' + queueDefinition.name + ' exists', function(done) { + var establishChannel = [fudd._connect.bind(null, config), fudd._createChannel]; + series(establishChannel, function(err, connection, channel) { + if (err) return done(err); + + channel.checkQueue(queueDefinition.name, function(err) { + if (err) return done(err); + + fudd._disconnect(connection, done); + }); + }); + }); + }); + + it('should establish channel for messages', function(done) { + var establishChannel = [fudd._connect.bind(null, config), fudd._createChannel]; + series(establishChannel, function(err, connection, channel) { + if (err) return done(err); + + verificationChannel = channel; + verificationConnection = connection; + + done(); + }); + }); + + // specific tests + it('should publish messages to the fanout', function() { + verificationChannel.publish('fanout.fx', 'some.key', new Buffer('message1')); + verificationChannel.publish('fanout.fx', 'key.topic1', new Buffer('message1')); + verificationChannel.publish('fanout.fx', 'key.topic2', new Buffer('message1')); + }); + + config.queues.forEach(function(queueDefinition) { + it('should purge the ' + queueDefinition.name + ' queue', function(done) { + verificationChannel.purgeQueue(queueDefinition.name, function(err, ok) { + messageCounter += ok.messageCount; + done(); + }); + }); + }); + + it('should have come across correct number of messages', function() { + // msg - route + // 1 - fanout to queue1 + // 2 - fanout to queue1 + // 3 - fanout to queue1 + // 2 - fanout to topic to queue2 + // 3 - fanout to topic to queue2 + expect(messageCounter).to.equal(5); + }); + + it('should teardown the infrastructure without error', function(done) { + fudd.teardown(config, done); + }); +}); diff --git a/test/unit/fudd.js b/test/unit/fudd.js index 0f3c550..84d2f4f 100644 --- a/test/unit/fudd.js +++ b/test/unit/fudd.js @@ -1,18 +1,499 @@ 'use strict'; var expect = require('chai').expect; +var mockery = require('mockery'); +var sinon = require('sinon'); + +var config = { + exchanges: [ + { + name: 'fanout.fx', + type: 'fanout', + options: {} + } + ], + queues: [ + { + name: 'queue1', + options: {durable: true} + } + ], + bindings: [ + { + bindingType: 'queue', + from: 'fanout.fx', + to: 'queue1', + bindingKeys: ['#'] + } + ] +}; describe('fudd', function() { + var Fudd; + var mapEachStub; + var seriesStub; + + before('enable mockery', function() { + mockery.enable({useCleanCache: true}); + }); + + after('disable mockery', mockery.disable); + + describe('setup and teardown', function() { + var callbackSpy; + var connectBindStub; + var disconnectStub; + var boundConnect = function connectBindResult() { + }; + + before('setup mocks', function() { + mockery.deregisterAll(); + mockery.resetCache(); + mockery.registerAllowable('../../lib/fudd.js'); + mockery.registerMock('amqplib/callback_api', {}); + mockery.registerMock('palinode', { + series: seriesStub = sinon.stub(), + mapEach: mapEachStub = sinon.stub() + }); + mockery.registerMock('./amqp-config-utils.js', {}); + Fudd = require('../../lib/fudd.js'); + }); + + before('setup stubs', function() { + callbackSpy = sinon.spy(); + connectBindStub = sinon.stub(Fudd._connect, 'bind').returns(boundConnect); + Fudd.__proto__._disconnect = disconnectStub = sinon.spy(); + }); + + after('restore stubbed methods', function() { + Fudd._connect.bind.restore(); + }); + + beforeEach('reset stubs', function() { + callbackSpy.reset(); + connectBindStub.reset(); + disconnectStub.reset(); + seriesStub.reset(); + mapEachStub.reset(); + }); + + describe('setup', function() { + var createExchangeBindStub; + var createQueueBindStub; + var createBindingsBindStub; + var boundCreateExchange = function createExchangeBindResult() { + }; + var boundCreateQueue = function createQueueBindResult() { + }; + var boundCreateBindings = function createBindingsBindResult() { + }; + + before('setup stubs', function() { + createExchangeBindStub = sinon.stub(Fudd._createExchange, 'bind').returns(boundCreateExchange); + createQueueBindStub = sinon.stub(Fudd._createQueue, 'bind').returns(boundCreateQueue); + createBindingsBindStub = sinon.stub(Fudd._createBindings, 'bind').returns(boundCreateBindings); + }); + + after('restore stubs', function() { + Fudd._createExchange.bind.restore(); + Fudd._createQueue.bind.restore(); + Fudd._createBindings.bind.restore(); + }); + + beforeEach('reset stubs & invoke', function() { + createExchangeBindStub.reset(); + createQueueBindStub.reset(); + createBindingsBindStub.reset(); + Fudd.setup(config, callbackSpy); + }); + + it('should bind config to the _connect function', function() { + expect(connectBindStub.args[0]).to.eql([ + null, config + ]); + }); + + it('should invoke series with the bound _connect and Fudd._create channel functions', function() { + expect(seriesStub.args[0][0]).to.eql([boundConnect, Fudd._createChannel]); + }); + + it('should call the finalCallback if the first series call calls back with an error', function() { + var expectedError = new Error('things happened'); + seriesStub.callArgWith(1, expectedError); + expect(callbackSpy.args[0]).to.eql([expectedError]); + }); + + it('should call _createExchange.bind for each exchange in the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(createExchangeBindStub.callCount).to.equal(config.exchanges.length); + }); + + it('should call _createExchange.bind for each queue in the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(createQueueBindStub.callCount).to.equal(config.queues.length); + }); + + it('should call _createExchange.bind for each binding in the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(createBindingsBindStub.callCount).to.equal(config.bindings.length); + }); + + it('should invoke series again with a sequence of functions derived from the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(seriesStub.args[1][0]).to.eql([ + boundCreateExchange, + boundCreateQueue, + boundCreateBindings + ]); + }); + + it('should call the final callback with the error returned from creating the infrastructure', function() { + var expectedError = new Error('error creating infrastructure'); + seriesStub.callArgWith(1, null, 'connection', 'channel'); + seriesStub.callArgWith(1, expectedError); + expect(callbackSpy.args[0]).to.eql([expectedError]); + }); + + it('should invoke Fudd._disconnect wtih the connection and callback', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + seriesStub.callArgWith(1, null); + expect(disconnectStub.args[0]).to.eql([ + 'connection', callbackSpy + ]); + }); + }); + + describe('teardown', function() { + var deleteExchangeBindStub; + var deleteQueueBindStub; + var boundDeleteExchange = function deleteExchangeBindResult() { + }; + var boundDeleteQueue = function deleteQueueBindResult() { + }; + + before('setup stubs', function() { + deleteExchangeBindStub = sinon.stub(Fudd._deleteExchange, 'bind').returns(boundDeleteExchange); + deleteQueueBindStub = sinon.stub(Fudd._deleteQueue, 'bind').returns(boundDeleteQueue); + }); + + after('restore stubs', function() { + Fudd._deleteExchange.bind.restore(); + Fudd._deleteQueue.bind.restore(); + }); + + beforeEach('reset stubs & invoke', function() { + deleteExchangeBindStub.reset(); + deleteQueueBindStub.reset(); + Fudd.teardown(config, callbackSpy); + }); + + it('should bind config to the _connect function', function() { + expect(connectBindStub.args[0]).to.eql([ + null, config + ]); + }); + + it('should invoke series with the bound _connect and Fudd._create channel functions', function() { + expect(seriesStub.args[0][0]).to.eql([boundConnect, Fudd._createChannel]); + }); + + it('should call the finalCallback if the first series call calls back with an error', function() { + var expectedError = new Error('things happened'); + seriesStub.callArgWith(1, expectedError); + expect(callbackSpy.args[0]).to.eql([expectedError]); + }); + + it('should call _deleteExchange.bind for each exchange in the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(deleteExchangeBindStub.callCount).to.equal(config.exchanges.length); + }); + + it('should call _deleteExchange.bind for each queue in the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(deleteQueueBindStub.callCount).to.equal(config.queues.length); + }); + + it('should invoke series again with a sequence of functions derived from the config', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + expect(seriesStub.args[1][0]).to.eql([ + boundDeleteExchange, + boundDeleteQueue + ]); + }); + + it('should call the final callback with the error returned from creating the infrastructure', function() { + var expectedError = new Error('error creating infrastructure'); + seriesStub.callArgWith(1, null, 'connection', 'channel'); + seriesStub.callArgWith(1, expectedError); + expect(callbackSpy.args[0]).to.eql([expectedError]); + }); + + it('should invoke Fudd._disconnect wtih the connection and callback', function() { + seriesStub.callArgWith(1, null, 'connection', 'channel'); + seriesStub.callArgWith(1, null); + expect(disconnectStub.args[0]).to.eql([ + 'connection', callbackSpy + ]); + }); + }); + }); + + describe('_createExchange', function() { + var assertExchangeStub; + var mockChannel; + var callbackStub; + var error; + var exchangeDefinition; + + beforeEach(function() { + mockChannel = {assertExchange: assertExchangeStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + exchangeDefinition = {name: 'na.me', type: 'fanout', options: {opt: 'ion'}}; + }); + + it('should call assertExchange with exchange definition & options', function() { + Fudd._createExchange(mockChannel, exchangeDefinition, callbackStub); + expect(assertExchangeStub.calledWith(exchangeDefinition.name, exchangeDefinition.type, exchangeDefinition.options)).to.equal(true); + }); + + it('should callback with error if assertExchange calls back with error', function() { + assertExchangeStub.callsArgWith(3, error); + Fudd._createExchange(mockChannel, exchangeDefinition, callbackStub); + expect(callbackStub.calledWith(error)); + }); + }); + + describe('_createQueue', function() { + var assertQueueStub; + var mockChannel; + var callbackStub; + var error; + var queueDefinition; + + beforeEach(function() { + mockChannel = {assertQueue: assertQueueStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + queueDefinition = {name: 'na.me', options: {opt: 'ion'}}; + }); + + it('should call assertQueue with queue definition & options', function() { + Fudd._createQueue(mockChannel, queueDefinition, callbackStub); + expect(assertQueueStub.calledWith(queueDefinition.name, queueDefinition.options)).to.equal(true); + }); + + it('should callback with error if assertQueue calls back with error', function() { + assertQueueStub.callsArgWith(2, error); + Fudd._createQueue(mockChannel, queueDefinition, callbackStub); + expect(callbackStub.calledWith(error)); + }); + }); - var fudd; - before(function() { - var Fudd = require('../../lib/fudd.js'); - fudd = new Fudd(); + describe('_deleteExchange', function() { + var deleteExchangeStub; + var mockChannel; + var callbackStub; + var error; + var exchangeDefinition; + + beforeEach(function() { + mockChannel = {deleteExchange: deleteExchangeStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + exchangeDefinition = {name: 'na.me', options: {opt: 'ion'}}; + }); + + it('should call deleteExchange with exchange name', function() { + Fudd._deleteExchange(mockChannel, exchangeDefinition, callbackStub); + expect(deleteExchangeStub.calledWith(exchangeDefinition.name, {})).to.equal(true); + }); + + it('should callback with error if deleteExchange calls back with error', function() { + deleteExchangeStub.callsArgWith(2, error); + Fudd._deleteExchange(mockChannel, exchangeDefinition, callbackStub); + expect(callbackStub.calledWith(error)); + }); }); - describe('init', function() { - it('should return true', function() { - var result = fudd.init(); - expect(result).to.equal(true); + describe('_deleteQueue', function() { + var deleteQueue; + var mockChannel; + var callbackStub; + var error; + var queueDefinition; + + beforeEach(function() { + mockChannel = {deleteQueue: deleteQueue = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + queueDefinition = {name: 'na.me', options: {opt: 'ion'}}; + }); + + it('should call deleteQueue with queue name', function() { + Fudd._deleteQueue(mockChannel, queueDefinition, callbackStub); + expect(deleteQueue.calledWith(queueDefinition.name, {})).to.equal(true); + }); + + it('should callback with error if deleteQueue calls back with error', function() { + deleteQueue.callsArgWith(2, error); + Fudd._deleteQueue(mockChannel, queueDefinition, callbackStub); + expect(callbackStub.calledWith(error)); + }); + }); + + describe('_createBindings', function() { + var createBindingBindStub; + var boundCreateBinding = function() { + }; + var bindingDefinition; + var callbackSpy; + + before('setup stubs', function() { + createBindingBindStub = sinon.stub(Fudd._createBinding, 'bind').returns(boundCreateBinding); + bindingDefinition = {bindingKeys: ['#', '#.#'], bindingType: 'queue', to: 'x', from: 'y'}; + callbackSpy = sinon.spy(); + }); + + after('restore stubs', function() { + Fudd._createBinding.bind.restore(); + }); + + beforeEach('reset stubs & invoke', function() { + createBindingBindStub.reset(); + callbackSpy.reset(); + Fudd._createBindings({}, bindingDefinition, callbackSpy); + }); + + it('should call mapEach with all bindingKeys and a bound createBinding call', function() { + expect(mapEachStub.calledWith(bindingDefinition.bindingKeys, boundCreateBinding)).to.eql(true); + }); + + it('should callback with error if mapEach calls back with error', function() { + var error = new Error('boo'); + mapEachStub.callsArgWith(2, error); + Fudd._createBindings({}, bindingDefinition, callbackSpy); + expect(callbackSpy.calledWith(error)).to.eql(true); + }); + }); + + describe('_createBinding (queue)', function() { + var bindQueueStub; + var mockChannel; + var callbackStub; + var error; + var bindingDefinition; + + beforeEach(function() { + mockChannel = {bindQueue: bindQueueStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + bindingDefinition = {to: 'y', from: 'x', options: {opt: 'ion'}, bindingType: 'queue'}; + }); + + it('should call bindQueue with queue name & source exchange', function() { + Fudd._createBinding(mockChannel, bindingDefinition, '#', callbackStub); + expect(bindQueueStub.calledWith(bindingDefinition.to, bindingDefinition.from, '#', bindingDefinition.options)).to.equal(true); + }); + + it('should callback with error if bindQueue calls back with error', function() { + bindQueueStub.callsArgWith(4, error); + Fudd._createBinding(mockChannel, bindingDefinition, '#', callbackStub); + expect(callbackStub.calledWith(error)); + }); + }); + + describe('_createBinding (exchange)', function() { + var bindExchangeStub; + var mockChannel; + var callbackStub; + var error; + var bindingDefinition; + + beforeEach(function() { + mockChannel = {bindExchange: bindExchangeStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + bindingDefinition = {to: 'y', from: 'x', options: {opt: 'ion'}, bindingType: 'exchange'}; + }); + + it('should call bindExchange with exchange name & source exchange', function() { + Fudd._createBinding(mockChannel, bindingDefinition, '#', callbackStub); + expect(bindExchangeStub.calledWith(bindingDefinition.to, bindingDefinition.from, '#', bindingDefinition.options)).to.equal(true); + }); + + it('should callback with error if bindExchange calls back with error', function() { + bindExchangeStub.callsArgWith(4, error); + Fudd._createBinding(mockChannel, bindingDefinition, '#', callbackStub); + expect(callbackStub.calledWith(error)); + }); + }); + + describe('_createBinding (unsupported)', function() { + var bindExchangeStub; + var mockChannel; + var callbackStub; + var error; + var bindingDefinition; + + beforeEach(function() { + mockChannel = {bindExchange: bindExchangeStub = sinon.stub()}; + callbackStub = sinon.stub(); + error = new Error('something broke'); + bindingDefinition = {to: 'y', from: 'x', options: {opt: 'ion'}, bindingType: 'exhcabge'}; + }); + + it('should callback with error if bindExchange calls back with error', function() { + var message = 'unsupported binding type: ' + bindingDefinition.bindingType; + Fudd._createBinding(mockChannel, bindingDefinition, '#', function(err) { + expect(err.message).to.equal(message); + }); + }); + }); + + describe('_disconnect & createChannel', function() { + + before(function() { + mockery.resetCache(); + mockery.deregisterAll(); + mockery.registerAllowable('../../lib/fudd.js'); + mockery.registerMock('amqplib/callback_api', {}); + mockery.registerMock('palinode', {}); + mockery.registerMock('./amqp-config-utils.js', {}); + Fudd = require('../../lib/fudd.js'); + }); + + var callbackSpy = sinon.spy(); + var connection = { + close: sinon.spy(), + createChannel: sinon.stub() + }; + + beforeEach(function() { + connection.close.reset(); + connection.createChannel.reset(); + callbackSpy.reset(); + }); + + describe('_disconnect', function() { + it('should call close on the provided connection', function() { + Fudd._disconnect(connection, callbackSpy); + expect(connection.close.callCount).to.equal(1); + }); + }); + + describe('_createChannel', function() { + it('should call back with the error from createChannel', function() { + var expectedError = new Error('connection is borked'); + Fudd._createChannel(connection, callbackSpy); + connection.createChannel.callArgWith(0, expectedError); + expect(callbackSpy.args[0]).to.eql([expectedError]); + }); + + it('should call back with the originating connection and the channel from createChannel', function() { + Fudd._createChannel(connection, callbackSpy); + connection.createChannel.callArgWith(0, null, 'this is a fake channel'); + expect(callbackSpy.args[0]).to.eql([null, connection, 'this is a fake channel']); + }); }); }); });