diff --git a/integration-tests/apps/messaging/in_container/src/in_container/tests.clj b/integration-tests/apps/messaging/in_container/src/in_container/tests.clj index 26c631fe..cbc55020 100644 --- a/integration-tests/apps/messaging/in_container/src/in_container/tests.clj +++ b/integration-tests/apps/messaging/in_container/src/in_container/tests.clj @@ -1,15 +1,14 @@ (ns in-container.tests (:use clojure.test) (:require [immutant.messaging :as msg] - [immutant.registry :as reg]) - (:import javax.jms.JMSException)) + [immutant.registry :as reg])) (deftest listen-on-a-queue-should-raise-for-non-existent-destinations - (is (thrown? JMSException + (is (thrown? IllegalStateException (msg/listen "a.non-existent.queue" (constantly nil))))) (deftest listen-on-a-topic-should-raise-for-non-existent-destinations - (is (thrown? JMSException + (is (thrown? IllegalStateException (msg/listen "a.non-existent.topic" (constantly nil))))) (deftest queue-start-should-be-synchronous @@ -28,16 +27,37 @@ (let [queue "queue.stop.sync"] (msg/start queue) (msg/stop queue) - (is (thrown? JMSException + (is (thrown? IllegalStateException (msg/listen queue (constantly true)))))) (deftest topic-stop-should-be-synchronous (let [topic "topic.stop.sync"] (msg/start topic) (msg/stop topic) - (is (thrown? JMSException + (is (thrown? IllegalStateException (msg/listen topic (constantly true)))))) +(deftest queue-start-should-be-idempotent + (let [queue "queue.id"] + (msg/start queue) + (try + (msg/start queue) + (is true) + (catch Exception e + (.printStackTrace e) + (is false))))) + + +(deftest topic-start-should-be-idempotent + (let [topic "topic.id"] + (msg/start topic) + (try + (msg/start topic) + (is true) + (catch Exception e + (.printStackTrace e) + (is false))))) + (deftest unlisten-on-a-queue-should-be-synchronous (let [queue "queue.ham"] (msg/start queue) diff --git a/modules/messaging/src/main/clojure/immutant/messaging.clj b/modules/messaging/src/main/clojure/immutant/messaging.clj index a7d35978..4f9d57a0 100644 --- a/modules/messaging/src/main/clojure/immutant/messaging.clj +++ b/modules/messaging/src/main/clojure/immutant/messaging.clj @@ -132,6 +132,7 @@ to be set) [nil]" [name-or-dest f & {:keys [concurrency decode?] :or {concurrency 1 decode? true} :as opts}] (let [connection (create-connection opts) + dest-name (destination-name name-or-dest) setup-fn (fn [] (let [session (create-session connection) destination (create-destination session name-or-dest)] @@ -143,9 +144,8 @@ (if-let [izer (registry/get "message-processor-groupizer")] ;; in-container - (when (destination-exists? connection name-or-dest) + (if (destination-exists? connection dest-name) (let [complete (promise) - dest-name (destination-name name-or-dest) group (.createGroup izer dest-name false ;; TODO: singleton @@ -157,7 +157,8 @@ #(deliver complete %))] (if (= "up" (deref complete 5000 nil)) group - (log/error "Failed to setup listener for" dest-name)))) + (log/error "Failed to setup listener for" dest-name))) + (throw (IllegalStateException. (str "Destination " dest-name " does not exist.")))) ;; out of container (try diff --git a/modules/messaging/src/main/clojure/immutant/messaging/core.clj b/modules/messaging/src/main/clojure/immutant/messaging/core.clj index f2ee25f8..c7aa089d 100644 --- a/modules/messaging/src/main/clojure/immutant/messaging/core.clj +++ b/modules/messaging/src/main/clojure/immutant/messaging/core.clj @@ -16,9 +16,8 @@ ;; 02110-1301 USA, or see the FSF site: http://www.fsf.org. (ns immutant.messaging.core - (:use [immutant.utilities :only (at-exit)] - [immutant.try :only (try-if)]) - (:import (javax.jms DeliveryMode Destination Queue Session Topic)) + (:use [immutant.utilities :only (at-exit)]) + (:import (javax.jms DeliveryMode Destination JMSException Queue Session Topic)) (:require [immutant.registry :as registry] [immutant.messaging.hornetq :as hornetq] [immutant.xa.transaction :as tx] @@ -77,6 +76,9 @@ javax.jms.Queue (.getQueueName name-or-dest) javax.jms.Topic (.getTopicName name-or-dest))) +(defn jms-name [dest-name] + (str "jms." (if (queue? dest-name) "queue." "topic.") dest-name)) + (defn wash-publish-options "Wash publish options relative to default values from a producer" [opts ^javax.jms.MessageProducer producer] @@ -152,7 +154,7 @@ (if force (stop-destination name) (if-let [default (registry/get "jboss.messaging.default")] - (if-let [queue (.getResource (.getManagementService default) (str "jms.queue." name))] + (if-let [queue (.getResource (.getManagementService default) (jms-name name))] (cond (and (.isDurable queue) (< 0 (.getMessageCount queue))) (log/warn "Won't stop non-empty durable queue:" name) (< 0 (.getConsumerCount queue)) (throw (IllegalStateException. "Can't stop queue with active consumers")) @@ -163,7 +165,7 @@ (if force (stop-destination name) (if-let [default (registry/get "jboss.messaging.default")] - (if-let [topic (.getResource (.getManagementService default) (str "jms.topic." name))] + (if-let [topic (.getResource (.getManagementService default) (jms-name name))] (condp > 0 (.getMessageCount topic) (log/warn "Won't stop topic with messages for durable subscribers:" name) (.getSubscriptionCount topic) (throw (IllegalStateException. "Can't stop topic with active subscribers")) @@ -172,34 +174,23 @@ (defn start-queue [name & {:keys [durable selector] :or {durable true selector ""}}] (if-let [izer (registry/get "destinationizer")] - - ;; in-container - (let [complete (promise)] - (.createQueue izer name durable selector #(deliver complete %)) - (if-not (= "up" (deref complete 5000 nil)) - (throw (Exception. (str "Unable to start queue: " name))))) - - - ;; outside container - (if-let [manager (registry/get "jboss.messaging.default.jms.manager")] - (if (.createQueue manager false name selector durable (into-array String [])) - (at-exit #(stop-destination name))) - (throw (Exception. (str "Unable to start queue: " name)))))) + (let [complete (promise) + service-created (.createQueue izer name durable selector #(deliver complete %))] + (if service-created + (when (not= "up" (deref complete 5000 nil)) + (throw (Exception. (str "Unable to start queue: " name)))) + (log/info "Queue already exists:" name))) + (throw (Exception. (str "Unable to start queue: " name))))) (defn start-topic [name & opts] (if-let [izer (registry/get "destinationizer")] - - ;; in-container - (let [complete (promise)] - (.createTopic izer name #(deliver complete %)) - (if-not (= "up" (deref complete 5000 nil)) - (throw (Exception. (str "Unable to start topic: " name))))) - - ;; outside container - (if-let [manager (registry/get "jboss.messaging.default.jms.manager")] - (if (.createTopic manager false name (into-array String [])) - (at-exit #(stop-destination name))) - (throw (Exception. (str "Unable to start topic: " name)))))) + (let [complete (promise) + service-created (.createTopic izer name #(deliver complete %))] + (if service-created + (when (not= "up" (deref complete 5000 nil)) + (throw (Exception. (str "Unable to start topic: " name)))) + (log/info "Topic already exists:" name))) + (throw (Exception. (str "Unable to start topic: " name))))) (defn create-session [^javax.jms.XAConnection connection] (if (registry/get factory-name) @@ -222,9 +213,11 @@ (.createConsumer session destination selector))) (defn destination-exists? - ([connection name-or-dest] - (with-open [session (create-session connection)] - (create-destination session name-or-dest)))) + [connection name-or-dest] + (try + (with-open [session (create-session connection)] + (create-destination session name-or-dest)) + (catch JMSException _))) (defn enlist-session "Enlist a session in the current transaction, if any" @@ -279,7 +272,6 @@ (onMessage [_ message] (handler message)))) - ;; TODO: This is currently unused and, if deemed necessary, could ;; probably be better implemented (defn wait-for-destination diff --git a/modules/messaging/src/main/java/org/immutant/messaging/Destinationizer.java b/modules/messaging/src/main/java/org/immutant/messaging/Destinationizer.java index 66f6277c..efd1188e 100644 --- a/modules/messaging/src/main/java/org/immutant/messaging/Destinationizer.java +++ b/modules/messaging/src/main/java/org/immutant/messaging/Destinationizer.java @@ -42,22 +42,34 @@ public Destinationizer(DeploymentUnit unit) { super( unit ); } - public void createQueue(String queueName, boolean durable, String selector, Object callback) { + public boolean createQueue(String queueName, boolean durable, String selector, Object callback) { + if (this.destinations.containsKey( queueName )) { + return false; + } + QueueService service = new QueueService( queueName, selector, durable, this.clojureRuntimeInjector.getValue(), callback ); - + this.destinations.put( queueName, QueueInstaller.deploy( getTarget(), service, queueName ) ); + + return true; } - public void createTopic(String topicName, Object callback) { + public boolean createTopic(String topicName, Object callback) { + if (this.destinations.containsKey( topicName )) { + return false; + } + TopicService service = new TopicService( topicName, this.clojureRuntimeInjector.getValue(), callback ); this.destinations.put( topicName, TopicInstaller.deploy( getTarget(), service, topicName ) ); + + return true; } @SuppressWarnings({ "rawtypes", "unchecked" }) diff --git a/modules/messaging/src/test/clojure/test/immutant/messaging.clj b/modules/messaging/src/test/clojure/test/immutant/messaging.clj index bd9a8023..b83ac7fa 100644 --- a/modules/messaging/src/test/clojure/test/immutant/messaging.clj +++ b/modules/messaging/src/test/clojure/test/immutant/messaging.clj @@ -20,11 +20,10 @@ clojure.test) (:require [immutant.registry :as registry])) -(def dummies { "jboss.messaging.default.jms.manager" - (proxy [org.hornetq.jms.server.JMSServerManager][] - (createQueue [& _] true) - (createTopic [& _] true)) - "housekeeper" nil}) +(def dummies { "destinationizer" + (proxy [org.immutant.messaging.Destinationizer] [nil] + (createQueue [& _] false) + (createTopic [& _] false))}) (defn test-already-running [destination] (let [names (atom (keys dummies))]