Skip to content

Commit

Permalink
fix the broken message consumer queue proces
Browse files Browse the repository at this point in the history
  • Loading branch information
rahulbile committed Jan 6, 2017
1 parent 467c995 commit fc8e705
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 22 deletions.
1 change: 0 additions & 1 deletion lib/index.js
Expand Up @@ -35,7 +35,6 @@ module.exports = function loopbackComponentMq(app, config) {
// If we have a datasource, wire up Rabbit and setup Queues
if (ds && dsOptions) {
Queue.rabbit = new Rabbit(dsOptions)
RabbitTopology.setupQueues(Queue)
RabbitTopology.setupTopology(Queue)
}

Expand Down
26 changes: 5 additions & 21 deletions lib/rabbit-topology.js
Expand Up @@ -12,16 +12,6 @@ module.exports = function rabbitTopology() {
msqQueue = QueueModel.rabbit.exchange.queue({ name, durable: true })
}

function setupQueues(Model) {
QueueModel = Model

// Loop through all the defined queues
_.forEach(QueueModel.topology, (handlers, queue) => {
// Setup the actual queue on RabbitMQ
setupQueue(queue)
})
}

function setupQueueConsumer(app, queue, definition) {
debug('setupQueueConsumer')
const modelName = definition.model
Expand All @@ -41,10 +31,7 @@ module.exports = function rabbitTopology() {
}

// Start consuming the queue
if (msqQueue) {
msqQueue.consume(Method)
}

msqQueue.consume(Method)

debug('setupQueueConsumer: queue: %s, model: %s, method: %s', queue, modelName, methodName)
}
Expand All @@ -62,12 +49,7 @@ module.exports = function rabbitTopology() {

Model[methodName] = function queueProducer(params) {
debug(`${modelName}.${methodName}(%o)`, params)
if (QueueModel.rabbit && QueueModel.rabbit.exchange) {
QueueModel.rabbit.exchange.publish(params, { key: queue })
}
else {
debug('setupQueueProducer: queue %s is not yet initialised', queue)
}
QueueModel.rabbit.exchange.publish(params, { key: queue })
}
}

Expand All @@ -77,6 +59,9 @@ module.exports = function rabbitTopology() {
// Loop through all the defined queues
_.forEach(QueueModel.topology, (handlers, queue) => {

// Setup the actual queue on RabbitMQ
setupQueue(queue)

// Setup the consumer of this queue
if (handlers.consumer) {
setupQueueConsumer(QueueModel.app, queue, handlers.consumer)
Expand Down Expand Up @@ -112,7 +97,6 @@ module.exports = function rabbitTopology() {

return {
setupQueue,
setupQueues,
setupTopology,
setupQueueProducer,
setupQueueConsumer,
Expand Down

0 comments on commit fc8e705

Please sign in to comment.