(ns clojurewerkz.langohr.examples.recovery.example1
(:require [langohr.core :as rmq]
[ :as lch]
[langohr.queue :as lq]
[ :as lx]
[langohr.consumers :as lc]
[langohr.basic :as lb])
(defn message-handler
[ch {:keys [content-type delivery-tag type] :as meta} ^bytes payload]
(println (format "[consumer] Received a message: %s"
(String. payload "UTF-8"))))
(defn start-consumer
[ch ^String q]
(lq/declare ch q {:exclusive false :auto-delete false})
(lc/subscribe ch q message-handler {:auto-ack true}))
(defn -main
[& args]
(let [conn (rmq/connect {:automatically-recover true :automatically-recover-topology false})
ch (lch/open conn)
q "langohr.examples.recovery.example1.q"
x ""]
(println (format "[main] Connected. Channel id: %d" (.getChannelNumber ch)))
(start-consumer ch q)
(rmq/on-recovery ch (fn [ch]
(println "[main] Channel recovered. Recovering topology...")
(start-consumer ch q)))
(while true
(Thread/sleep 1000)
(lb/publish ch x q "hello")
(catch AlreadyClosedException ace
(comment "Happens when you publish while the connection is down"))
(catch IOException ioe
(comment "ditto"))))))