Skip to content

Commit

Permalink
inserted new plumbing for realtime and non-realtime queues
Browse files Browse the repository at this point in the history
  • Loading branch information
amitrathore committed Nov 17, 2010
1 parent 45b2255 commit e135a53
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 32 deletions.
8 changes: 4 additions & 4 deletions src/org/runa/swarmiji/client/client_core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
(log-exception e (str "Read failed on " response-obj-string))
{:exception (exception-name e) :stacktrace (stacktrace e) :status :error})))

(defn on-swarm [sevak-service & args]
(defn on-swarm [realtime? sevak-service & args]
(let [sevak-start (ref (System/currentTimeMillis))
total-sevak-time (ref nil)
sevak-data (ref swarmiji-sevak-init-value)
Expand All @@ -71,7 +71,7 @@
(dosync (ref-set total-sevak-time (- (System/currentTimeMillis) @sevak-start)))
(if (and (swarmiji-diagnostics-mode?) (success?))
(send-work-report (sevak-name) args (sevak-time) (messaging-time) (return-q @sevak-data) (sevak-server-pid @sevak-data)))))
on-swarm-proxy-client (new-proxy (name sevak-service) args on-swarm-response)]
on-swarm-proxy-client (new-proxy realtime? (name sevak-service) args on-swarm-response)]
(fn [accessor]
(condp = accessor
:sevak-name (name sevak-service)
Expand All @@ -92,8 +92,8 @@
:default (throw (Exception. (str "On-swarm proxy error - unknown message:" accessor)))))))


