Skip to content

Commit

Permalink
Small refactoring so that AutoQueueMessageListenerContainer doesn't n…
Browse files Browse the repository at this point in the history
…eed to be

aware of the convention for naming exchange beans. This should avoid potential
pain down the line. Now it must be configured explicitly with an exchange name
and a seperate, optional routing key.
  • Loading branch information
pledbrook committed Jan 27, 2011
1 parent a6b67a1 commit 3f52821
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 57 deletions.
8 changes: 7 additions & 1 deletion RabbitmqGrailsPlugin.groovy
Expand Up @@ -109,7 +109,13 @@ The Rabbit MQ plugin provides integration with the Rabbit MQ Messaging System.
channelTransacted = transactional
connectionFactory = rabbitMQConnectionFactory
concurrentConsumers = connectionFactoryConsumers
exchange = rabbitSubscribe
if (rabbitSubscribe instanceof Map) {
exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe.name}"
routingKey = rabbitSubscribe.routingKey ?: '#'
}
else {
exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe}"
}
}
}
}
Expand Down
Expand Up @@ -24,37 +24,33 @@ class AutoQueueMessageListenerContainer extends SimpleMessageListenerContainer i
* of the exchange (key: 'name') and the routing key (key: 'routing').
* If no routing key is specified, the match-all wildcard ('#') is used.
*/
Object exchange
String exchangeBeanName

/**
* The routing key to bind the queue to the exchange with. This is
* the 'match-all' wildcard by default: '#'.
*/
String routingKey = '#'

protected void doStart() {
// Check the exchange name has been specified.
if (!exchangeBeanName) {
log.error "Property [exchangeBeanName] must have a value!"
return
}

// First, create a broker-named, temporary queue.
def adminBean = applicationContext.getBean(rabbitAdminBeanName)
def queue = adminBean.declareQueue()

// Get the details of the exchange to bind the temporary queue to.
def exchangeName = exchange
def routingKey = '#'
if (exchange instanceof Map) {
exchangeName = exchange.name
routingKey = exchange.routingKey ?: routingKey
}
else if (!(exchange instanceof CharSequence)) {
log.error "Property [exchange] must be a string or a map - current value: ${exchange.getClass()}"
return
}

// Now bind this queue to the named exchanged. If the exchange is a
// fanout, then we don't bind with a routing key. If it's a topic,
// we use the 'match-all' wildcard. Other exchange types are not
// supported.
def exchange = applicationContext.getBean("grails.rabbit.exchange.${exchangeName}")
def exchange = applicationContext.getBean(exchangeBeanName)
def binding = null
if (exchange instanceof FanoutExchange) {
binding = new Binding(queue, exchange)

if (exchange instanceof Map && exchange.routingKey) {
log.warn "Routing key ignored for fanout exchange '${exchangeName}'"
}
}
else if (exchange instanceof TopicExchange) {
binding = new Binding(queue, exchange, routingKey)
Expand Down
Expand Up @@ -50,9 +50,9 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new TopicExchange(exchangeName))
mockContext.registerMockBean(exchangeName, new TopicExchange(exchangeName))

testContainer.exchange = exchangeName
testContainer.exchangeBeanName = exchangeName
testContainer.doStart()

assertTrue "declareBinding() not called", declareBindingCalled
Expand All @@ -62,7 +62,7 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
* Make sure that a temporary queue is created and that it is bound to the
* topic exchange with the given name and the given routing key.
*/
void testDoStartWithTopicExchangeMap() {
void testDoStartWithTopicExchangeAndRoutingKey() {
def declareBindingCalled = false
def tempQueueName = "dummy-1235"
def exchangeName = "another.topic"
Expand All @@ -76,35 +76,10 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new TopicExchange(exchangeName))
mockContext.registerMockBean(exchangeName, new TopicExchange(exchangeName))

testContainer.exchange = [ name: exchangeName, routingKey: routingKey ]
testContainer.doStart()

assertTrue "declareBinding() not called", declareBindingCalled
}

/**
* Make sure that a temporary queue is created and that it is bound to the
* topic exchange with the given name and the 'match-all' wildcard routing
* key when no routing key is given in the configuration map.
*/
void testDoStartWithTopicExchangeMapNoRouting() {
def declareBindingCalled = false
def tempQueueName = "dummy-1235"
def exchangeName = "another.topic"

mockAdminBean.declareQueue = {-> return new Queue(tempQueueName) }
mockAdminBean.declareBinding = { binding ->
assert binding.exchange == exchangeName
assert binding.queue == tempQueueName
assert binding.routingKey == '#'
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new TopicExchange(exchangeName))

testContainer.exchange = [ name: exchangeName ]
testContainer.exchangeBeanName = exchangeName
testContainer.routingKey = routingKey
testContainer.doStart()

assertTrue "declareBinding() not called", declareBindingCalled
Expand All @@ -127,9 +102,9 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new FanoutExchange(exchangeName))
mockContext.registerMockBean(exchangeName, new FanoutExchange(exchangeName))

testContainer.exchange = exchangeName
testContainer.exchangeBeanName = exchangeName
testContainer.doStart()

assertTrue "declareBinding() not called", declareBindingCalled
Expand All @@ -140,7 +115,7 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
* fanout exchange with the given name. Even if a routing key is given, it
* should be ignored.
*/
void testDoStartWithFanoutExchangeMap() {
void testDoStartWithFanoutExchangeAndRoutingKey() {
def declareBindingCalled = false
def tempQueueName = "dummy-1235"
def exchangeName = "another.fanout"
Expand All @@ -154,9 +129,10 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new FanoutExchange(exchangeName))
mockContext.registerMockBean(exchangeName, new FanoutExchange(exchangeName))

testContainer.exchange = [ name: exchangeName, routingKey: routingKey ]
testContainer.exchangeBeanName = exchangeName
testContainer.routingKey = routingKey
testContainer.doStart()

assertTrue "declareBinding() not called", declareBindingCalled
Expand All @@ -175,9 +151,9 @@ class AutoQueueMessageListenerContainerTests extends GrailsUnitTestCase {
declareBindingCalled = true
}

mockContext.registerMockBean("grails.rabbit.exchange.${exchangeName}", new DirectExchange(exchangeName))
mockContext.registerMockBean(exchangeName, new DirectExchange(exchangeName))

testContainer.exchange = exchangeName
testContainer.exchangeBeanName = exchangeName
testContainer.doStart()

assertFalse "declareBinding() called", declareBindingCalled
Expand Down

0 comments on commit 3f52821

Please sign in to comment.