Skip to content

Commit

Permalink
Fix msg/start's idempotency. [IMMUTANT-163]
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Nov 5, 2012
1 parent a104354 commit d0d5b89
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 51 deletions.
@@ -1,15 +1,14 @@
(ns in-container.tests
(:use clojure.test)
(:require [immutant.messaging :as msg]
[immutant.registry :as reg])
(:import javax.jms.JMSException))
[immutant.registry :as reg]))

(deftest listen-on-a-queue-should-raise-for-non-existent-destinations
(is (thrown? JMSException
(is (thrown? IllegalStateException
(msg/listen "a.non-existent.queue" (constantly nil)))))

(deftest listen-on-a-topic-should-raise-for-non-existent-destinations
(is (thrown? JMSException
(is (thrown? IllegalStateException
(msg/listen "a.non-existent.topic" (constantly nil)))))

(deftest queue-start-should-be-synchronous
Expand All @@ -28,16 +27,37 @@
(let [queue "queue.stop.sync"]
(msg/start queue)
(msg/stop queue)
(is (thrown? JMSException
(is (thrown? IllegalStateException
(msg/listen queue (constantly true))))))

(deftest topic-stop-should-be-synchronous
(let [topic "topic.stop.sync"]
(msg/start topic)
(msg/stop topic)
(is (thrown? JMSException
(is (thrown? IllegalStateException
(msg/listen topic (constantly true))))))

(deftest queue-start-should-be-idempotent
(let [queue "queue.id"]
(msg/start queue)
(try
(msg/start queue)
(is true)
(catch Exception e
(.printStackTrace e)
(is false)))))


(deftest topic-start-should-be-idempotent
(let [topic "topic.id"]
(msg/start topic)
(try
(msg/start topic)
(is true)
(catch Exception e
(.printStackTrace e)
(is false)))))

(deftest unlisten-on-a-queue-should-be-synchronous
(let [queue "queue.ham"]
(msg/start queue)
Expand Down
7 changes: 4 additions & 3 deletions modules/messaging/src/main/clojure/immutant/messaging.clj
Expand Up @@ -132,6 +132,7 @@
to be set) [nil]"
[name-or-dest f & {:keys [concurrency decode?] :or {concurrency 1 decode? true} :as opts}]
(let [connection (create-connection opts)
dest-name (destination-name name-or-dest)
setup-fn (fn []
(let [session (create-session connection)
destination (create-destination session name-or-dest)]
Expand All @@ -143,9 +144,8 @@
(if-let [izer (registry/get "message-processor-groupizer")]

;; in-container
(when (destination-exists? connection name-or-dest)
(if (destination-exists? connection dest-name)
(let [complete (promise)
dest-name (destination-name name-or-dest)
group (.createGroup izer
dest-name
false ;; TODO: singleton
Expand All @@ -157,7 +157,8 @@
#(deliver complete %))]
(if (= "up" (deref complete 5000 nil))
group
(log/error "Failed to setup listener for" dest-name))))
(log/error "Failed to setup listener for" dest-name)))
(throw (IllegalStateException. (str "Destination " dest-name " does not exist."))))

;; out of container
(try
Expand Down
60 changes: 26 additions & 34 deletions modules/messaging/src/main/clojure/immutant/messaging/core.clj
Expand Up @@ -16,9 +16,8 @@
;; 02110-1301 USA, or see the FSF site: http://www.fsf.org.

(ns immutant.messaging.core
(:use [immutant.utilities :only (at-exit)]
[immutant.try :only (try-if)])
(:import (javax.jms DeliveryMode Destination Queue Session Topic))
(:use [immutant.utilities :only (at-exit)])
(:import (javax.jms DeliveryMode Destination JMSException Queue Session Topic))
(:require [immutant.registry :as registry]
[immutant.messaging.hornetq :as hornetq]
[immutant.xa.transaction :as tx]
Expand Down Expand Up @@ -77,6 +76,9 @@
javax.jms.Queue (.getQueueName name-or-dest)
javax.jms.Topic (.getTopicName name-or-dest)))

(defn jms-name [dest-name]
(str "jms." (if (queue? dest-name) "queue." "topic.") dest-name))

(defn wash-publish-options
"Wash publish options relative to default values from a producer"
[opts ^javax.jms.MessageProducer producer]
Expand Down Expand Up @@ -152,7 +154,7 @@
(if force
(stop-destination name)
(if-let [default (registry/get "jboss.messaging.default")]
(if-let [queue (.getResource (.getManagementService default) (str "jms.queue." name))]
(if-let [queue (.getResource (.getManagementService default) (jms-name name))]
(cond
(and (.isDurable queue) (< 0 (.getMessageCount queue))) (log/warn "Won't stop non-empty durable queue:" name)
(< 0 (.getConsumerCount queue)) (throw (IllegalStateException. "Can't stop queue with active consumers"))
Expand All @@ -163,7 +165,7 @@
(if force
(stop-destination name)
(if-let [default (registry/get "jboss.messaging.default")]
(if-let [topic (.getResource (.getManagementService default) (str "jms.topic." name))]
(if-let [topic (.getResource (.getManagementService default) (jms-name name))]
(condp > 0
(.getMessageCount topic) (log/warn "Won't stop topic with messages for durable subscribers:" name)
(.getSubscriptionCount topic) (throw (IllegalStateException. "Can't stop topic with active subscribers"))
Expand All @@ -172,34 +174,23 @@

(defn start-queue [name & {:keys [durable selector] :or {durable true selector ""}}]
(if-let [izer (registry/get "destinationizer")]

;; in-container
(let [complete (promise)]
(.createQueue izer name durable selector #(deliver complete %))
(if-not (= "up" (deref complete 5000 nil))
(throw (Exception. (str "Unable to start queue: " name)))))


;; outside container
(if-let [manager (registry/get "jboss.messaging.default.jms.manager")]
(if (.createQueue manager false name selector durable (into-array String []))
(at-exit #(stop-destination name)))
(throw (Exception. (str "Unable to start queue: " name))))))
(let [complete (promise)
service-created (.createQueue izer name durable selector #(deliver complete %))]
(if service-created
(when (not= "up" (deref complete 5000 nil))
(throw (Exception. (str "Unable to start queue: " name))))
(log/info "Queue already exists:" name)))
(throw (Exception. (str "Unable to start queue: " name)))))

(defn start-topic [name & opts]
(if-let [izer (registry/get "destinationizer")]

;; in-container
(let [complete (promise)]
(.createTopic izer name #(deliver complete %))
(if-not (= "up" (deref complete 5000 nil))
(throw (Exception. (str "Unable to start topic: " name)))))

;; outside container
(if-let [manager (registry/get "jboss.messaging.default.jms.manager")]
(if (.createTopic manager false name (into-array String []))
(at-exit #(stop-destination name)))
(throw (Exception. (str "Unable to start topic: " name))))))
(let [complete (promise)
service-created (.createTopic izer name #(deliver complete %))]
(if service-created
(when (not= "up" (deref complete 5000 nil))
(throw (Exception. (str "Unable to start topic: " name))))
(log/info "Topic already exists:" name)))
(throw (Exception. (str "Unable to start topic: " name)))))

(defn create-session [^javax.jms.XAConnection connection]
(if (registry/get factory-name)
Expand All @@ -222,9 +213,11 @@
(.createConsumer session destination selector)))

(defn destination-exists?
([connection name-or-dest]
(with-open [session (create-session connection)]
(create-destination session name-or-dest))))
[connection name-or-dest]
(try
(with-open [session (create-session connection)]
(create-destination session name-or-dest))
(catch JMSException _)))

(defn enlist-session
"Enlist a session in the current transaction, if any"
Expand Down Expand Up @@ -279,7 +272,6 @@
(onMessage [_ message]
(handler message))))


;; TODO: This is currently unused and, if deemed necessary, could
;; probably be better implemented
(defn wait-for-destination
Expand Down
Expand Up @@ -42,22 +42,34 @@ public Destinationizer(DeploymentUnit unit) {
super( unit );
}

public void createQueue(String queueName, boolean durable, String selector, Object callback) {
public boolean createQueue(String queueName, boolean durable, String selector, Object callback) {
if (this.destinations.containsKey( queueName )) {
return false;
}

QueueService service = new QueueService( queueName, selector, durable,
this.clojureRuntimeInjector.getValue(),
callback );

this.destinations.put( queueName,
QueueInstaller.deploy( getTarget(), service, queueName ) );

return true;
}

public void createTopic(String topicName, Object callback) {
public boolean createTopic(String topicName, Object callback) {
if (this.destinations.containsKey( topicName )) {
return false;
}

TopicService service = new TopicService( topicName,
this.clojureRuntimeInjector.getValue(),
callback );

this.destinations.put( topicName,
TopicInstaller.deploy( getTarget(), service, topicName ) );

return true;
}

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand Down
Expand Up @@ -20,11 +20,10 @@
clojure.test)
(:require [immutant.registry :as registry]))

(def dummies { "jboss.messaging.default.jms.manager"
(proxy [org.hornetq.jms.server.JMSServerManager][]
(createQueue [& _] true)
(createTopic [& _] true))
"housekeeper" nil})
(def dummies { "destinationizer"
(proxy [org.immutant.messaging.Destinationizer] [nil]
(createQueue [& _] false)
(createTopic [& _] false))})

(defn test-already-running [destination]
(let [names (atom (keys dummies))]
Expand Down

0 comments on commit d0d5b89

Please sign in to comment.