Skip to content

Commit

Permalink
Made sure all rabbitmq messages got ack'ed
Browse files Browse the repository at this point in the history
  • Loading branch information
Jim Duey committed Sep 6, 2010
1 parent 51ff146 commit be94bd8
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 57 deletions.
72 changes: 32 additions & 40 deletions src/conduit_rabbitmq.clj
@@ -1,6 +1,6 @@
(ns conduit-rabbitmq
(:use
[conduit :only [conduit new-proc
[conduit :only [conduit new-proc run-proc
new-id seq-fn seq-proc a-run
scatter-gather-fn
reply-proc pass-through]]
Expand Down Expand Up @@ -47,23 +47,23 @@
nil)))
([queue msecs] (.nextDelivery (consumer queue) msecs)))

(defn msg-stream
([queue]
(let [consumer (consumer queue)]
{:type :rabbitmq
:fn (fn this-fn [x]
(try
(let [msg (.nextDelivery consumer)]
[[msg] this-fn])
(catch InterruptedException e
nil)))}))
([queue msecs]
(let [consumer (consumer queue)]
{:type :rabbitmq
:fn (fn this-fn [x]
(let [msg (.nextDelivery consumer msecs)]
(when msg
[[msg] this-fn])))})))
(defn msg-stream [queue & [msecs]]
(let [consumer (consumer queue)]
(if msecs
(fn this-fn1 [x]
(let [msg (.nextDelivery consumer msecs)]
(when msg
[[msg] this-fn1])))
(fn this-fn2 [x]
(try
(let [msg (.nextDelivery consumer)]
[[msg] this-fn2])
(catch InterruptedException e
nil))))))

(defn msg-stream-proc [queue & [msecs]]
{:type :rabbitmq
:fn (msg-stream queue msecs)})

(defn a-rabbitmq [source proc]
(let [id (new-id)
Expand All @@ -75,29 +75,21 @@
:source source
:id id}))

(with-arrow conduit
(defn rabbitmq-handler [p queue]
(let [h (->> (dissoc (get-in p [:parts queue]) :type)
seq
(mapcat (fn [[x p]]
[x p]))
(apply a-select))]
h))

(defn rabbitmq-run [p queue channel exchange & [msecs]]
(binding [*channel* channel
*exchange* exchange]
(let [queue (str queue)]
(dorun (a-run
(a-comp (if msecs
(msg-stream queue msecs)
(msg-stream queue))
(a-all (a-arr read-msg)
pass-through)
(a-nth 0 (rabbitmq-handler p queue))
(a-nth 1 (a-arr ack-message))
(a-arr first)))))))
(defn rabbitmq-run [p queue channel exchange & [msecs]]
(binding [*channel* channel
*exchange* exchange]
(let [queue (str queue)
get-next (msg-stream queue msecs)]
(when-let [handler-map (get-in p [:parts queue])]
(loop [[[raw-msg] get-next] (get-next nil)]
(when raw-msg
(let [[target msg] (read-msg raw-msg)]
(when-let [handler (get handler-map target)]
((:fn handler) msg))
(ack-message raw-msg)
(recur (get-next nil)))))))))

(with-arrow conduit
(defn rabbitmq-arr [source f]
(a-rabbitmq source (a-arr f))))

Expand Down
26 changes: 9 additions & 17 deletions test/test_conduit_rabbitmq.clj
Expand Up @@ -2,7 +2,8 @@
(:use conduit-rabbitmq :reload-all)
(:use
clojure.test
[conduit :only [conduit a-run
[conduit :only [conduit a-run run-proc
conduit-map
new-proc conduit-seq]]
arrows)
(:import
Expand Down Expand Up @@ -56,7 +57,7 @@
(range 50)))

(is (= (range 50)
(a-run (a-comp (msg-stream *queue* 100)
(a-run (a-comp (msg-stream-proc *queue* 100)
(a-arr (fn [m]
(ack-message m)
(read-msg m))))))))
Expand Down Expand Up @@ -105,30 +106,21 @@
thread-fn (fn [exchange queue]
(with-open [connection (rabbitmq-connection "localhost" "/" "guest" "guest")
channel (.createChannel connection)]
(binding [*channel* channel
*exchange* exchange]
(let [queue (str queue)]
(a-run
(a-comp (msg-stream queue)
(a-arr (fn [m]
[(read-msg m) m]))
(a-nth 0 (rabbitmq-handler new-rabbit queue))
(a-nth 1 (a-arr ack-message))))))))

(rabbitmq-run new-rabbit queue channel exchange)))
remote-thread (doto (new Thread (partial thread-fn *exchange* *queue*))
(.start))]

(try
(is (= (map vector
(range 1 11)
(range -1 9))
(a-run (a-comp (conduit-seq (range 10))
new-rabbit ))))
(conduit-map new-rabbit (range 10))))
(finally
(Thread/sleep 500)
(.interrupt remote-thread)
(println "waiting...")
(.join remote-thread 5000)
(println "done waiting"))))))
(.join remote-thread 5000))))))




(run-tests)

0 comments on commit be94bd8

Please sign in to comment.