(defn on-swarm-no-response [sevak-service & args]
(new-proxy (name sevak-service) args)
(defn on-swarm-no-response [realtime? sevak-service & args]
(new-proxy realtime? (name sevak-service) args)
nil)

(defn all-complete? [swarm-requests]
Expand Down
6 changes: 4 additions & 2 deletions src/org/runa/swarmiji/config/system_config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@
(defn queue-password []
((operation-config) :q-password))

(defn queue-sevak-q-name []
(str ((operation-config) :sevak-request-queue-prefix) (swarmiji-user)))
(defn queue-sevak-q-name [realtime?]
(if realtime?
(str ((operation-config) :sevak-request-queue-prefix) "realtime_" (swarmiji-user))
(str ((operation-config) :sevak-request-queue-prefix) "non_realtime_" (swarmiji-user))))

(defn queue-diagnostics-q-name []
(str ((operation-config) :sevak-diagnostics-queue-prefix) (swarmiji-user)))
Expand Down
12 changes: 6 additions & 6 deletions src/org/runa/swarmiji/mpi/sevak_proxy.clj
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,19 @@
(defn sevak-queue-message-for-return [sevak-service args]
(assoc (sevak-queue-message-no-return sevak-service args) :return-queue-name (return-queue-name sevak-service)))

(defn register-callback [return-q-name custom-handler request-object]
(defn register-callback [realtime? return-q-name custom-handler request-object]
(init-medusa (medusa-client-thread-count))
(register-callback-or-fallback return-q-name custom-handler request-object))
(register-callback-or-fallback realtime? return-q-name custom-handler request-object))

(defn new-proxy
([sevak-service args callback-function]
([realtime? sevak-service args callback-function]
(let [request-object (sevak-queue-message-for-return sevak-service args)
return-q-name (request-object :return-queue-name)
proxy-object (register-callback return-q-name callback-function request-object)]
proxy-object (register-callback realtime? return-q-name callback-function request-object)]
proxy-object))
([sevak-service args]
([realtime? sevak-service args]
(let [request-object (sevak-queue-message-no-return sevak-service args)]
(send-message-on-queue (queue-sevak-q-name) request-object)
(send-message-on-queue (queue-sevak-q-name realtime?) request-object)
nil)))

(defmacro multicast-to-sevak-servers [sevak-name & args]
Expand Down
20 changes: 9 additions & 11 deletions src/org/runa/swarmiji/mpi/transport.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
(defn should-fallback [sevak-name]
(some #{(keyword sevak-name)} *guaranteed-sevaks*))

(defn send-and-register-callback [return-q-name custom-handler request-object]
(defn send-and-register-callback [realtime? return-q-name custom-handler request-object]
(let [chan (create-channel)
consumer (consumer-for chan DEFAULT-EXCHANGE-NAME DEFAULT-EXCHANGE-TYPE return-q-name return-q-name)
on-response (fn [msg]
Expand All @@ -36,27 +36,25 @@
(.queueDelete chan return-q-name)
(.close chan)))))
f (fn []
(send-message-on-queue (queue-sevak-q-name) request-object)
(send-message-on-queue (queue-sevak-q-name realtime?) request-object)
(on-response (delivery-from chan consumer)))]
(log-message "[" (number-of-queued-tasks) (futures-count)
"]: Dispatching request on " return-q-name)
(medusa-future-thunk return-q-name f)
{:channel chan :queue return-q-name :consumer consumer}))

(defn add-to-rabbit-down-queue [return-queue-name custom-handler request-object]
(swap! rabbit-down-messages assoc (System/currentTimeMillis) [return-queue-name custom-handler request-object]))
(defn add-to-rabbit-down-queue [realtime? return-queue-name custom-handler request-object]
(swap! rabbit-down-messages assoc (System/currentTimeMillis) [realtime? return-queue-name custom-handler request-object]))

(defn register-callback-or-fallback [return-q-name custom-handler request-object]
(defn register-callback-or-fallback [realtime? return-q-name custom-handler request-object]
(try
(send-and-register-callback return-q-name custom-handler request-object)
(send-and-register-callback realtime? return-q-name custom-handler request-object)
(catch java.net.ConnectException ce
(if (should-fallback (:sevak-service-name request-object))
(add-to-rabbit-down-queue return-q-name custom-handler request-object)))))
(add-to-rabbit-down-queue realtime? return-q-name custom-handler request-object)))))

(defn retry-message [timestamp [return-queue-name custom-handler request-object]]
(defn retry-message [timestamp [realtime? return-queue-name custom-handler request-object]]
(with-swarmiji-bindings
(try
(send-and-register-callback return-queue-name custom-handler request-object)
(send-and-register-callback realtime? return-queue-name custom-handler request-object)
(swap! rabbit-down-messages dissoc timestamp)
(catch java.net.ConnectException ce
(log-message "RabbitMQ still down, will retry" (count @rabbit-down-messages) "messages...")) ;;ignore, will try again later
Expand Down
37 changes: 28 additions & 9 deletions src/org/runa/swarmiji/sevak/sevak_core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,33 @@
(def START-UP-REPORT "START_UP_REPORT")
(def SEVAK-SERVER "SEVAK_SERVER")

(defmacro sevak-runner [sevak-name needs-response sevak-args]
(defmacro sevak-runner [realtime? sevak-name needs-response sevak-args]
`(fn ~sevak-args
(if (swarmiji-distributed-mode?)
(if ~needs-response
(apply on-swarm (cons ~sevak-name ~sevak-args))
(apply on-swarm-no-response (cons ~sevak-name ~sevak-args)))
(apply on-swarm ~realtime? ~sevak-name ~sevak-args)
(apply on-swarm-no-response ~realtime? ~sevak-name ~sevak-args))
(apply on-local (cons (@sevaks ~sevak-name) ~sevak-args)))))

(defmacro defsevak [service-name args & expr]
`(let [sevak-name# (keyword (str '~service-name))]
(dosync (ref-set sevaks (assoc @sevaks sevak-name# {:return Boolean/TRUE :fn (fn ~args (do ~@expr))})))
(def ~service-name (sevak-runner sevak-name# Boolean/TRUE ~args))))
(def ~service-name (sevak-runner true sevak-name# Boolean/TRUE ~args))))

(defmacro defseva [service-name args & expr]
`(let [seva-name# (keyword (str '~service-name))]
(dosync (ref-set sevaks (assoc @sevaks seva-name# {:return Boolean/FALSE :fn (fn ~args (do ~@expr))})))
(def ~service-name (sevak-runner seva-name# Boolean/FALSE ~args))))
(def ~service-name (sevak-runner true seva-name# Boolean/FALSE ~args))))

(defmacro defsevak-nr [service-name args & expr]
`(let [sevak-name# (keyword (str '~service-name))]
(dosync (ref-set sevaks (assoc @sevaks sevak-name# {:return Boolean/TRUE :fn (fn ~args (do ~@expr))})))
(def ~service-name (sevak-runner false sevak-name# Boolean/TRUE ~args))))

(defmacro defseva-nr [service-name args & expr]
`(let [seva-name# (keyword (str '~service-name))]
(dosync (ref-set sevaks (assoc @sevaks seva-name# {:return Boolean/FALSE :fn (fn ~args (do ~@expr))})))
(def ~service-name (sevak-runner false seva-name# Boolean/FALSE ~args))))

(defn handle-sevak-request [service-name service-handler service-args ack-fn]
(with-swarmiji-bindings
Expand Down Expand Up @@ -78,8 +88,6 @@
(defn boot-sevak-server []
(log-message "Starting sevaks in" *swarmiji-env* "mode")
(log-message "System config:" (operation-config))
(log-message "MPI transport Q:" (queue-sevak-q-name))
(log-message "MPI diagnostics Q:" (queue-diagnostics-q-name))
(log-message "Medusa server threads:" (medusa-server-thread-count))
(log-message "Medusa client threads:" (medusa-client-thread-count))
(log-message "RabbitMQ prefetch-count:" (rabbitmq-prefetch-count))
Expand All @@ -103,12 +111,23 @@
(future
(with-swarmiji-bindings
(try
(log-message "Starting to serve sevak requests...")
(log-message "Starting to serve realtime sevak requests...")
(with-prefetch-count (rabbitmq-prefetch-count)
(start-queue-message-handler (queue-sevak-q-name) (queue-sevak-q-name) sevak-request-handling-listener))
(start-queue-message-handler (queue-sevak-q-name true) (queue-sevak-q-name true) sevak-request-handling-listener))
(log-message "Done with sevak requests!")
(catch Exception e
(log-message "Error in sevak-servicing future!")
(log-exception e)))))

(future
(with-swarmiji-bindings
(try
(log-message "Starting to serve non-realtime sevak requests...")
(with-prefetch-count (rabbitmq-prefetch-count)
(start-queue-message-handler (queue-sevak-q-name false) (queue-sevak-q-name false) sevak-request-handling-listener))
(log-message "Done with sevak requests!")
(catch Exception e
(log-message "Error in sevak-servicing future!")
(log-exception e)))))

(log-message "Sevak Server Started!"))

0 comments on commit e135a53

Please sign in to comment.