Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

GPRABBITMQ-30 - retryAttempts is now configurable

  • Loading branch information...
commit e8c0a2db3a716cab43ddf5aa9be0d4eb4f052796 1 parent c935b23
Jeff Scott Brown authored April 05, 2012
52  RabbitmqGrailsPlugin.groovy
... ...
@@ -1,5 +1,6 @@
1 1
 import org.codehaus.groovy.grails.commons.ServiceArtefactHandler
2 2
 import org.grails.rabbitmq.AutoQueueMessageListenerContainer
  3
+import org.grails.rabbitmq.RabbitConfigurationHolder
3 4
 import org.grails.rabbitmq.RabbitDynamicMethods
4 5
 import org.grails.rabbitmq.RabbitErrorHandler
5 6
 import org.grails.rabbitmq.RabbitQueueBuilder
@@ -8,10 +9,25 @@ import org.springframework.amqp.core.Binding
8 9
 import org.springframework.amqp.core.Queue
9 10
 import org.springframework.amqp.rabbit.core.RabbitAdmin
10 11
 import org.springframework.amqp.rabbit.core.RabbitTemplate
11  
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
12 12
 import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter
  13
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer
  14
+import org.springframework.retry.backoff.FixedBackOffPolicy;
  15
+//import org.springframework.retry.interceptor.StatefulRetryOperationsInterceptor;
  16
+import org.springframework.retry.policy.SimpleRetryPolicy;
  17
+import org.springframework.retry.support.RetryTemplate;
13 18
 import static org.springframework.amqp.core.Binding.DestinationType.QUEUE
14  
-import org.grails.rabbitmq.RabbitConfigurationHolder
  19
+import org.springframework.amqp.rabbit.config.StatefulRetryOperationsInterceptorFactoryBean
  20
+import org.aopalliance.aop.Advice
  21
+import java.io.ByteArrayOutputStream;
  22
+import org.springframework.amqp.support.converter.SimpleMessageConverter
  23
+import java.io.PrintStream;
  24
+
  25
+import org.slf4j.Logger;
  26
+import org.slf4j.LoggerFactory;
  27
+import org.springframework.amqp.core.AmqpTemplate;
  28
+import org.springframework.amqp.core.Message;
  29
+import org.springframework.amqp.rabbit.retry.MessageRecoverer;
  30
+import org.springframework.beans.factory.annotation.Autowired;
15 31
 
16 32
 class RabbitmqGrailsPlugin {
17 33
     // the plugin version
@@ -85,7 +101,13 @@ class RabbitmqGrailsPlugin {
85 101
             }
86 102
             rabbitTemplate(RabbitTemplate) {
87 103
                 connectionFactory = rabbitMQConnectionFactory
88  
-                if (messageConverterBean) messageConverter = ref(messageConverterBean)
  104
+                if (messageConverterBean) {
  105
+                     messageConverter = ref(messageConverterBean)
  106
+                 } else {
  107
+                     def converter = new SimpleMessageConverter()
  108
+                     converter.createMessageIds = true
  109
+                     messageConverter = converter
  110
+                 }
89 111
             }
90 112
             adm(RabbitAdmin, rabbitMQConnectionFactory)
91 113
             rabbitErrorHandler(RabbitErrorHandler)
@@ -158,6 +180,28 @@ class RabbitmqGrailsPlugin {
158 180
                     "grails.rabbit.binding.${binding.exchange}.${binding.queue}"(Binding, binding.queue, QUEUE, binding.exchange, binding.rule, binding.arguments )
159 181
                 }
160 182
             }
  183
+            rabbitRetryHandler(StatefulRetryOperationsInterceptorFactoryBean) {
  184
+                def retryPolicy = new SimpleRetryPolicy()
  185
+                def maxRetryAttempts = 0
  186
+                if(rabbitmqConfig?.retryPolicy?.containsKey('maxAttempts')) {
  187
+                    def maxAttemptsConfigValue = rabbitmqConfig.retryPolicy.maxAttempts
  188
+                    if(maxAttemptsConfigValue instanceof Integer) {
  189
+                        maxRetryAttempts = maxAttemptsConfigValue
  190
+                    } else {
  191
+                        log.error "rabbitmq.retryPolicy.maxAttempts [$maxAttemptsConfigValue] of type [${maxAttemptsConfigValue.getClass().getName()}] is not an Integer and will be ignored.  The default value of [${maxRetryAttempts}] will be used"
  192
+                    }
  193
+                }
  194
+                retryPolicy.maxAttempts = maxRetryAttempts
  195
+                
  196
+                def backOffPolicy = new FixedBackOffPolicy()
  197
+                backOffPolicy.backOffPeriod = 5000
  198
+                
  199
+                def retryTemplate = new RetryTemplate()
  200
+                retryTemplate.retryPolicy  = retryPolicy
  201
+                retryTemplate.backOffPolicy = backOffPolicy
  202
+                
  203
+                retryOperations = retryTemplate
  204
+            }
161 205
         }   
162 206
     }
163 207
     
@@ -175,8 +219,10 @@ class RabbitmqGrailsPlugin {
175 219
 
176 220
     def doWithApplicationContext = { applicationContext ->
177 221
         def containerBeans = applicationContext.getBeansOfType(SimpleMessageListenerContainer)
  222
+        applicationContext.rabbitTemplate.messageConverter.createMessageIds = true
178 223
         containerBeans.each { beanName, bean ->
179 224
             if(isServiceListener(beanName)) {
  225
+                bean.adviceChain = [applicationContext.rabbitRetryHandler] as Advice[]
180 226
                 // Now that the listener is properly configured, we can start it.
181 227
                 bean.start()
182 228
             }
1  grails-app/conf/BuildConfig.groovy
@@ -27,6 +27,7 @@ grails.project.dependency.resolution = {
27 27
                      'slf4j-log4j12',
28 28
                      'log4j'
29 29
         }
  30
+        runtime "org.springframework.retry:spring-retry:1.0.0.RELEASE"
30 31
     }
31 32
 
32 33
     plugins {
1  src/docs/guide/configuration.gdoc
@@ -24,4 +24,5 @@ rabbitmq.connectionfactory.virtualHost | The name of the virtual host to connect
24 24
 rabbitmq.connectionfactory.channelCacheSize | The connection channel cache size | 10
25 25
 rabbitmq.concurrentConsumers | The number of concurrent consumers to create per message handler.  Raising the number is recommended in order to scale the consumption of messages coming in from a queue.  Note that ordering guarantees are lost when multiple consumers are registered. | 1
26 26
 rabbitmq.disableListening | Disables all service listeners so that they won't receive any messages. | false
  27
+rabbitmq.retryPolicy.maxAttempts | Sets the maximum number of retries for failed message deliveries | 0
27 28
 {table}

0 notes on commit e8c0a2d

Please sign in to comment.
Something went wrong with that request. Please try again.