Permalink
Browse files

First cut at remote destination support [IMMUTANT-69]

This may break XA, and needs more tests. It also lacks docs atm.
  • Loading branch information...
1 parent 817fcb4 commit f05a61c14da836825940455f9d07f8878df544ea @tobias tobias committed Jun 20, 2012
@@ -5,3 +5,6 @@
(msg/start "/queue/biscuit")
(msg/listen "/queue/biscuit" #(msg/publish "/queue/ham" (.toUpperCase %)))
+(msg/start "/queue/bam")
+(msg/start "/queue/hiscuit")
+
@@ -65,3 +65,12 @@
(publish ham-queue "die!" :ttl 1)
(is (nil? (receive ham-queue :timeout 1000))))
+(testing "remote connections"
+ (deftest remote-publish-should-work
+ (publish ham-queue "testing-remote" :host "integ-app1.torquebox.org" :port 5445)
+ (is (= (receive ham-queue :timeout 60000) "testing-remote")))
+
+ (deftest remote-receive-should-work
+ (publish ham-queue "testing-remote")
+ (is (= (receive ham-queue :timeout 60000 :host "integ-app1.torquebox.org" :port 5445)
+ "testing-remote"))))
@@ -0,0 +1,37 @@
+;; Copyright 2008-2012 Red Hat, Inc, and individual contributors.
+;;
+;; This is free software; you can redistribute it and/or modify it
+;; under the terms of the GNU Lesser General Public License as
+;; published by the Free Software Foundation; either version 2.1 of
+;; the License, or (at your option) any later version.
+;;
+;; This software is distributed in the hope that it will be useful,
+;; but WITHOUT ANY WARRANTY; without even the implied warranty of
+;; MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+;; Lesser General Public License for more details.
+;;
+;; You should have received a copy of the GNU Lesser General Public
+;; License along with this software; if not, write to the Free
+;; Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+;; 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+
+(ns immutant.integs.msg.remote-listen
+ (:use fntest.core
+ clojure.test
+ immutant.messaging))
+
+(def bam-queue "/queue/bam")
+(def hiscuit-queue "/queue/hiscuit")
+
+(use-fixtures :once (with-deployment *file*
+ {
+ :root "target/apps/messaging/queues"
+ }))
+
+(deftest remote-listen-should-work
+ (listen bam-queue
+ (fn [m]
+ (publish hiscuit-queue m))
+ :host "integ-app1.torquebox.org" :port 5445)
+ (publish bam-queue "listen-up")
+ (is (= (receive hiscuit-queue :timeout 60000) "listen-up")))
@@ -40,9 +40,13 @@
:priority 0-9 or :low :normal :high :critical [4]
:ttl time to live, in ms [0=forever]
:persistent whether undelivered messages survive restarts [true]
- :properties a hash to which selectors may be applied"
+ :properties a hash to which selectors may be applied
+ :host the remote host to connect to (default is to connect in-vm, requires :port to be set as well) [nil]
+ :port the remote port to connect to (requires :host to be set) [nil]
+ :username the username to use to auth the connection (requires :password to be set) [nil]
+ :password the password to use to auth the connection (requires :username to be set) [nil]"
[dest-name message & {:as opts}]
- (with-connection
+ (with-connection opts
(let [session (session)
destination (destination session dest-name)
producer (.createProducer session destination)
@@ -56,9 +60,13 @@
The following options are supported [default]:
:timeout time in ms, after which nil is returned [10000]
- :selector A JMS (SQL 92) expression matching message properties"
- [dest-name & {:keys [timeout selector]}]
- (with-connection
+ :selector A JMS (SQL 92) expression matching message properties
+ :host the remote host to connect to (default is to connect in-vm, requires :port to be set as well) [nil]
+ :port the remote port to connect to (requires :host to be set) [nil]
+ :username the username to use to auth the connection (requires :password to be set) [nil]
+ :password the password to use to auth the connection (requires :username to be set) [nil]"
+ [dest-name & {:keys [timeout selector] :as opts}]
+ (with-connection opts
(let [session (session)
destination (destination session dest-name)
@@ -75,9 +83,13 @@
"The handler function, f, will receive any messages sent to dest-name.
The following options are supported [default]:
- :concurrency the number of threads handling messages [1]"
- [dest-name f & {:keys [concurrency selector] :or {concurrency 1}}]
- (let [connection (.createXAConnection connection-factory)]
+ :concurrency the number of threads handling messages [1]
+ :host the remote host to connect to (default is to connect in-vm, requires :port to be set as well) [nil]
+ :port the remote port to connect to (requires :host to be set) [nil]
+ :username the username to use to auth the connection (requires :password to be set) [nil]
+ :password the password to use to auth the connection (requires :username to be set) [nil]"
+ [dest-name f & {:keys [concurrency selector] :or {concurrency 1} :as opts}]
+ (let [connection (.createXAConnection (connection-factory opts))]
(try
(dotimes [_ concurrency]
(let [session (create-session connection)
@@ -86,7 +98,7 @@
(.setMessageListener consumer
(reify javax.jms.MessageListener
(onMessage [_ message]
- (bind-transaction session connection #(f (codecs/decode message))))))))
+ (f (codecs/decode message)))))))
(at-exit #(.close connection))
(.start connection)
connection
@@ -18,27 +18,40 @@
(ns immutant.messaging.core
(:use [immutant.utilities :only (at-exit)])
(:import (javax.jms Session DeliveryMode))
- (:require [immutant.registry :as lookup]
+ (:require [immutant.registry :as lookup]
[immutant.messaging.hornetq :as hornetq]
- [immutant.xa.transaction :as tx]))
+ [immutant.xa.transaction :as tx]
+ [clojure.tools.logging :as log]))
;;; The name of the JBoss connection factory
(def factory-name "jboss.naming.context.java.ConnectionFactory")
-;;; Thread-local connection
+;;; Thread-local connection set
+(def ^{:private true, :dynamic true} *connections* nil)
+;;; Thread-local current connection
(def ^{:private true, :dynamic true} *connection* nil)
;;; Thread-local map of transactions to sessions
(def ^{:private true, :dynamic true} *sessions* nil)
-(def connection-factory
- (if-let [reference-factory (lookup/fetch factory-name)]
- (let [reference (.getReference reference-factory)]
- (try
- (.getInstance reference)
- (finally (at-exit #(.release reference)))))
- (do
- (println "WARN: unable to obtain JMS Connection Factory so we must be outside container")
- (hornetq/connection-factory))))
+(defn ^{:private true} remote-connection? [opts]
+ (:host opts))
+
+(defn ^{:private true} connection-key [opts]
+ (map #(% opts) [:host :port :username :password]))
+
+(let [local-connection-factory
+ (if-let [reference-factory (lookup/fetch factory-name)]
+ (let [reference (.getReference reference-factory)]
+ (try
+ (.getInstance reference)
+ (finally (at-exit #(.release reference)))))
+ (do
+ (log/warn "Unable to obtain JMS Connection Factory - assuming we are outside the container")
+ (hornetq/connection-factory)))]
+ (defn connection-factory [opts]
+ (if (remote-connection? opts)
+ (hornetq/connection-factory opts)
+ local-connection-factory)))
(defn queue? [name]
(.startsWith name "/queue"))
@@ -84,9 +97,9 @@
(if (queue? name)
(.destroyQueue manager name)
(.destroyTopic manager name))
- (println "Stopped" name)
+ (log/info "Stopped" name)
(catch Throwable e
- (println "WARN:" (.getMessage (.getCause e)))))))
+ (log/warn (.getMessage (.getCause e)))))))
(defn start-queue [name & {:keys [durable selector] :or {durable false selector ""}}]
(if-let [manager (lookup/fetch "jboss.messaging.default.jms.manager")]
@@ -105,53 +118,46 @@
(.createXASession connection)
(.createSession connection false Session/AUTO_ACKNOWLEDGE)))
-(defn join-current-transaction
+(defn create-connection
+ "Creates a connection and registers it in the *connections* map"
+ [opts]
+ (let [conn (.createXAConnection (connection-factory opts) (:username opts) (:password opts))]
+ (set! *connections* (assoc *connections* (connection-key opts) conn))
+ conn))
+
+(defn enlist-session
"Enlist a session in the current transaction, if any"
[session]
(let [transaction (tx/current)]
(if transaction
(tx/enlist (.getXAResource session)))
- (set! *sessions* (assoc *sessions* transaction session))))
+ (set! *sessions* (assoc *sessions* transaction session)))
+ session)
(defn session
[]
- (let [transaction (tx/current)]
- (if-not (contains? *sessions* transaction)
- (join-current-transaction (create-session *connection*)))
- (get *sessions* transaction)))
-
-(defn with-connection* [f]
- (if *connection*
- (f)
- (binding [*connection* (.createXAConnection connection-factory)
- *sessions* {}]
- (.start *connection*)
- (try
- (f)
- (finally
- (let [conn *connection*]
- (if (tx/active?)
- (tx/after-completion #(.close conn))
- (.close conn))))))))
-
-(defmacro with-connection [& body]
- `(with-connection* (fn [] ~@body)))
-
-(defn bind-transaction
- "Create a transaction, bind the connection, enlist the session, and
- call the function. Most useful for a listener's onMessage calls, so
- as to re-use its session and connection for subsequent
- transactional JMS interactions invoked by the function."
- [session connection f]
- (binding [*connection* connection
- *sessions* {}]
- (tx/requires-new
- (join-current-transaction session)
- (f))
- ;; Close the sessions used for nested tx's, if any
- (doseq [s (remove (partial = session) (vals *sessions*))] (.close s))))
-
-
+ (enlist-session (create-session *connection*)))
+
+(defn with-connection* [opts f]
+ (binding [*connections* (or *connections* {})]
+ (if-let [conn (*connections* (connection-key opts))]
+ ;; this connection has been used before in the current call stack, just rebind it
+ (binding [*connection* conn]
+ (f))
+ ;; we need a new connection, so we have to start it and clean up after
+ (binding [*connection* (create-connection opts)
+ *sessions* {}]
+ (.start *connection*)
+ (try
+ (f)
+ (finally
+ (let [conn *connection*]
+ (if (tx/active?)
+ (tx/after-completion #(.close conn))
+ (.close conn)))))))))
+
+(defmacro with-connection [options & body]
+ `(with-connection* ~options (fn [] ~@body)))
;; TODO: This is currently unused and, if deemed necessary, could
;; probably be better implemented
@@ -22,8 +22,10 @@
(defn connection-factory
"Create a connection factory, typically invoked when outside container"
- [& {:keys [host port] :or {host "localhost" port 5445}}]
- (let [connect_opts { "host" host "port" (Integer. port) }
- transport_config (new TransportConfiguration "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" connect_opts)]
- (HornetQJMSClient/createConnectionFactoryWithoutHA JMSFactoryType/CF (into-array [transport_config]))))
+ ([]
+ (connection-factory nil))
+ ([{:keys [host port] :or {host "localhost" port 5445}}]
+ (let [connect_opts { "host" host "port" (Integer. port) }
+ transport_config (new TransportConfiguration "org.hornetq.core.remoting.impl.netty.NettyConnectorFactory" connect_opts)]
+ (HornetQJMSClient/createConnectionFactoryWithoutHA JMSFactoryType/CF (into-array [transport_config])))))
@@ -20,8 +20,9 @@
(:use [clojure.java.jdbc :only [transaction*]])
(:require [immutant.registry :as lookup]))
-(def manager (lookup/fetch "jboss.txn.TransactionManager"))
-
+(def ^javax.transaction.TransactionManager
+ manager (lookup/fetch "jboss.txn.TransactionManager"))
+
(defn current
"Return the active transaction"
[]

0 comments on commit f05a61c

Please sign in to comment.