Skip to content

Commit

Permalink
fix: wait for app to boot before starting subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Tom Kirkpatrick committed Aug 18, 2017
1 parent 12fb8a0 commit fc8f80a
Showing 1 changed file with 52 additions and 18 deletions.
70 changes: 52 additions & 18 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ function setupRabbitMqModel(app, settings) {
module.exports = function loopbackComponentMq(app, settings) {
settings = settings || {}

debug('initializing message queue component with settings: %o', settings)
debug('initializing message queue component with settings: %O', settings)

// Set up the RabbitMQ model.
const RabbitMQ = setupRabbitMqModel(app, settings)
Expand All @@ -52,25 +52,59 @@ module.exports = function loopbackComponentMq(app, settings) {
process.exit(1)
})

// Determine which queues should be subscribed to.
const queuesToSubscribeTo = []

_.map(settings.topology.queues, queue => {
if (queue.subscribe) {
queuesToSubscribeTo.push(queue)
delete queue.subscribe
}
})

// Start subscriptions for a list of queues.
function startSubscriptionsForQueues(queues) {
queues.forEach(queue => rabbit.startSubscription(queue.name))
debug('Started subscriptions for queues: %O', queuesToSubscribeTo)
}

// Configure the rabbit topology.
rabbit.configure(settings.topology)
.then(() => debug('Rabbit topology configured'))
.catch(err => {
RabbitMQ.log.error('Unable to configure Rabbit topology', err)
process.exit(1)
})

const connection = _.get(rabbit, 'configurations.default.connection')
const parsedConnectionUri = url.parse(connection.uri)
const ampqStatsOptions = {
username: connection.user,
password: connection.pass,
hostname: `${parsedConnectionUri.hostname}:${_.get(settings, 'options.restPort', 15672)}`,
protocol: (parsedConnectionUri.protocol === 'amqps') ? 'https' : 'http',
function configureRabbit() {
return rabbit.configure(settings.topology)
.then(() => debug('Rabbit topology configured'))
.catch(err => {
RabbitMQ.log.error('Unable to configure Rabbit topology', err)
process.exit(1)
})
}

debug('Setting up AmqpStats with options: %o', ampqStatsOptions)
// Start subscriptions for a list of queues.
function startSubscriptions() {
if (app.booting) {
return app.on('booted', () => {
startSubscriptionsForQueues(queuesToSubscribeTo)
})
}
return startSubscriptionsForQueues(queuesToSubscribeTo)
}

function configureAmqpStats() {
const connection = _.get(rabbit, 'configurations.default.connection')
const parsedConnectionUri = url.parse(connection.uri)
const ampqStatsOptions = {
username: connection.user,
password: connection.pass,
hostname: `${parsedConnectionUri.hostname}:${_.get(settings, 'options.restPort', 15672)}`,
protocol: (parsedConnectionUri.protocol === 'amqps') ? 'https' : 'http',
}

debug('Setting up AmqpStats with options: %o', ampqStatsOptions)

// Set up access to AmqpStats.
RabbitMQ.amqpStats = new AmqpStats(ampqStatsOptions)
}

// Set up access to AmqpStats.
RabbitMQ.amqpStats = new AmqpStats(ampqStatsOptions)
configureRabbit()
.then(() => configureAmqpStats())
.then(() => startSubscriptions())
}

0 comments on commit fc8f80a

Please sign in to comment.