Skip to content

Commit

Permalink
Use a blocking deref instead of taking a :timeout param [IMMUTANT-201]
Browse files Browse the repository at this point in the history
  • Loading branch information
tobias committed Jan 10, 2013
1 parent 4957c1b commit b0a1b8d
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 18 deletions.
7 changes: 1 addition & 6 deletions docs/src/org/messaging.org
Expand Up @@ -330,12 +330,7 @@
optional list of options. It publishes the message to the queue, marking
it as a /synchronous/ message and returns a delay that will receive the
response from the worker initiated by the =respond= function. It accepts
the same options as =publish=, with one additional option:

| Option | Default | Description |
|------------+---------+-------------------------------------|
| | <c> | <35> |
| =:timeout= | =10000= | Time in ms for the delayed receive to wait once it it is dereferenced, after which nil is returned |
the same options as =publish=.

** Respond

Expand Down
Expand Up @@ -6,13 +6,21 @@
(def ham-queue "/queue/ham")
(def biscuit-queue "/queue/biscuit")
(def oddball-queue (msg/as-queue "oddball"))
(def sleepy-queue "queue.sleeper")

(msg/start ham-queue)
(msg/start biscuit-queue)
(msg/start oddball-queue)
(msg/start sleepy-queue)

(msg/respond ham-queue (memfn toUpperCase))
(msg/respond oddball-queue (memfn toUpperCase))

(msg/respond biscuit-queue (memfn toUpperCase) :selector "worker='upper'")
(msg/respond biscuit-queue (memfn toLowerCase) :selector "worker='lower'")

(msg/respond sleepy-queue (fn [m]
(println "SLEEPING" m)
(Thread/sleep m)
(println "AWAKE")
m))
Expand Up @@ -23,22 +23,30 @@
(def ham-queue "/queue/ham")
(def biscuit-queue "/queue/biscuit")
(def oddball-queue (as-queue "oddball"))
(def sleepy-queue "queue.sleeper")

(use-fixtures :once (with-deployment *file*
{
:root "target/apps/messaging/request-response"
}))

(deftest request-and-respond-should-both-work
(is (= "BISCUIT" @(request ham-queue "biscuit" :timeout 2000))))
(is (= "BISCUIT" @(request ham-queue "biscuit"))))

(deftest request-and-respond-with-a-selector-should-work
(is (= "BISCUIT" @(request biscuit-queue "biscuit"
:timeout 2000
:properties {"worker" "upper"})))
(is (= "ham" @(request biscuit-queue "HAM"
:timeout 2000
:properties {"worker" "lower"}))))

(deftest request-and-respond-with-as-queue-should-both-work
(is (= "BISCUIT" @(request oddball-queue "biscuit" :timeout 2000))))
(is (= "BISCUIT" @(request oddball-queue "biscuit"))))

(deftest request-with-a-deref-timeout-should-work
(is (= 100 (deref (request sleepy-queue 100) 1000 nil))))

(deftest realized?-should-work
(let [response (request sleepy-queue 1000)]
(is (not (realized? response)))
(is (= 1000 (time @response)))
(is (realized? response))))
12 changes: 4 additions & 8 deletions modules/messaging/src/main/clojure/immutant/messaging.clj
Expand Up @@ -240,19 +240,15 @@
with respond. The queue parameter can either be the name of a
queue, an actual javax.jms.Queue, or the result of as-queue.
It takes the same options as publish, and one more [default]:
:timeout time in ms for the delayed receive to wait once it it is
dereferenced, after which nil is returned [10000]"
It takes the same options as publish."
[queue message & {:as opts}]
{:pre [(queue? queue)]}
(let [^javax.jms.Message message (mapply publish queue message
(update-in opts [:properties]
#(merge % {"synchronous" "true"})))]
(delay
;; TODO: move the timeout from the request call to the deref
(mapply receive queue
(assoc opts
:selector (str "JMSCorrelationID='" (.getJMSMessageID message) "'"))))))
(mapply delayed-receive queue
(assoc opts
:selector (str "JMSCorrelationID='" (.getJMSMessageID message) "'")))))

(defn respond
"Listen for messages on queue sent by the request function and
Expand Down

0 comments on commit b0a1b8d

Please sign in to comment.