Skip to content

Commit

Permalink
Implemented GRAILSPLUGINS-2525: exchanges can now be declared in the …
Browse files Browse the repository at this point in the history
…Rabbit

configuration and queues bound to them.
  • Loading branch information
pledbrook committed Oct 13, 2010
1 parent 9f7489b commit 269abd1
Show file tree
Hide file tree
Showing 4 changed files with 430 additions and 17 deletions.
54 changes: 45 additions & 9 deletions RabbitmqGrailsPlugin.groovy
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import org.codehaus.groovy.grails.commons.GrailsClassUtils as GCU
import org.grails.rabbitmq.AutoQueueMessageListenerContainer
import org.grails.rabbitmq.RabbitQueueBuilder
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
Expand Down Expand Up @@ -108,17 +109,52 @@ The Rabbit MQ plugin provides integration with the Rabbit MQ Messaging System.
def queuesConfig = application.config.rabbitmq?.queues
if(queuesConfig) {
def queueBuilder = new RabbitQueueBuilder()
queuesConfig = queuesConfig.clone()
queuesConfig.delegate = queueBuilder
queuesConfig.resolveStrategy = Closure.DELEGATE_FIRST
queuesConfig()
def queues = queueBuilder.queues
if(queues) {
queues.each { queue ->
"grails.rabbit.queue.${queue.name}"(Queue, queue.name) {
durable = queue.durable
autoDelete = queue.autoDelete
exclusive = queue.exclusive
arguments = queue.arguments
}

// Deal with declared exchanges first.
queueBuilder.exchanges?.each { exchange ->
if (log.debugEnabled) {
log.debug "Registering exchange '${exchange.name}'"
}

"grails.rabbit.exchange.${exchange.name}"(exchange.type, exchange.name) {
durable = exchange.durable
autoDelete = exchange.autoDelete
arguments = exchange.arguments
}
}

// Next, the queues.
queueBuilder.queues?.each { queue ->
if (log.debugEnabled) {
log.debug "Registering queue '${queue.name}'"
}

"grails.rabbit.queue.${queue.name}"(Queue, queue.name) {
durable = queue.durable
autoDelete = queue.autoDelete
exclusive = queue.exclusive
arguments = queue.arguments
}
}

// Finally, the bindings between exchanges and queues.
queueBuilder.bindings?.each { binding ->
if (log.debugEnabled) {
log.debug "Registering binding between exchange '${binding.exchange}' & queue '${binding.queue}'"
}

def args = [ ref("grails.rabbit.exchange.${binding.exchange}"), ref ("grails.rabbit.queue.${binding.queue}") ]
if (binding.rule) {
log.debug "Binding with rule '${binding.rule}'"
args << binding.rule.toString()
}

"grails.rabbit.binding.${binding.exchange}.${binding.queue}"(Binding, *args) {
arguments = binding.arguments
}
}
}
Expand Down
13 changes: 10 additions & 3 deletions src/groovy/org/grails/rabbitmq/GrailsRabbitAdmin.groovy
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.grails.rabbitmq

import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.Exchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitAdmin
Expand Down Expand Up @@ -30,10 +32,15 @@ class GrailsRabbitAdmin extends RabbitAdmin implements SmartLifecycle, Applicati
}

void start() {
def exchanges = applicationContext.getBeansOfType(Exchange)?.values()
exchanges?.each { ex -> declareExchange ex }

def queues = applicationContext.getBeansOfType(Queue)?.values()
queues?.each { queue ->
declareQueue queue
}
queues?.each { queue -> declareQueue queue }

def bindings = applicationContext.getBeansOfType(Binding)?.values()
bindings?.each { b -> declareBinding b }

running = true
}

Expand Down
96 changes: 92 additions & 4 deletions src/groovy/org/grails/rabbitmq/RabbitQueueBuilder.groovy
Original file line number Diff line number Diff line change
@@ -1,23 +1,111 @@
package org.grails.rabbitmq

import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.*
import org.slf4j.LoggerFactory

class RabbitQueueBuilder {

private final static log = LoggerFactory.getLogger(RabbitQueueBuilder)

def bindings = []
def exchanges = []
def queues = []

private currentExchange

def methodMissing(String methodName, args) {
def queue = new Queue(methodName)

if(args) {
def argsMap = args[0]
def argsMap = args ? args[0] : [:]
if(argsMap) {
queue.autoDelete = Boolean.valueOf(argsMap.autoDelete)
queue.exclusive = Boolean.valueOf(argsMap.exclusive)
queue.durable = Boolean.valueOf(argsMap.durable)
if(argsMap.arguments instanceof Map) {
queue.arguments = argsMap.arguments
}
}

// If we are nested inside of an exchange definition, create
// a binding between the queue and the exchange.
if (currentExchange) {
def newBinding = [ queue: methodName, exchange: currentExchange.name ]
bindings << newBinding

switch (currentExchange.type) {
case DirectExchange:
if (argsMap.binding && !(argsMap.binding instanceof CharSequence)) {
throw new RuntimeException(
"The binding for queue '${methodName}' to direct " +
"exchange '${currentExchange.name}' must be a string.")
}

// Use the name of the queue as a default binding if no
// explicit one is declared.
newBinding.rule = argsMap.binding ?: queue.name
break

case FanoutExchange:
// Any binding will be ignored.
log.warn "'${currentExchange.name}' is a fanout exchange - binding for queue '${methodName}' ignored"
break

/* Enable once we have a version of Spring AMQP with HeadersExchange included
case HeadersExchange:
if (!(argsMap.binding instanceof Map)) {
throw new RuntimeException(
"The binding for queue '${methodName}' to headers " +
"exchange '${currentExchange.name}' must be declared " +
"and must be a map.")
}
newBinding.rule = argsMap.binding
break
*/

case TopicExchange:
if (!(argsMap.binding instanceof CharSequence)) {
throw new RuntimeException(
"The binding for queue '${methodName}' to topic " +
"exchange '${currentExchange.name}' must be declared " +
"and must be a string.")
}

newBinding.rule = argsMap.binding
break
}
}
queues << queue
}
}

/**
* Defines a new exchange.
* @param args The properties of the exchange, such as its name
* and whether it is durable or not.
* @param c An optional closure that includes queue definitions.
* If provided, the queues are bound to this exchange.
*/
def exchange(Map args, Closure c = null) {
if (currentExchange) throw new RuntimeException("Cannot declare an exchange within another exchange")
if (!args.name) throw new RuntimeException("A name must be provided for the exchange")
if (!args.type) throw new RuntimeException("A type must be provided for the exchange '${args.name}'")
exchanges << [ *:args ]

if (c) {
currentExchange = exchanges[-1]
c = c.clone()
c.delegate = this
c()
}

// Clear the current exchange regardless of whether there was
// a closure argument or not. Just an extra safety measure.
currentExchange = null
}

def getDirect() { return DirectExchange }
def getFanout() { return FanoutExchange }
def getHeaders() { return HeadersExchange }
def getTopic() { return TopicExchange }
}
Loading

0 comments on commit 269abd1

Please sign in to comment.