Permalink
Browse files

Implement GPRABBITMQ-29: allow full configuration of listeners.

This commit allows users to configure almost every aspect of their service listeners through a combination of the bean property override mechanism and the rabbitQueue/rabbitSubscribe properties. This includes determining whether a particular service listener is transactional, what message converter is uses, and how many consumers it has.

The change involved a fairly significant refactor that made it easy to add automatic reloading of service listeners - so users can modify them on the fly now too!
  • Loading branch information...
pledbrook committed Mar 6, 2012
1 parent 3c3ce20 commit 600ed18ad604ae965b723df9eaa34dbaf15dd43b
View
@@ -1,7 +1,9 @@
-import org.codehaus.groovy.grails.commons.GrailsClassUtils as GCU
+import org.codehaus.groovy.grails.commons.ServiceArtefactHandler
import org.grails.rabbitmq.AutoQueueMessageListenerContainer
import org.grails.rabbitmq.RabbitDynamicMethods
+import org.grails.rabbitmq.RabbitErrorHandler
import org.grails.rabbitmq.RabbitQueueBuilder
+import org.grails.rabbitmq.RabbitServiceConfigurer
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.Queue
import org.springframework.amqp.rabbit.core.RabbitAdmin
@@ -13,7 +15,7 @@ import org.grails.rabbitmq.RabbitConfigurationHolder
class RabbitmqGrailsPlugin {
// the plugin version
- def version = "0.3.4-SNAPSHOT"
+ def version = "0.4-SNAPSHOT"
// the version or versions of Grails the plugin is designed for
def grailsVersion = "1.2 > *"
// the other plugins this plugin depends on
@@ -46,8 +48,6 @@ class RabbitmqGrailsPlugin {
def loadAfter = ['services']
def observe = ['*']
- private static LISTENER_CONTAINER_SUFFIX = '_MessageListenerContainer'
-
def doWithSpring = {
def rabbitmqConfig = application.config.rabbitmq
@@ -69,7 +69,8 @@ class RabbitmqGrailsPlugin {
log.debug "Connecting to rabbitmq ${connectionFactoryUsername}@${connectionFactoryHostname} with ${configHolder.getDefaultConcurrentConsumers()} consumers."
- def connectionFactoryClassName = connectionFactoryConfig?.className ?: 'org.springframework.amqp.rabbit.connection.CachingConnectionFactory'
+ def connectionFactoryClassName = connectionFactoryConfig?.className ?:
+ 'org.springframework.amqp.rabbit.connection.CachingConnectionFactory'
def parentClassLoader = getClass().classLoader
def loader = new GroovyClassLoader(parentClassLoader)
def connectionFactoryClass = loader.loadClass(connectionFactoryClassName)
@@ -87,72 +88,22 @@ class RabbitmqGrailsPlugin {
if (messageConverterBean) messageConverter = ref(messageConverterBean)
}
adm(RabbitAdmin, rabbitMQConnectionFactory)
+ rabbitErrorHandler(RabbitErrorHandler)
+
+ // Add beans to hook up services as AMQP listeners.
Set registeredServices = new HashSet()
- application.serviceClasses.each { service ->
- def serviceClass = service.clazz
- def propertyName = service.propertyName
+ for(service in application.serviceClasses) {
+ def serviceConfigurer = new RabbitServiceConfigurer(service, rabbitmqConfig)
+ if(!serviceConfigurer.isListener() || !configHolder.isServiceEnabled(service)) continue
- def transactional = service.transactional
- if (!(rabbitmqConfig."${propertyName}".transactional instanceof ConfigObject)) {
- transactional = rabbitmqConfig."${propertyName}".transactional as Boolean
+ def propertyName = service.propertyName
+ if(!registeredServices.add(propertyName)) {
+ throw new IllegalArgumentException(
+ "Unable to initialize rabbitmq listeners properly." +
+ " More than one service named ${propertyName}.")
}
- def rabbitQueue = GCU.getStaticPropertyValue(serviceClass, 'rabbitQueue')
- if(rabbitQueue) {
- if(configHolder.isServiceEnabled(service)) {
- def serviceConcurrentConsumers = configHolder.getServiceConcurrentConsumers(service)
- log.info("Setting up rabbitmq listener for ${service.clazz} with ${serviceConcurrentConsumers} consumer(s)")
- if(!registeredServices.add(propertyName)){
- throw new IllegalArgumentException("Unable to initialize rabbitmq listeners properly. More than one service named ${propertyName}.")
- }
-
- "${propertyName}${LISTENER_CONTAINER_SUFFIX}"(SimpleMessageListenerContainer) {
- // We manually start the listener once we have attached the
- // service in doWithApplicationContext.
- autoStartup = false
- channelTransacted = transactional
- connectionFactory = rabbitMQConnectionFactory
- concurrentConsumers = serviceConcurrentConsumers
- queueNames = rabbitQueue
- }
- } else {
- log.info("Not listening to ${service.clazz} it is disabled in configuration")
- }
- }
- else {
- def rabbitSubscribe = GCU.getStaticPropertyValue(serviceClass, 'rabbitSubscribe')
- if (rabbitSubscribe) {
- if (!(rabbitSubscribe instanceof CharSequence) && !(rabbitSubscribe instanceof Map)) {
- log.error "The 'rabbitSubscribe' property on service ${service.fullName} must be a string or a map"
- }
- else {
- if(configHolder.isServiceEnabled(service)) {
- def serviceConcurrentConsumers = configHolder.getServiceConcurrentConsumers(service)
- log.info("Setting up rabbitmq listener for ${service.clazz} with ${serviceConcurrentConsumers} consumer(s)")
- if(!registeredServices.add(propertyName)){
- throw new IllegalArgumentException("Unable to initialize rabbitmq listeners properly. More than one service named ${propertyName}.")
- }
- "${propertyName}${LISTENER_CONTAINER_SUFFIX}"(AutoQueueMessageListenerContainer) {
- // We manually start the listener once we have attached the
- // service in doWithApplicationContext.
- autoStartup = false
- channelTransacted = transactional
- connectionFactory = rabbitMQConnectionFactory
- concurrentConsumers = serviceConcurrentConsumers
- if (rabbitSubscribe instanceof Map) {
- exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe.name}"
- routingKey = rabbitSubscribe.routingKey ?: '#'
- }
- else {
- exchangeBeanName = "grails.rabbit.exchange.${rabbitSubscribe}"
- }
- }
- } else {
- log.info("Not listening to ${service.clazz} it is disabled in configuration")
- }
- }
- }
- }
+ serviceConfigurer.configure(delegate)
}
def queuesConfig = application.config.rabbitmq?.queues
@@ -223,16 +174,9 @@ class RabbitmqGrailsPlugin {
}
def doWithApplicationContext = { applicationContext ->
- def rabbitTemplate = applicationContext.getBean('rabbitTemplate', RabbitTemplate.class)
def containerBeans = applicationContext.getBeansOfType(SimpleMessageListenerContainer)
containerBeans.each { beanName, bean ->
- if(beanName.endsWith(LISTENER_CONTAINER_SUFFIX)) {
- def adapter = new MessageListenerAdapter()
- def serviceName = beanName - LISTENER_CONTAINER_SUFFIX
- adapter.delegate = applicationContext.getBean(serviceName)
- adapter.messageConverter = rabbitTemplate.messageConverter
- bean.messageListener = adapter
-
+ if(isServiceListener(beanName)) {
// Now that the listener is properly configured, we can start it.
bean.start()
}
@@ -242,6 +186,34 @@ class RabbitmqGrailsPlugin {
def onChange = { evt ->
if(evt.source instanceof Class) {
addDynamicMessageSendingMethods ([evt.source], evt.ctx)
+
+ // If a service has changed, reload the associated beans
+ if(isServiceEventSource(application, evt.source)) {
+ def serviceGrailsClass = application.addArtefact(ServiceArtefactHandler.TYPE, evt.source)
+ def serviceConfigurer = new RabbitServiceConfigurer(
+ serviceGrailsClass,
+ application.config.rabbitmq)
+ if (serviceConfigurer.isListener()) {
+ def beans = beans {
+ serviceConfigurer.configure(delegate)
+ }
+ beans.registerBeans(evt.ctx)
+ startServiceListener(serviceGrailsClass.propertyName, evt.ctx)
+ }
+ }
}
}
+
+ protected isServiceListener(beanName) {
+ return beanName.endsWith(RabbitServiceConfigurer.LISTENER_CONTAINER_SUFFIX)
+ }
+
+ protected isServiceEventSource(application, source) {
+ return application.isArtefactOfType(ServiceArtefactHandler.TYPE, source)
+ }
+
+ protected startServiceListener(servicePropertyName, applicationContext) {
+ def beanName = servicePropertyName + RabbitServiceConfigurer.LISTENER_CONTAINER_SUFFIX
+ applicationContext.getBean(beanName).start()
+ }
}
View
@@ -1,5 +1,6 @@
#Grails Metadata file
-#Wed Jan 25 17:24:02 GMT 2012
-app.grails.version=2.0.0
+#Sun Mar 04 13:14:00 GMT 2012
+app.grails.version=2.0.1
app.name=rabbitmq
+plugins.release=1.0.1
plugins.svn=1.0.2
@@ -30,8 +30,5 @@ grails.project.dependency.resolution = {
}
plugins {
- build(":release:1.0.1") {
- export = false
- }
}
}
@@ -1,5 +1,7 @@
grails.doc.authors = 'Jeff Brown, Peter Ledbrook'
grails.doc.license = 'Apache License 2.0'
grails.doc.title = 'RabbitMQ Plugin'
+grails.doc.'api.org.springframework.amqp'='http://static.springsource.org/spring-amqp/docs/1.0.x/apidocs'
+
grails.views.default.codec="none" // none, html, base64
grails.views.gsp.encoding="UTF-8"
@@ -22,5 +22,6 @@ rabbitmq.connectionfactory.password | The password for connection to the server
rabbitmq.connectionfactory.hostname | The host name of the server | (none)
rabbitmq.connectionfactory.virtualHost | The name of the virtual host to connect to | '/'
rabbitmq.connectionfactory.channelCacheSize | The connection channel cache size | 10
-rabbitmq.concurrentConsumers | The number of concurrent consumers to create per message handler. Raising the number is recommendable in order to scale the consumption of messages coming in from a queue. Note that ordering guarantees are lost when multiple consumers are registered. | 1
+rabbitmq.concurrentConsumers | The number of concurrent consumers to create per message handler. Raising the number is recommended in order to scale the consumption of messages coming in from a queue. Note that ordering guarantees are lost when multiple consumers are registered. | 1
+rabbitmq.disableListening | Disables all service listeners so that they won't receive any messages. | false
{table}
@@ -0,0 +1,56 @@
+When you need fine-grained control over your service listeners, you can tap into the power of Spring. Since each service listener is implemented as a set of Spring beans, you can use Grails' [bean property override|http://grails.org/doc/latest/guide/spring.html#propertyOverrideConfiguration] mechanism to provide your own low-level settings.
+
+So how are these beans set up? If a service has either a @rabbitQueue@ or @rabbitSubscribe@ property, then you will have these beans:
+
+* @<serviceName>_MessageListenerContainer@ of type [SimpleMessageListenerContainer|api:org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer]
+* @<serviceName>RabbitAdapter@ of type [MessageListenerAdapter|api:org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter]
+
+As an example, let's say you have a @MessageStoreService@ like so:
+
+{code}
+class MessageStoreService {
+ static rabbitSubscribe = [exchange: "amq.topic", routingKey: "logs.#"]
+ ...
+}
+{code}
+
+You can then customise things like the number of concurrent consumers, whether the channel is transacted, what the prefetch count should be, and more! Simply add code like this to your runtime configuration (Config.groovy):
+
+{code}
+beans {
+ messageStoreService_MessageListenerContainer {
+ channelTransacted = false
+ concurrentConsumers = 10
+ prefetchCount = 5
+ queueNames = ["q1", "q2"] as String[]
+ }
+
+ messageStoreServiceRabbitAdapter {
+ encoding = "UTF-8"
+ responseRoutingKey = "replyQueue"
+ }
+}
+{code}
+
+This approach works for any property that accepts a basic type. But what about bean references? In this case, you can't use the bean property overrides. Fortunately, the most common bean reference you are likely to want to override, the message converter, has a dedicated configuration option:
+
+{code}
+rabbitmq.messageConverterBean = "myCustomMessageConverter"
+{code}
+
+This is a global setting that accepts the name of a message converter bean. For the rare occasions that you need to override other bean references, you can declare your own @<serviceName>_MessageListenerContainer@ or @<serviceName>_RabbitAdapter@ beans in resources.groovy.
+
+Finally, you can override some of the global config options on a per-service basis:
+
+{code}
+rabbitmq {
+ services {
+ messageStoreService {
+ concurrentConsumers = 50
+ disableListening = true
+ }
+ }
+}
+{code}
+
+There are many options for customisation and we hope the above will get you started.
@@ -68,6 +68,14 @@ This approach isn't limited to topic exchanges: you can automatically bind queue
# the 'binding' is ignored for fanout exchanges; and
# the headers exchange requires a map of message header names and values for its binding.
+{note}
+RabbitMQ has several built-in exchanges with names of the form 'amq.*', for example 'amq.direct'. If you want to bind to these, you currently have to declare them with the correct attributes, i.e.
+
+{code}
+exchange name: "amq.direct", type: direct, durable: true, autoDelete: false
+{code}
+{note}
+
As you can imagine, these few building blocks allow you to configure some pretty complex messaging systems with very little effort. You can tailor the messaging system to your needs rather than tailor your applications to the messaging system.
@@ -15,4 +15,12 @@ class DemoService {
As with the pub/sub model, messages are delivered to the service by invoking the @handleMessage()@ method. That's all there is to it! The real trick is to configure your exchanges and queues with appropriate bindings, as we described in the configuration section.
+If you want more say in the configuration of the underlying listener, then you can also specify a map:
+
+{code}
+static rabbitQueue = [queues: "someQueueName", channelTransacted: true]
+{code}
+
+The "queues" option can either be a simple queue name or a list of queue names. Again, have a look at the [advanced configuration section|guide:advancedConfig] for information about the extra properties you can set here.
+
One last subject to discuss is the form that the messages take.
@@ -30,3 +30,23 @@ class DemoService {
{code}
This is a great convenience, but be aware that using serializable Java objects limits the types of client you can interact with. If all the clients you're interested in are using Spring AMQP, then you should be fine, but don't expect Ruby or Python clients to handle @Map@ messages! For production systems, we recommend you use strings and byte arrays.
+
+Sometimes you want access to the raw message, particularly if you want to look at the message headers. If so, just change the signature of the @handleMessage()@ method and add an extra option to your @rabbitQueue@ or @rabbitSubscribe@ property:
+
+{code}
+package org.grails.rabbitmq.test
+
+import org.springframework.amqp.core.Message
+
+class DemoService {
+ static rabbitQueue = [queues: 'someQueueName', messageConverterBean: '']
+
+ void handleMessage(Message msg) {
+ // Do something with the message headers
+ println "Received message with content type ${msg.contentType};${msg.encoding}"
+ ...
+ }
+}
+{code}
+
+As you can see, all you have to do is accept an argument of type @Message@ and add the @messageConverterBean@ option with an empty string as its value. This disables the automatic message conversion, allowing you to interrogate the raw message as required.
@@ -40,3 +40,14 @@ In this example, the service will only receive messages that have a routing key
Under the hood, the plugin creates a temporary, exclusive queue for your service which is removed from the broker when your application shuts down. There is no way for you to control the name of the queue or attach another listener to it, but then that's the point in this case. If you do need more control, then you must manage the queues and their bindings yourself.
+The map syntax also allows you to customise the properties of the Spring message listener container and the corresponding listener adapter (see the section on [advanced configuration|guide:advancedConfig] for more details on these). For example,
+
+{code}
+static rabbitSubscribe = [
+ name: 'shares',
+ routingKey: 'NYSE.GE',
+ encoding: "ISO-8859-1",
+ prefetchCount: 1]
+{code}
+
+will set the encoding and prefetch count for just this service listener. This technique is also possible for straight queue listeners as well.
View
@@ -5,6 +5,7 @@ configuration:
title: Configuration
configuringQueues: Configuring Queues
configuringExchanges: Configuring Exchanges
+ advancedConfig: Advanced Configuration
sendingMessages: Sending Messages
consumingMessages:
title: Consuming Messages
@@ -2,6 +2,7 @@ package org.grails.rabbitmq
import org.slf4j.LoggerFactory
import org.springframework.amqp.core.BindingBuilder
+import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.FanoutExchange
import org.springframework.amqp.core.TopicExchange
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
@@ -51,10 +52,10 @@ class AutoQueueMessageListenerContainer extends SimpleMessageListenerContainer i
def binding = null
if (exchange instanceof FanoutExchange) {
- binding = BindingBuilder.bind(queue).to((FanoutExchange)exchange);
+ binding = BindingBuilder.bind(queue).to(exchange);
}
- else if (exchange instanceof TopicExchange) {
- binding = BindingBuilder.bind(queue).to((TopicExchange)exchange).with(routingKey);
+ else if (exchange instanceof DirectExchange || exchange instanceof TopicExchange) {
+ binding = BindingBuilder.bind(queue).to(exchange).with(routingKey);
}
else {
log.error "Cannot subscribe to an exchange ('${exchange.name}') that is neither a fanout nor a topic"
@@ -0,0 +1,16 @@
+package org.grails.rabbitmq
+
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import org.springframework.util.ErrorHandler
+
+/**
+ * A simple error handler that logs exceptions via SLF4J.
+ */
+class RabbitErrorHandler implements ErrorHandler {
+ private static final Logger log = LoggerFactory.getLogger(this)
+
+ void handleError(Throwable t) {
+ log.error "Rabbit service listener failed.", t
+ }
+}
Oops, something went wrong.

0 comments on commit 600ed18

Please sign in to comment.