Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

refactoring transport.clj, and new queue-name to global broadcasts

  • Loading branch information...
commit f79f4846c72b7c9cdefb064b7bf7806c743393cb 1 parent b05072d
@amitrathore authored
View
13 src/org/runa/swarmiji/mpi/sevak_proxy.clj
@@ -10,16 +10,8 @@
(use 'org.rathore.amit.utils.config)
(use 'org.rathore.amit.utils.rabbitmq)
(use 'org.rathore.amit.medusa.core)
-(use 'clojure.contrib.except)
(use 'alex-and-georges.debug-repl)
-(defn sevak-queue-message-no-return [sevak-service args]
- {:sevak-service-name sevak-service
- :sevak-service-args args})
-
-(defn sevak-queue-message-for-return [sevak-service args]
- (assoc (sevak-queue-message-no-return sevak-service args) :return-queue-name (return-queue-name sevak-service)))
-
(defn register-callback [realtime? return-q-name custom-handler request-object]
(init-medusa (medusa-client-thread-count))
(register-callback-or-fallback realtime? return-q-name custom-handler request-object))
@@ -34,8 +26,3 @@
(let [request-object (sevak-queue-message-no-return sevak-service args)]
(send-message-on-queue (queue-sevak-q-name realtime?) request-object)
nil)))
-
-(defmacro multicast-to-sevak-servers [sevak-var & args]
- (let [{:keys [name ns] :as meta-inf} (meta (resolve sevak-var))]
- (if-not meta-inf (throwf "Multicast-to-sevak-servers is unable to resolve %s" sevak-var))
- `(fanout-message-to-all (sevak-queue-message-no-return ~(ns-qualified-name name ns) (list ~@args)))))
View
12 src/org/runa/swarmiji/mpi/transport.clj
@@ -1,13 +1,18 @@
(ns org.runa.swarmiji.mpi.transport)
+
(use 'org.runa.swarmiji.config.system-config)
(use 'org.runa.swarmiji.sevak.bindings)
+(use 'org.runa.swarmiji.utils.general-utils)
(use 'org.rathore.amit.utils.logger)
(use 'org.rathore.amit.utils.clojure)
(use 'org.rathore.amit.utils.rabbitmq)
(use 'org.rathore.amit.medusa.core)
+(use 'clojure.contrib.except)
+(use 'alex-and-georges.debug-repl)
(def rabbit-down-messages (atom {}))
(def *guaranteed-sevaks*)
+(def BROADCASTS-QUEUE-NAME "BROADCASTS_GLOBAL")
(defn send-message-no-declare [q-name q-message-object]
(with-swarmiji-bindings
@@ -20,7 +25,12 @@
(send-message q-name q-message-object))))
(defn fanout-message-to-all [message-object]
- (send-message (sevak-fanout-exchange-name) FANOUT-EXCHANGE-TYPE "" message-object))
+ (send-message (sevak-fanout-exchange-name) FANOUT-EXCHANGE-TYPE BROADCASTS-QUEUE-NAME message-object))
+
+(defmacro multicast-to-sevak-servers [sevak-var & args]
+ (let [{:keys [name ns] :as meta-inf} (meta (resolve sevak-var))]
+ (if-not meta-inf (throwf "Multicast-to-sevak-servers is unable to resolve %s" sevak-var))
+ `(fanout-message-to-all (sevak-queue-message-no-return ~(ns-qualified-name name ns) (list ~@args)))))
(defn should-fallback [sevak-name]
(some #{(keyword sevak-name)} *guaranteed-sevaks*))
View
8 src/org/runa/swarmiji/sevak/sevak_core.clj
@@ -88,9 +88,11 @@
(defn sevak-request-handling-listener [req-str ack-fn]
(with-swarmiji-bindings
(try
+ (reload-namespaces)
(let [req (read-string req-str)
- service-name (req :sevak-service-name) service-args (req :sevak-service-args) return-q (req :return-queue-name)
- _ (reload-namespaces)
+ service-name (req :sevak-service-name)
+ service-args (req :sevak-service-args)
+ return-q (req :return-queue-name)
service-handler (@sevaks service-name)]
(log-message "Received request for" service-name "with args:" service-args "and return-q:" return-q)
(when (nil? service-handler)
@@ -119,7 +121,7 @@
(defn start-broadcast-processor []
(future
(with-swarmiji-bindings
- (let [broadcasts-q (random-queue-name "BROADCASTS_")]
+ (let [broadcasts-q (random-queue-name "BROADCASTS_LISTENER_")]
(try
(log-message "Listening for update broadcasts...")
(.addShutdownHook (Runtime/getRuntime) (Thread. #(with-swarmiji-bindings (delete-queue broadcasts-q))))
View
17 src/org/runa/swarmiji/utils/general_utils.clj
@@ -13,11 +13,9 @@
([sevak-name-keyword the-name-space]
(str (ns-name the-name-space) "/" (name sevak-name-keyword))))
-(defn sevak-info [sevak-name realtime? needs-response? function]
- {:name sevak-name :return needs-response? :realtime realtime? :fn function})
-
-(defn return-queue-name [sevak-name]
- (str (System/currentTimeMillis) "_" sevak-name "_" (random-uuid)))
+(defn sevak-queue-message-no-return [sevak-service args]
+ {:sevak-service-name sevak-service
+ :sevak-service-args args})
(defn random-queue-name
([]
@@ -25,6 +23,15 @@
([prefix]
(str prefix (random-uuid))))
+(defn return-queue-name [sevak-name]
+ (str (System/currentTimeMillis) "_" sevak-name "_" (random-uuid)))
+
+(defn sevak-queue-message-for-return [sevak-service args]
+ (assoc (sevak-queue-message-no-return sevak-service args) :return-queue-name (return-queue-name sevak-service)))
+
+(defn sevak-info [sevak-name realtime? needs-response? function]
+ {:name sevak-name :return needs-response? :realtime realtime? :fn function})
+
(defn process-pid []
(let [m-name (.getName (ManagementFactory/getRuntimeMXBean))]
(first (.split m-name "@"))))
Please sign in to comment.
Something went wrong with that request. Please try again.