From fc8f80a1417361c7e53b5f25d00b6e9c5378257d Mon Sep 17 00:00:00 2001 From: Tom Kirkpatrick Date: Fri, 18 Aug 2017 11:27:04 +0200 Subject: [PATCH] fix: wait for app to boot before starting subscriptions --- lib/index.js | 70 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 52 insertions(+), 18 deletions(-) diff --git a/lib/index.js b/lib/index.js index 63da3e9..ee8f6f5 100644 --- a/lib/index.js +++ b/lib/index.js @@ -30,7 +30,7 @@ function setupRabbitMqModel(app, settings) { module.exports = function loopbackComponentMq(app, settings) { settings = settings || {} - debug('initializing message queue component with settings: %o', settings) + debug('initializing message queue component with settings: %O', settings) // Set up the RabbitMQ model. const RabbitMQ = setupRabbitMqModel(app, settings) @@ -52,25 +52,59 @@ module.exports = function loopbackComponentMq(app, settings) { process.exit(1) }) + // Determine which queues should be subscribed to. + const queuesToSubscribeTo = [] + + _.map(settings.topology.queues, queue => { + if (queue.subscribe) { + queuesToSubscribeTo.push(queue) + delete queue.subscribe + } + }) + + // Start subscriptions for a list of queues. + function startSubscriptionsForQueues(queues) { + queues.forEach(queue => rabbit.startSubscription(queue.name)) + debug('Started subscriptions for queues: %O', queuesToSubscribeTo) + } + // Configure the rabbit topology. - rabbit.configure(settings.topology) - .then(() => debug('Rabbit topology configured')) - .catch(err => { - RabbitMQ.log.error('Unable to configure Rabbit topology', err) - process.exit(1) - }) - - const connection = _.get(rabbit, 'configurations.default.connection') - const parsedConnectionUri = url.parse(connection.uri) - const ampqStatsOptions = { - username: connection.user, - password: connection.pass, - hostname: `${parsedConnectionUri.hostname}:${_.get(settings, 'options.restPort', 15672)}`, - protocol: (parsedConnectionUri.protocol === 'amqps') ? 'https' : 'http', + function configureRabbit() { + return rabbit.configure(settings.topology) + .then(() => debug('Rabbit topology configured')) + .catch(err => { + RabbitMQ.log.error('Unable to configure Rabbit topology', err) + process.exit(1) + }) } - debug('Setting up AmqpStats with options: %o', ampqStatsOptions) + // Start subscriptions for a list of queues. + function startSubscriptions() { + if (app.booting) { + return app.on('booted', () => { + startSubscriptionsForQueues(queuesToSubscribeTo) + }) + } + return startSubscriptionsForQueues(queuesToSubscribeTo) + } + + function configureAmqpStats() { + const connection = _.get(rabbit, 'configurations.default.connection') + const parsedConnectionUri = url.parse(connection.uri) + const ampqStatsOptions = { + username: connection.user, + password: connection.pass, + hostname: `${parsedConnectionUri.hostname}:${_.get(settings, 'options.restPort', 15672)}`, + protocol: (parsedConnectionUri.protocol === 'amqps') ? 'https' : 'http', + } + + debug('Setting up AmqpStats with options: %o', ampqStatsOptions) + + // Set up access to AmqpStats. + RabbitMQ.amqpStats = new AmqpStats(ampqStatsOptions) + } - // Set up access to AmqpStats. - RabbitMQ.amqpStats = new AmqpStats(ampqStatsOptions) + configureRabbit() + .then(() => configureAmqpStats()) + .then(() => startSubscriptions()) }