Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

removing request-handling threadpool

  • Loading branch information...
commit c0813345821f773660f764f989015671001ce499 1 parent e1c7f46
@amitrathore authored
Showing with 19 additions and 20 deletions.
  1. +19 −20 src/org/runa/swarmiji/sevak/sevak_core.clj
View
39 src/org/runa/swarmiji/sevak/sevak_core.clj
@@ -9,7 +9,6 @@
(use 'org.rathore.amit.utils.config)
(use 'org.rathore.amit.utils.logger)
(use 'org.rathore.amit.utils.clojure)
-(use 'org.rathore.amit.medusa.core)
(def sevaks (ref {}))
@@ -71,35 +70,35 @@
(let [req (read-string req-str)
service-name (req :sevak-service-name) service-args (req :sevak-service-args) return-q (req :return-queue-name)
service-handler (@sevaks (keyword service-name))]
- (log-message "[ in-q pool completed" (number-of-queued-tasks) (current-pool-size) (completed-task-count) "]: Received request for" service-name "with args:" service-args "and return-q:" return-q)
- (if (nil? service-handler)
- (throw (Exception. (str "No handler found for: " service-name))))
- (medusa-future-thunk return-q #(async-sevak-handler service-handler service-name service-args return-q ack-fn)))
+ (log-message "Received request for" service-name "with args:" service-args "and return-q:" return-q)
+ (if (nil? service-handler)
+ (throw (Exception. (str "No handler found for: " service-name))))
+ (async-sevak-handler service-handler service-name service-args return-q ack-fn))
(catch Exception e
(log-message "Error in sevak-request-handling-listener:" (class e))
(log-exception e)))))
-(defn start-processor [routing-key start-log-message]
- (future
- (with-swarmiji-bindings
- (try
- (log-message start-log-message)
- (with-prefetch-count (rabbitmq-prefetch-count)
- (start-queue-message-handler routing-key routing-key sevak-request-handling-listener))
- (log-message "Done with sevak requests!")
- (catch Exception e
- (log-message "Error in sevak-servicing future!")
- (log-exception e))))))
+(defn start-processors [routing-key number-of-processors start-log-message]
+ (let [processor #(with-swarmiji-bindings
+ (try
+ (log-message "Thread #[" % "]" start-log-message)
+ (with-prefetch-count (rabbitmq-prefetch-count)
+ (start-queue-message-handler routing-key routing-key sevak-request-handling-listener))
+ (log-message "Done with sevak requests!")
+ (catch Exception e
+ (log-message "Error in sevak-servicing future!")
+ (log-exception e))))]
+ (dotimes [n number-of-processors]
+ (.start (Thread. #(processor n))))))
+
(defn boot-sevak-server []
(log-message "Starting sevaks in" *swarmiji-env* "mode")
(log-message "System config:" (operation-config))
- (log-message "Medusa server threads:" (medusa-server-thread-count))
(log-message "Medusa client threads:" (medusa-client-thread-count))
(log-message "RabbitMQ prefetch-count:" (rabbitmq-prefetch-count))
(log-message "Sevaks are offering the following" (count @sevaks) "services:" (keys @sevaks))
(init-rabbit)
- (init-medusa (medusa-server-thread-count))
;(send-message-on-queue (queue-diagnostics-q-name) {:message_type START-UP-REPORT :sevak_server_pid (process-pid) :sevak_name SEVAK-SERVER})
(future
@@ -114,6 +113,6 @@
(log-message "Error in update broadcasts future!")
(log-exception e))))))
- (start-processor (queue-sevak-q-name true) "Starting to serve realtime sevak requests..." )
- (start-processor (queue-sevak-q-name false) "Starting to serve non-realtime sevak requests..." )
+ (start-processors (queue-sevak-q-name true) 10 "Starting to serve realtime sevak requests..." )
+ (start-processors (queue-sevak-q-name false) 10 "Starting to serve non-realtime sevak requests..." )
(log-message "Sevak Server Started!"))
Please sign in to comment.
Something went wrong with that request. Please try again.