Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Move guts of global queue service to polyglot.

Also, just warn on reconfigure, don't throw.
  • Loading branch information...
commit 9da5a646564d52b8b73fa21e9e9983da7dad7fb4 1 parent 57c3866
Toby Crawley authored March 06, 2013
31  integration-tests/apps/messaging/queues/test/queues/reconfigure.clj
@@ -2,21 +2,34 @@
2 2
   (:use clojure.test)
3 3
   (:require [immutant.messaging :as msg]
4 4
             [immutant.util :as util]
5  
-            [immutant.registry :as reg])
  5
+            [immutant.registry :as reg]
  6
+            [clojure.java.io :as io])
6 7
   (:import org.jboss.msc.service.ServiceName))
7 8
 
8 9
 (defn get-from-msc [name]
9  
-  (.getService (deref (deref #'reg/msc-registry)) (ServiceName/parse name)))
  10
+  (->> name
  11
+       ServiceName/parse
  12
+       (.getService (deref (deref #'reg/msc-registry)))
  13
+       .getService))
10 14
 
11 15
 (deftest reconfigure-on-a-stopped-queue-should-work
12 16
   (msg/stop "queue.reconfigurable")
13 17
   (msg/start "queue.reconfigurable" :selector "foo=1")
14  
-  (let [service (-> "jboss.messaging.default.jms.queue.\"queue.reconfigurable\""
15  
-                    get-from-msc
16  
-                    .getService)]
17  
-    (is (= "foo=1" (.getSelector service)))))
  18
+  (is
  19
+   (= "foo=1"
  20
+      (-> "jboss.messaging.default.jms.queue.\"queue.reconfigurable\""
  21
+          get-from-msc
  22
+          .getSelector))))
18 23
 
19  
-(deftest reconfigure-on-a-running-queue-should-throw
  24
+(deftest reconfigure-on-a-running-queue-should-warn
20 25
   (msg/stop "queue.not-reconfigurable")
21  
-  (is (thrown? IllegalStateException
22  
-               (msg/start "queue.not-reconfigurable" :selector "foo=1"))))
  26
+  (msg/start "queue.not-reconfigurable" :selector "foo=1")
  27
+  (is
  28
+   (re-find #"WARN.* queue\.not-reconfigurable .*currently active"
  29
+            (slurp (io/file (System/getProperty "jboss.server.log.dir")
  30
+                            "server.log"))))
  31
+  (is
  32
+   (= ""
  33
+    (-> "jboss.messaging.default.jms.queue.\"queue.not-reconfigurable\""
  34
+        get-from-msc
  35
+        .getSelector))))
54  modules/messaging/src/main/java/org/immutant/messaging/DestinationService.java
... ...
@@ -1,54 +0,0 @@
1  
-/*
2  
- * Copyright 2008-2013 Red Hat, Inc, and individual contributors.
3  
- * 
4  
- * This is free software; you can redistribute it and/or modify it
5  
- * under the terms of the GNU Lesser General Public License as
6  
- * published by the Free Software Foundation; either version 2.1 of
7  
- * the License, or (at your option) any later version.
8  
- * 
9  
- * This software is distributed in the hope that it will be useful,
10  
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  
- * Lesser General Public License for more details.
13  
- * 
14  
- * You should have received a copy of the GNU Lesser General Public
15  
- * License along with this software; if not, write to the Free
16  
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
17  
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
18  
- */
19  
-
20  
-package org.immutant.messaging;
21  
-
22  
-import org.immutant.core.SimpleServiceStateListener;
23  
-import org.immutant.runtime.ClojureRuntime;
24  
-import org.jboss.msc.service.Service;
25  
-import org.jboss.msc.service.StartContext;
26  
-import org.jboss.msc.service.StartException;
27  
-import org.jboss.msc.service.StopContext;
28  
-
29  
-public class DestinationService implements Service<Void> {
30  
-    public DestinationService(String queueName, ClojureRuntime runtime, Object callback) {
31  
-        this.runtime = runtime;
32  
-        this.callback = callback;
33  
-    }
34  
-    
35  
-    @SuppressWarnings("unchecked")
36  
-    @Override 
37  
-    public synchronized void start(StartContext context) throws StartException {
38  
-        context.getController().addListener( new SimpleServiceStateListener( this.runtime,
39  
-                                                                             this.callback ) );
40  
-    }
41  
-
42  
-    @Override
43  
-    public Void getValue() throws IllegalStateException,
44  
-            IllegalArgumentException {
45  
-        return null;
46  
-    }
47  
-
48  
-    @Override
49  
-    public void stop(StopContext context) {
50  
-    }
51  
-
52  
-    private ClojureRuntime runtime;
53  
-    private Object callback;
54  
-}
93  modules/messaging/src/main/java/org/immutant/messaging/Destinationizer.java
@@ -24,14 +24,12 @@
24 24
 
25 25
 import org.immutant.core.HasImmutantRuntimeInjector;
26 26
 import org.immutant.core.SimpleServiceStateListener;
27  
-import org.immutant.messaging.as.MessagingServices;
28 27
 import org.immutant.runtime.ClojureRuntime;
29 28
 import org.jboss.as.server.deployment.DeploymentUnit;
30 29
 import org.jboss.logging.Logger;
31 30
 import org.jboss.msc.inject.Injector;
32 31
 import org.jboss.msc.service.ServiceController;
33 32
 import org.jboss.msc.service.ServiceController.Mode;
34  
-import org.jboss.msc.service.ServiceController.State;
35 33
 import org.jboss.msc.service.ServiceName;
36 34
 import org.jboss.msc.service.ServiceRegistry;
37 35
 import org.jboss.msc.service.ServiceTarget;
@@ -39,8 +37,6 @@
39 37
 import org.projectodd.polyglot.core.AtRuntimeInstaller;
40 38
 import org.projectodd.polyglot.messaging.destinations.DestinationUtils;
41 39
 import org.projectodd.polyglot.messaging.destinations.Destroyable;
42  
-import org.projectodd.polyglot.messaging.destinations.DestroyableJMSQueueService;
43  
-import org.projectodd.polyglot.messaging.destinations.DestroyableJMSTopicService;
44 40
 import org.projectodd.polyglot.messaging.destinations.processors.QueueInstaller;
45 41
 import org.projectodd.polyglot.messaging.destinations.processors.TopicInstaller;
46 42
 
@@ -51,69 +47,32 @@ public Destinationizer(DeploymentUnit unit, ServiceTarget globalServiceTarget) {
51 47
         super( unit, globalServiceTarget );
52 48
     }
53 49
 
54  
-    @SuppressWarnings("rawtypes")
55 50
     public boolean createQueue(final String queueName, final boolean durable, final String selector, Object callback) {
56  
-        if (destinationExists( queueName )) {
  51
+        if (DestinationUtils.destinationPointerExists(getUnit(), queueName)) {
57 52
             return false;
58 53
         }
59 54
                             
60  
-        ServiceName globalQServiceName = QueueInstaller.queueServiceName( queueName );
61  
-        ServiceController globalQService = 
62  
-                getUnit().getServiceRegistry().getService( globalQServiceName );
63  
-        
64  
-        if (globalQService == null) {
65  
-            deployGlobalQueue(queueName, durable, selector);
66  
-        } else {
67  
-            //handle reconfiguration of an existing queue
68  
-            DestroyableJMSQueueService actual = (DestroyableJMSQueueService)globalQService.getService();
69  
-            if (actual.isDurable() != durable || 
70  
-                    !actual.getSelector().equals( selector )) {
71  
-                String from = "durable: " + actual.isDurable() + ", selector: " + actual.getSelector();
72  
-                String to = "durable: " + durable + ", selector: " + selector;
73  
-                State currentState = globalQService.getState();     
74  
-                if (currentState == State.DOWN || 
75  
-                        currentState == State.STOPPING) {
76  
-                    log.info("Reconfiguring " + queueName + " from " + from + " to " + to);
77  
-                    replaceService(globalQServiceName, new Runnable() {
78  
-                        public void run() {
79  
-                            deployGlobalQueue(queueName, durable, selector);
80  
-                        }
81  
-                    });
82  
-                } else {
83  
-                    throw new IllegalStateException("Can't reconfigure " + queueName + " from " + from 
84  
-                                                    + " to " + to + " - it has already been configured");
85  
-                }
86  
-            }   
87  
-        }
88  
-    
89  
-        createDestinationService(queueName, callback, globalQServiceName);
  55
+        createDestinationService(queueName, callback,
  56
+                                 QueueInstaller.deployGlobalQueue(getUnit().getServiceRegistry(), 
  57
+                                                                  getGlobalTarget(),
  58
+                                                                  queueName, 
  59
+                                                                  durable,   
  60
+                                                                  selector, 
  61
+                                                                  DestinationUtils.jndiNames( queueName, false )));
90 62
         
91 63
         return true;
92 64
     }
93 65
     
94  
-    protected ServiceName deployGlobalQueue(String queueName, boolean durable, String selector) {
95  
-        return QueueInstaller.deploy(getGlobalTarget(), 
96  
-                                     new DestroyableJMSQueueService(queueName, selector, durable, 
97  
-                                                                        new String[] { DestinationUtils.jndiName( queueName ) }),
98  
-                                     queueName,    
99  
-                                     Mode.ON_DEMAND); 
100  
-    }
101  
-    
102 66
     public boolean createTopic(String topicName, Object callback) {
103  
-        if (destinationExists( topicName )) {
  67
+        if (DestinationUtils.destinationPointerExists(getUnit(), topicName)) {
104 68
             return false;
105 69
         } 
106 70
 
107  
-        ServiceName globalTopicServiceName = TopicInstaller.topicServiceName( topicName );
108  
-        if (getUnit().getServiceRegistry().getService( globalTopicServiceName ) == null) {
109  
-            TopicInstaller.deploy(getGlobalTarget(), 
110  
-                                  new DestroyableJMSTopicService(topicName,  
111  
-                                                                    new String[] { DestinationUtils.jndiName( topicName ) }),
112  
-                                  topicName, 
113  
-                                  Mode.ON_DEMAND); 
114  
-        }
115  
-    
116  
-        createDestinationService(topicName, callback, globalTopicServiceName);
  71
+        createDestinationService(topicName, callback, 
  72
+                                 TopicInstaller.deployGlobalTopic(getUnit().getServiceRegistry(),
  73
+                                                                  getGlobalTarget(),
  74
+                                                                  topicName,
  75
+                                                                  DestinationUtils.jndiNames( topicName, false )));
117 76
 
118 77
         return true;
119 78
     }
@@ -158,25 +117,15 @@ public boolean destroyDestination(String name, Object callback) {
158 117
     }
159 118
 
160 119
     protected void createDestinationService(String destName, Object callback, ServiceName globalName) {
161  
-        DestinationService service = 
162  
-                new DestinationService(destName,
163  
-                                       this.clojureRuntimeInjector.getValue(),
164  
-                                       callback);
165  
-                
166  
-        ServiceName serviceName = MessagingServices.destinationPointer(getUnit(), destName);
167  
-        
168  
-        build(serviceName, service, false)
169  
-            .addDependency( globalName )
170  
-            .install();
171  
-            
172  
-        this.destinations.put( destName, serviceName );
173  
-        
  120
+        this.destinations.put(destName, 
  121
+                              DestinationUtils.deployDestinationPointerService(getUnit(), 
  122
+                                                                               getTarget(), 
  123
+                                                                               destName, 
  124
+                                                                               globalName, 
  125
+                                                                               new SimpleServiceStateListener(this.clojureRuntimeInjector.getValue(), 
  126
+                                                                                                              callback)));
174 127
     }
175 128
     
176  
-    protected boolean destinationExists(String name) {
177  
-        return (getUnit().getServiceRegistry().getService(MessagingServices.destinationPointer(getUnit(), name)) 
178  
-                != null);
179  
-    }
180 129
     
181 130
     @Override
182 131
     public Injector<ClojureRuntime> getClojureRuntimeInjector() {
2  modules/messaging/src/main/java/org/immutant/messaging/MessageProcessorGroupizer.java
@@ -50,7 +50,7 @@ public MessageProcessorGroup createGroup(final String destinationName, final boo
50 50
                                                  final boolean durable, final String handlerName, final XAConnection connection, final Object setupHandler, 
51 51
                                                  final Object startCallback) {
52 52
         
53  
-        final ServiceName pointerDestName = MessagingServices.destinationPointer(getUnit(), destinationName);
  53
+        final ServiceName pointerDestName = DestinationUtils.destinationPointerName(getUnit(), destinationName);
54 54
         ServiceController pointerDest = getUnit().getServiceRegistry().getService( pointerDestName );
55 55
         if (pointerDest == null ||
56 56
                !inValidState(pointerDest.getState())) {
3  modules/messaging/src/main/java/org/immutant/messaging/as/MessagingServices.java
@@ -44,7 +44,4 @@ public static ServiceName messageProcessor(DeploymentUnit unit, String processor
44 44
         return unit.getServiceName().append( MESSAGING ).append(  processorName );
45 45
     }
46 46
     
47  
-    public static ServiceName destinationPointer(DeploymentUnit unit, String name) {
48  
-        return unit.getServiceName().append( MESSAGING ).append( "destination-pointer" ).append( name );
49  
-    }
50 47
 }
2  pom.xml
@@ -344,7 +344,7 @@
344 344
   </dependencyManagement>
345 345
 
346 346
   <properties>
347  
-    <version.polyglot>1.x.incremental.13</version.polyglot>
  347
+    <version.polyglot>1.x.incremental.14</version.polyglot>
348 348
     <version.junit>4.7</version.junit>
349 349
     <version.jmock>2.5.1</version.jmock>
350 350
     <version.mockito>1.8.4</version.mockito>

0 notes on commit 9da5a64

Please sign in to comment.
Something went wrong with that request. Please try again.