Skip to content

Commit

Permalink
no client-side thread pool and no more spin-locks
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Jul 14, 2011
1 parent 664f140 commit 18af3bc
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 41 deletions.
78 changes: 41 additions & 37 deletions src/org/runa/swarmiji/client/client_core.clj
@@ -1,15 +1,18 @@
(ns org.runa.swarmiji.client.client-core)

(use 'org.runa.swarmiji.mpi.sevak-proxy)
(use 'org.runa.swarmiji.mpi.transport)
(use 'org.runa.swarmiji.sevak.bindings)
(use 'org.runa.swarmiji.config.system-config)
(use 'org.runa.swarmiji.utils.general-utils)
(import '(java.io StringWriter))
(import '(org.runa.swarmiji.exception SevakErrors))
(use 'org.rathore.amit.utils.config)
(use 'org.rathore.amit.utils.logger)
(use 'org.rathore.amit.utils.clojure)
(ns org.runa.swarmiji.client.client-core
(:use
org.runa.swarmiji.mpi.sevak-proxy
org.runa.swarmiji.mpi.transport
org.runa.swarmiji.sevak.bindings
org.runa.swarmiji.config.system-config
org.runa.swarmiji.utils.general-utils
org.rathore.amit.utils.config
org.rathore.amit.utils.logger
org.rathore.amit.utils.clojure)
(:import
(java.io StringWriter)
(org.runa.swarmiji.exception SevakErrors)
(java.util.concurrent TimeoutException TimeUnit CountDownLatch)))


(def WORK-REPORT "WORK_REPORT")

Expand Down Expand Up @@ -59,25 +62,29 @@
(defn on-swarm [realtime? sevak-service & args]
(let [sevak-start (ref (System/currentTimeMillis))
total-sevak-time (ref nil)
latch (CountDownLatch. 1)
sevak-data (ref swarmiji-sevak-init-value)
complete? (fn [] (not (= swarmiji-sevak-init-value @sevak-data)))
success? (fn [] (= (:status @sevak-data) :success))
sevak-name (fn [] (sevak-name-from @sevak-data))
sevak-time (fn [] (time-on-server @sevak-data))
messaging-time (fn [] (- @total-sevak-time (sevak-time)))
on-swarm-response (fn [response]
(dosync (ref-set sevak-data (unserialized-response response)))
(do
(dosync (ref-set total-sevak-time (- (System/currentTimeMillis) @sevak-start)))
(if (and (swarmiji-diagnostics-mode?) (success?))
(send-work-report (sevak-name) args (sevak-time) (messaging-time) (return-q @sevak-data) (sevak-server-pid @sevak-data)))))
(let [response (unserialized-response response)]
(dosync
(ref-set sevak-data response)
(dosync (ref-set total-sevak-time (- (System/currentTimeMillis) @sevak-start))))
(if (and (swarmiji-diagnostics-mode?) (success?))
(send-work-report (sevak-name) args (sevak-time) (messaging-time) (return-q @sevak-data) (sevak-server-pid @sevak-data)))
(.countDown latch)))
on-swarm-proxy-client (new-proxy realtime? sevak-service args on-swarm-response)]
(fn [accessor]
(condp = accessor
:sevak-name sevak-service
:args args
:distributed? true
:sevak-type :sevak-with-return
:latch latch
:disconnect (disconnect-proxy on-swarm-proxy-client)
:complete? (complete?)
:value (response-value-from @sevak-data)
Expand All @@ -96,18 +103,16 @@
(new-proxy realtime? sevak-service args)
nil)

(defn all-complete? [swarm-requests]
(when (every? #(% :complete?) swarm-requests)
(doseq [r swarm-requests]
(when (r :distributed?)
(log-message "Received sevak response on" (r :sevak-name)
"for return-q" (return-q (r :__inner_ref)) "with elapsed time" (r :total-time))))
true))

(defn disconnect-all [swarm-requests]
(doseq [req swarm-requests]
(req :disconnect)))

(defn log-timeouts [swarm-requests]
(doseq [r swarm-requests]
(when (r :distributed?)
(log-message "Sevak response timed-out on" (r :sevak-name)
"for return-q" ((r :sevak-proxy) :queue)))))

(defn throw-exception [allowed-time]
(throw (RuntimeException. (str "Swarmiji reports: This operation has taken more than " allowed-time " milliseconds."))))

Expand All @@ -116,19 +121,18 @@
(wait-until-completion swarm-requests allowed-time throw-exception))
([swarm-requests allowed-time error-fn]
(let [start (System/currentTimeMillis)
elapsed-time #(- (System/currentTimeMillis) start)]
remaining-time #(- allowed-time (- (System/currentTimeMillis) start))
latches (map :latch swarm-requests)]
(try
(loop [all-complete (all-complete? swarm-requests)]
(if (> (elapsed-time) allowed-time)
(do
(doseq [r swarm-requests]
(when (r :distributed?)
(log-message "Sevak response timed-out on" (r :sevak-name)
"for return-q" ((r :sevak-proxy) :queue))))
(error-fn allowed-time))
(when-not all-complete
(Thread/sleep 5)
(recur (all-complete? swarm-requests)))))
(doseq [r swarm-requests]
(let [latch ^CountDownLatch (r :latch)]
(.await latch (remaining-time) TimeUnit/MILLISECONDS)
(when (r :distributed?)
(log-message "Received sevak response on" (r :sevak-name)
"for return-q" (return-q (r :__inner_ref)) "with elapsed time" (r :total-time)))))
(catch TimeoutException e
(log-timeouts swarm-requests)
(error-fn allowed-time))
(finally
(disconnect-all swarm-requests))))))

Expand Down
7 changes: 3 additions & 4 deletions src/org/runa/swarmiji/mpi/transport.clj
Expand Up @@ -43,13 +43,12 @@
(try
(custom-handler msg)
(finally
(.queueDelete chan return-q-name)
(.close chan)))))
(.queueDelete chan return-q-name)
(.close chan)))))
f (fn []
(send-message-on-queue (queue-sevak-q-name realtime?) request-object)
(on-response (delivery-from chan consumer)))]
(medusa-future-thunk return-q-name f)
(log-message "Client medusa stats:" (medusa-stats))
(f)
{:channel chan :queue return-q-name :consumer consumer}))

(defn add-to-rabbit-down-queue [realtime? return-queue-name custom-handler request-object]
Expand Down

0 comments on commit 18af3bc

Please sign in to comment.