From c62c4a425093fa6fa7ce522605822cb0e021ff5c Mon Sep 17 00:00:00 2001 From: Breezewish Date: Tue, 11 Oct 2016 22:18:05 +0800 Subject: [PATCH] WIP: attempt to promisify broker --- index.js | 11 +-- lib/amqp/Broker.js | 207 +++++++++++++++++++++++---------------------- 2 files changed, 111 insertions(+), 107 deletions(-) diff --git a/index.js b/index.js index 1f7e9f9..8400516 100644 --- a/index.js +++ b/index.js @@ -11,14 +11,11 @@ var Vhost = require('./lib/amqp/Vhost') module.exports = (function() { return { - classes: { - Broker: Broker, - Publication: Publication, - SubscriberSession: SubscriberSession, - Subscription: Subscription, - Vhost: Vhost - }, Broker: Broker, + Publication: Publication, + SubscriberSession: SubscriberSession, + Subscription: Subscription, + Vhost: Vhost, createBroker: Broker.create, defaultConfig: defaultConfig, testConfig: testConfig, diff --git a/lib/amqp/Broker.js b/lib/amqp/Broker.js index dc7c35a..c4ddc42 100644 --- a/lib/amqp/Broker.js +++ b/lib/amqp/Broker.js @@ -13,120 +13,127 @@ var stub = require('../counters/stub') var inMemory = require('../counters/inMemory') var inMemoryCluster = require('../counters/inMemoryCluster').worker -module.exports = { - create: function(config, components, next) { - if (arguments.length === 2) return this.create(config, {}, arguments[1]) - preflight(_.cloneDeep(config), function(err, config) { - if (err) return next(err) - new Broker(config, components).init(next) - }) - } +var init = async.compose(tasks.initShovels, tasks.initSubscriptions, tasks.initPublications, tasks.initCounters, tasks.initVhosts) +var nuke = async.compose(tasks.disconnectVhost, tasks.nukeVhost) +var purge = tasks.purgeVhost +var disconnect = tasks.disconnectVhost +var bounce = tasks.bounceVhost + +function Broker(config, components) { + this.config = config + this.vhosts = {} + this.publications = {} + this.subscriptions = {} + this.counters = _.defaults({}, components.counters, { stub: stub, inMemory: inMemory, inMemoryCluster: inMemoryCluster }) } inherits(Broker, EventEmitter) -function Broker(config, components) { +Broker.create = function(config, components, next) { + if (arguments.length === 2) return this.create(config, {}, arguments[1]) + preflight(_.cloneDeep(config), function(err, config) { + if (err) return next(err) + new Broker(config, components).init(next) + }) +} +Broker.prototype.init = function(next) { var self = this - var vhosts = {} - var publications = {} - var subscriptions = {} - var init = async.compose(tasks.initShovels, tasks.initSubscriptions, tasks.initPublications, tasks.initCounters, tasks.initVhosts) - var nuke = async.compose(tasks.disconnectVhost, tasks.nukeVhost) - var purge = tasks.purgeVhost - var disconnect = tasks.disconnectVhost - var bounce = tasks.bounceVhost - var counters = _.defaults({}, components.counters, { stub: stub, inMemory: inMemory, inMemoryCluster: inMemoryCluster }) - - this.config = config - - this.init = function(next) { - debug('Initialising broker') - init(config, { broker: self, components: { counters: counters } }, function(err, config, ctx) { - vhosts = ctx.vhosts - publications = ctx.publications - subscriptions = ctx.subscriptions - setImmediate(function() { - next(err, self) - }) + debug('Initialising broker') + init(self.config, { broker: self, components: { counters: self.counters } }, function(err, config, ctx) { + self.vhosts = ctx.vhosts + self.publications = ctx.publications + self.subscriptions = ctx.subscriptions + setImmediate(function() { + next(err, self) }) - return this - } + }) + return this +} - this.nuke = function(next) { - debug('Nuking broker') - async.eachSeries(_.values(vhosts), function(vhost, callback) { - nuke(config, { vhost: vhost }, callback) - }, function(err) { - if (err) return next(err) - vhosts = publications = subscriptions = {} - debug('Finished nuking broker') - next() - }) - return this - } +Broker.prototype.nuke = function(next) { + var self = this + debug('Nuking broker') + async.eachSeries(_.values(self.vhosts), function(vhost, callback) { + nuke(self.config, { vhost: vhost }, callback) + }, function(err) { + if (err) return next(err) + self.vhosts = self.publications = self.subscriptions = {} + debug('Finished nuking broker') + next() + }) + return this +} - this.purge = function(next) { - debug('Purging all queues in all vhosts') - async.eachSeries(_.values(vhosts), function(vhost, callback) { - purge(config, { vhost: vhost }, callback) - }, function(err) { - if (err) return next(err) - debug('Finished purging all queues in all vhosts') - next() - }) - return this - } +Broker.prototype.purge = function(next) { + var self = this + debug('Purging all queues in all vhosts') + async.eachSeries(_.values(self.vhosts), function(vhost, callback) { + purge(self.config, { vhost: vhost }, callback) + }, function(err) { + if (err) return next(err) + debug('Finished purging all queues in all vhosts') + next() + }) + return this +} - this.shutdown = function(next) { - debug('Shutting down broker') - async.eachSeries(_.values(vhosts), function(vhost, callback) { - disconnect(config, { vhost: vhost }, callback) - }, function(err) { - if (err) return next(err) - debug('Finished shutting down broker') - next() - }) - return this - } +Broker.prototype.shutdown = function(next) { + var self = this + debug('Shutting down broker') + async.eachSeries(_.values(self.vhosts), function(vhost, callback) { + disconnect(self.config, { vhost: vhost }, callback) + }, function(err) { + if (err) return next(err) + debug('Finished shutting down broker') + next() + }) + return this +} - this.bounce = function(next) { - debug('Bouncing broker') - async.eachSeries(_.values(vhosts), function(vhost, callback) { - bounce(config, { vhost: vhost }, callback) - }, function(err) { - if (err) return next(err) - debug('Finished bouncing broker') - next() - }) - return this - } +Broker.prototype.bounce = function(next) { + var self = this + debug('Bouncing broker') + async.eachSeries(_.values(self.vhosts), function(vhost, callback) { + bounce(self.config, { vhost: vhost }, callback) + }, function(err) { + if (err) return next(err) + debug('Finished bouncing broker') + next() + }) + return this +} - this.publish = function(name, message, overrides, next) { - if (arguments.length === 3) return self.publish(name, message, {}, arguments[2]) - if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next) - if (!publications[name]) return next(new Error(format('Unknown publication: %s', name))) - publications[name].publish(message, overrides, next) - return this - } +Broker.prototype.publish = function(name, message, overrides, next) { + var self = this + if (arguments.length === 3) return self.publish(name, message, {}, arguments[2]) + if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next) + if (!self.publications[name]) return next(new Error(format('Unknown publication: %s', name))) + self.publications[name].publish(message, overrides, next) + return this +} - this.forward = function(name, message, overrides, next) { - if (arguments.length === 3) return self.forward(name, message, {}, arguments[2]) - if (_.isString(overrides)) return self.forward(name, message, { routingKey: overrides }, next) - if (!config.publications[name]) return next(new Error(format('Unknown publication: %s', name))) - publications[name].forward(message, overrides, next) - return this - } +Broker.prototype.forward = function(name, message, overrides, next) { + var self = this + if (arguments.length === 3) return self.forward(name, message, {}, arguments[2]) + if (_.isString(overrides)) return self.forward(name, message, { routingKey: overrides }, next) + if (!self.config.publications[name]) return next(new Error(format('Unknown publication: %s', name))) + self.publications[name].forward(message, overrides, next) + return this +} - this.subscribe = function(name, overrides, next) { - if (arguments.length === 2) return self.subscribe(name, {}, arguments[1]) - if (!subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name))) - subscriptions[name].subscribe(overrides, next) - return this - } +Broker.prototype.subscribe = function(name, overrides, next) { + var self = this + if (arguments.length === 2) return self.subscribe(name, {}, arguments[1]) + if (!self.subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name))) + self.subscriptions[name].subscribe(overrides, next) + return this +} - this.getFullyQualifiedName = this.qualify = function(vhost, name) { - return fqn.qualify(name, config.vhosts[vhost].namespace) - } +Broker.prototype.getFullyQualifiedName = this.qualify = function(vhost, name) { + var self = this + return fqn.qualify(name, self.config.vhosts[vhost].namespace) } +module.exports = Broker +