Permalink
Browse files

using new rabbitmq utils for automatic reconnection to rabbitmq serve…

…r on connection failure
  • Loading branch information...
1 parent 6cfd493 commit 6fdfb4c69668b03fc5980a0203c24264a9c1c03e @amitrathore committed Aug 5, 2010
Showing with 26 additions and 10 deletions.
  1. +3 −3 project.clj
  2. +1 −1 src/org/runa/swarmiji/mpi/sevak_proxy.clj
  3. +22 −6 src/org/runa/swarmiji/sevak/sevak_core.clj
View
@@ -1,4 +1,4 @@
-(defproject swarmiji "0.3.3"
+(defproject swarmiji "0.3.4"
:description "A distributed computing framework to help write and run Clojure code in parallel, across cores and processors"
:url "http://github.com/amitrathore/swarmiji"
:dependencies [[org.clojure/clojure "1.1.0"]
@@ -8,8 +8,8 @@
[mysql/mysql-connector-java "5.1.6"]
[rabbitmq-client "1.7.0"]
[org.clojars.sethtrain/postal "0.2.0"]
- [org.clojars.runa/clj-utils "0.1.3"]
- [org.clojars.runa/medusa "0.1.5"]
+ [org.clojars.runa/clj-utils "0.2.0"]
+ [org.clojars.runa/medusa "0.1.6"]
[org.clojars.macourtney/clj-record "1.0.1"]
[org.clojars.amit/swarmiji-java "0.2.0"]]
:dev-dependencies [[swank-clojure "1.2.1"]])
@@ -20,7 +20,7 @@
(defn register-callback [return-q-name custom-handler request-object]
(init-medusa 140)
- (let [chan (new-channel)
+ (let [chan (create-channel)
consumer (consumer-for chan DEFAULT-EXCHANGE-NAME DEFAULT-EXCHANGE-TYPE return-q-name return-q-name)
on-response (fn [msg]
(custom-handler (read-string msg))
@@ -71,6 +71,7 @@
(.get f))
f))
(catch Exception e
+ (log-message "Error in sevak-request-handling-listener:" (class e))
(log-exception e)))))
(defn boot-sevak-server []
@@ -82,10 +83,25 @@
(init-rabbit)
(init-medusa 300)
;(send-message-on-queue (queue-diagnostics-q-name) {:message_type START-UP-REPORT :sevak_server_pid (process-pid) :sevak_name SEVAK-SERVER})
- (future
- (with-swarmiji-bindings
- (start-queue-message-handler (sevak-fanout-exchange-name) FANOUT-EXCHANGE-TYPE (random-queue-name) sevak-request-handling-listener)))
- (future
- (with-swarmiji-bindings
- (start-queue-message-handler (queue-sevak-q-name) (queue-sevak-q-name) sevak-request-handling-listener)))
+
+(future
+ (with-swarmiji-bindings
+ (try
+ (log-message "Listening for update broadcasts...")
+ (start-queue-message-handler (sevak-fanout-exchange-name) FANOUT-EXCHANGE-TYPE (random-queue-name) sevak-request-handling-listener)
+ (log-message "Done with broadcasts!")
+ (catch Exception e
+ (log-message "Error in update broadcasts future!")
+ (log-exception e)))))
+
+(future
+ (with-swarmiji-bindings
+ (try
+ (log-message "Starting to serve sevak requests...")
+ (start-queue-message-handler (queue-sevak-q-name) (queue-sevak-q-name) sevak-request-handling-listener)
+ (log-message "Done with sevak requests!")
+ (catch Exception e
+ (log-message "Error in sevak-servicing future!")
+ (log-exception e)))))
+
(log-message "Sevak Server Started!"))

0 comments on commit 6fdfb4c

Please sign in to comment.