Permalink
Browse files

updating to use explicit acking of rabbit messages

  • Loading branch information...
1 parent 85c05db commit 01dcd7c1f0cdc40d528df5358df4d042f4fde7eb @amitrathore committed Nov 16, 2010
@@ -48,8 +48,9 @@
(catch Exception e))))))
;no-op, this sevak-proxy should be aborted, thats it
-(defn unserialized-response [response-obj-string]
+(defn unserialized-response [[response-obj-string ack-fn]]
(try
+ (ack-fn)
(read-string response-obj-string)
(catch Exception e
(log-exception e (str "Read failed on " response-obj-string))
@@ -64,8 +65,8 @@
sevak-name (fn [] (sevak-name-from @sevak-data))
sevak-time (fn [] (time-on-server @sevak-data))
messaging-time (fn [] (- @total-sevak-time (sevak-time)))
- on-swarm-response (fn [response-object-string]
- (dosync (ref-set sevak-data (unserialized-response response-object-string)))
+ on-swarm-response (fn [response]
+ (dosync (ref-set sevak-data (unserialized-response response)))
(do
(dosync (ref-set total-sevak-time (- (System/currentTimeMillis) @sevak-start)))
(if (and (swarmiji-diagnostics-mode?) (success?))
@@ -34,7 +34,7 @@
(dosync (ref-set sevaks (assoc @sevaks seva-name# {:return Boolean/FALSE :fn (fn ~args (do ~@expr))})))
(def ~service-name (sevak-runner seva-name# Boolean/FALSE ~args))))
-(defn handle-sevak-request [service-name service-handler service-args]
+(defn handle-sevak-request [service-name service-handler service-args ack-fn]
(with-swarmiji-bindings
(try
(let [response-with-time (run-and-measure-timing
@@ -46,32 +46,34 @@
(throw ie))
(catch Exception e
(log-exception e (str "SEVAK ERROR! " (class e) " detected while running " service-name " with args: " service-args))
- {:exception (exception-name e) :stacktrace (stacktrace e) :status :error}))))
+ {:exception (exception-name e) :stacktrace (stacktrace e) :status :error})
+ (finally
+ (ack-fn)))))
-(defn async-sevak-handler [service-handler sevak-name service-args return-q]
+(defn async-sevak-handler [service-handler sevak-name service-args return-q ack-fn]
(with-swarmiji-bindings
(let [response (merge
{:return-q-name return-q :sevak-name sevak-name :sevak-server-pid (process-pid)}
- (handle-sevak-request sevak-name service-handler service-args))]
+ (handle-sevak-request sevak-name service-handler service-args ack-fn))]
(if (and return-q (:return service-handler))
(send-message-no-declare return-q response)))))
-(defn sevak-request-handling-listener [req-str]
+(defn sevak-request-handling-listener [req-str ack-fn]
(with-swarmiji-bindings
- (try
- (let [req (read-string req-str)
- service-name (req :sevak-service-name) service-args (req :sevak-service-args) return-q (req :return-queue-name)
- service-handler (@sevaks (keyword service-name))]
- (log-message "[ in-q pool completed" (number-of-queued-tasks) (current-pool-size) (completed-task-count) "]: Received request for" service-name "with args:" service-args "and return-q:" return-q)
+ (try
+ (let [req (read-string req-str)
+ service-name (req :sevak-service-name) service-args (req :sevak-service-args) return-q (req :return-queue-name)
+ service-handler (@sevaks (keyword service-name))]
+ (log-message "[ in-q pool completed" (number-of-queued-tasks) (current-pool-size) (completed-task-count) "]: Received request for" service-name "with args:" service-args "and return-q:" return-q)
(if (nil? service-handler)
(throw (Exception. (str "No handler found for: " service-name))))
- (let [f (medusa-future-thunk return-q #(async-sevak-handler service-handler service-name service-args return-q))]
- (when (> (number-of-queued-tasks) 10)
- (.get f))
+ (let [f (medusa-future-thunk return-q #(async-sevak-handler service-handler service-name service-args return-q ack-fn))]
+; (when (> (number-of-queued-tasks) 10)
+; (.get f))
f))
- (catch Exception e
- (log-message "Error in sevak-request-handling-listener:" (class e))
- (log-exception e)))))
+ (catch Exception e
+ (log-message "Error in sevak-request-handling-listener:" (class e))
+ (log-exception e)))))
(defn boot-sevak-server []
(log-message "Starting sevaks in" *swarmiji-env* "mode")
@@ -99,7 +101,8 @@
(with-swarmiji-bindings
(try
(log-message "Starting to serve sevak requests...")
- (start-queue-message-handler (queue-sevak-q-name) (queue-sevak-q-name) sevak-request-handling-listener)
+ (with-prefetch-count 300
+ (start-queue-message-handler (queue-sevak-q-name) (queue-sevak-q-name) sevak-request-handling-listener))
(log-message "Done with sevak requests!")
(catch Exception e
(log-message "Error in sevak-servicing future!")
@@ -4,6 +4,7 @@
(use '[clojure.contrib.duck-streams :only (spit)])
(import '(java.lang.management ManagementFactory))
(use 'org.rathore.amit.utils.clojure)
+(use 'org.rathore.amit.utils.rabbitmq)
(defn random-uuid []
(str (UUID/randomUUID)))
@@ -22,4 +23,8 @@
(first (.split m-name "@"))))
(defn simulate-serialized [hash-object]
- (read-string (str hash-object)))
+ (read-string (str hash-object)))
+
+(defmacro with-prefetch-count [prefetch-count & body]
+ `(binding [*PREFETCH-COUNT* ~prefetch-count]
+ (do ~@body)))

0 comments on commit 01dcd7c

Please sign in to comment.