Skip to content
This repository has been archived by the owner on Jun 7, 2018. It is now read-only.

Commit

Permalink
WIP: attempt to promisify broker
Browse files Browse the repository at this point in the history
  • Loading branch information
breezewish committed Oct 11, 2016
1 parent b4f651e commit c62c4a4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 53 deletions.
5 changes: 1 addition & 4 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,11 @@ var Vhost = require('./lib/amqp/Vhost')

module.exports = (function() {
return {
classes: {
Broker: Broker,
Publication: Publication,
SubscriberSession: SubscriberSession,
Subscription: Subscription,
Vhost: Vhost
},
Broker: Broker,
Vhost: Vhost,
createBroker: Broker.create,
defaultConfig: defaultConfig,
testConfig: testConfig,
Expand Down
105 changes: 56 additions & 49 deletions lib/amqp/Broker.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,63 +13,63 @@ var stub = require('../counters/stub')
var inMemory = require('../counters/inMemory')
var inMemoryCluster = require('../counters/inMemoryCluster').worker

module.exports = {
create: function(config, components, next) {
if (arguments.length === 2) return this.create(config, {}, arguments[1])
preflight(_.cloneDeep(config), function(err, config) {
if (err) return next(err)
new Broker(config, components).init(next)
})
}
}

inherits(Broker, EventEmitter)

function Broker(config, components) {

var self = this
var vhosts = {}
var publications = {}
var subscriptions = {}
var init = async.compose(tasks.initShovels, tasks.initSubscriptions, tasks.initPublications, tasks.initCounters, tasks.initVhosts)
var nuke = async.compose(tasks.disconnectVhost, tasks.nukeVhost)
var purge = tasks.purgeVhost
var disconnect = tasks.disconnectVhost
var bounce = tasks.bounceVhost
var counters = _.defaults({}, components.counters, { stub: stub, inMemory: inMemory, inMemoryCluster: inMemoryCluster })

function Broker(config, components) {
this.config = config
this.vhosts = {}
this.publications = {}
this.subscriptions = {}
this.counters = _.defaults({}, components.counters, { stub: stub, inMemory: inMemory, inMemoryCluster: inMemoryCluster })
}

inherits(Broker, EventEmitter)

this.init = function(next) {
Broker.create = function(config, components, next) {
if (arguments.length === 2) return this.create(config, {}, arguments[1])
preflight(_.cloneDeep(config), function(err, config) {
if (err) return next(err)
new Broker(config, components).init(next)
})
}

Broker.prototype.init = function(next) {
var self = this
debug('Initialising broker')
init(config, { broker: self, components: { counters: counters } }, function(err, config, ctx) {
vhosts = ctx.vhosts
publications = ctx.publications
subscriptions = ctx.subscriptions
init(self.config, { broker: self, components: { counters: self.counters } }, function(err, config, ctx) {
self.vhosts = ctx.vhosts
self.publications = ctx.publications
self.subscriptions = ctx.subscriptions
setImmediate(function() {
next(err, self)
})
})
return this
}

this.nuke = function(next) {
Broker.prototype.nuke = function(next) {
var self = this
debug('Nuking broker')
async.eachSeries(_.values(vhosts), function(vhost, callback) {
nuke(config, { vhost: vhost }, callback)
async.eachSeries(_.values(self.vhosts), function(vhost, callback) {
nuke(self.config, { vhost: vhost }, callback)
}, function(err) {
if (err) return next(err)
vhosts = publications = subscriptions = {}
self.vhosts = self.publications = self.subscriptions = {}
debug('Finished nuking broker')
next()
})
return this
}

this.purge = function(next) {
Broker.prototype.purge = function(next) {
var self = this
debug('Purging all queues in all vhosts')
async.eachSeries(_.values(vhosts), function(vhost, callback) {
purge(config, { vhost: vhost }, callback)
async.eachSeries(_.values(self.vhosts), function(vhost, callback) {
purge(self.config, { vhost: vhost }, callback)
}, function(err) {
if (err) return next(err)
debug('Finished purging all queues in all vhosts')
Expand All @@ -78,10 +78,11 @@ function Broker(config, components) {
return this
}

this.shutdown = function(next) {
Broker.prototype.shutdown = function(next) {
var self = this
debug('Shutting down broker')
async.eachSeries(_.values(vhosts), function(vhost, callback) {
disconnect(config, { vhost: vhost }, callback)
async.eachSeries(_.values(self.vhosts), function(vhost, callback) {
disconnect(self.config, { vhost: vhost }, callback)
}, function(err) {
if (err) return next(err)
debug('Finished shutting down broker')
Expand All @@ -90,10 +91,11 @@ function Broker(config, components) {
return this
}

this.bounce = function(next) {
Broker.prototype.bounce = function(next) {
var self = this
debug('Bouncing broker')
async.eachSeries(_.values(vhosts), function(vhost, callback) {
bounce(config, { vhost: vhost }, callback)
async.eachSeries(_.values(self.vhosts), function(vhost, callback) {
bounce(self.config, { vhost: vhost }, callback)
}, function(err) {
if (err) return next(err)
debug('Finished bouncing broker')
Expand All @@ -102,31 +104,36 @@ function Broker(config, components) {
return this
}

this.publish = function(name, message, overrides, next) {
Broker.prototype.publish = function(name, message, overrides, next) {
var self = this
if (arguments.length === 3) return self.publish(name, message, {}, arguments[2])
if (_.isString(overrides)) return self.publish(name, message, { routingKey: overrides }, next)
if (!publications[name]) return next(new Error(format('Unknown publication: %s', name)))
publications[name].publish(message, overrides, next)
if (!self.publications[name]) return next(new Error(format('Unknown publication: %s', name)))
self.publications[name].publish(message, overrides, next)
return this
}

this.forward = function(name, message, overrides, next) {
Broker.prototype.forward = function(name, message, overrides, next) {
var self = this
if (arguments.length === 3) return self.forward(name, message, {}, arguments[2])
if (_.isString(overrides)) return self.forward(name, message, { routingKey: overrides }, next)
if (!config.publications[name]) return next(new Error(format('Unknown publication: %s', name)))
publications[name].forward(message, overrides, next)
if (!self.config.publications[name]) return next(new Error(format('Unknown publication: %s', name)))
self.publications[name].forward(message, overrides, next)
return this
}

this.subscribe = function(name, overrides, next) {
Broker.prototype.subscribe = function(name, overrides, next) {
var self = this
if (arguments.length === 2) return self.subscribe(name, {}, arguments[1])
if (!subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name)))
subscriptions[name].subscribe(overrides, next)
if (!self.subscriptions[name]) return next(new Error(format('Unknown subscription: %s', name)))
self.subscriptions[name].subscribe(overrides, next)
return this
}

this.getFullyQualifiedName = this.qualify = function(vhost, name) {
return fqn.qualify(name, config.vhosts[vhost].namespace)
}
Broker.prototype.getFullyQualifiedName = this.qualify = function(vhost, name) {
var self = this
return fqn.qualify(name, self.config.vhosts[vhost].namespace)
}

module.exports = Broker

0 comments on commit c62c4a4

Please sign in to comment.