Permalink
Browse files

A lot of cleanup, plus timeout handling

  • Loading branch information...
1 parent 13e12b4 commit 8833efb6c810689d87c6d898a1a609688d230d21 @andrewvc andrewvc committed Feb 8, 2012
View
@@ -8,5 +8,6 @@
[noir-async "0.1.2"]
[cheshire "2.0.4"]
[log4j/log4j "1.2.16"]
+ [org.slf4j/slf4j-simple "1.6.4"]
[com.ning/async-http-client "1.7.0"]
[org.clojure/tools.logging "0.2.3"]])
View
@@ -20,103 +20,104 @@
(stop [this])
(init-run [this])
(increment-and-check-run-count [this])
+ (check-result-recordability? [this])
(receive-result [this worker-id data])
(receive-error [this worker-id err])
- (hookup-workers [this])
(broadcast-at-interval [this millis]))
-(defrecord Benchmark [state max-runs run-count workers recorder output-ch broadcast-task]
+(defrecord Benchmark
+ [state max-runs run-count workers recorder output-ch broadcast-task]
Benchmarkable
(start [this]
(if (not (init-run this)) ; Guard against dupe runs
(io! (log/warn "Could not start, already started"))
(do
- (hookup-workers this)
(record-start recorder)
- (doseq [worker workers]
- (log/info (str "Starting worker: " worker))
+ (doseq [worker @workers]
(work worker))
(compare-and-set! broadcast-task nil
(broadcast-at-interval this 200))
)))
(stop [this]
- (println "STOPPING! " run-count)
- ; When this invocation actually stops it.
- (dosync (when (= @state :started) (ref-set state :stopped)))
- (record-end recorder)
- (doseq [worker workers]
- (compare-and-set! (:state worker) :started :stopped)))
+ (println "STOPPING! " run-count)
+ ; When this invocation actually stops it.
+ (dosync (when (= @state :started) (ref-set state :stopped)))
+ (record-end recorder)
+ (doseq [worker @workers]
+ (compare-and-set! (:state worker) :started :stopped)))
(init-run [this]
- (dosync
- (if (not= :stopped @state)
- false
- (do (ref-set state :started)
- true))))
+ (dosync
+ (if (not= :stopped @state)
+ false
+ (do (ref-set state :started)
+ true))))
(increment-and-check-run-count [this]
(dosync
(if (>= (ensure run-count) max-runs)
- :over
- (if (= max-runs (alter run-count inc))
- :thresh
- :under))))
-
- (hookup-workers [this]
- (doseq [worker workers]
- (let [{worker-ch :output-ch worker-id :worker-id} worker]
- (receive-all (:output-ch worker)
- (fn [{:keys [dtype data]}]
- (cond (= :worker-result dtype)
- (receive-result this worker-id data)
- (= :worker-error dtype)
- (receive-error this worker-id data)))))))
+ :over
+ (if (= max-runs (alter run-count inc))
+ :thresh
+ :under))))
- (receive-result
- [this worker-id result]
- ; If we haven't already stopped, increment the run-count. If we do increment it record the actual work
- (dosync
- (when (= @state :started)
- (let [thresh-status (increment-and-check-run-count this)]
- (record-result recorder worker-id result)
- (when (= :thresh thresh-status)
- (stop this))))))
+ (check-result-recordability? [this]
+ (dosync
+ (when (= @state :started)
+ (let [thresh-status (increment-and-check-run-count this)]
+ (cond (= :thresh thresh-status) (do (stop this) true)
+ (= :under thresh-status) true
+ :else false)))))
+
+ (receive-result [this worker-id result]
+ (when (check-result-recordability? this)
+ (record-result recorder worker-id result)))
- (receive-error [this worker-id err] (.printStackTrace err))
+ (receive-error [this worker-id err]
+ (log/warn err)
+ (when (check-result-recordability? this)
+ (record-error recorder worker-id err)))
(broadcast-at-interval [this millis]
- (set-interval millis
- (fn []
- (enqueue output-ch {:dtype "state" :data @state})
- (try
- (send-bench-msg output-ch :stats (processed-stats recorder))
- (catch Exception e
- (.printStackTrace e)
- (log/fatal e))))))
- )
-
+ (set-interval millis
+ (fn []
+ (enqueue output-ch {:dtype "state" :data @state})
+ (try
+ (send-bench-msg output-ch :stats (processed-stats recorder))
+ (catch Exception e
+ (.printStackTrace e)))))))
+
+(defn create-workers-for-benchmark
+ "Creates a vector of workers instantiated via worker-fn, which
+ must be afunction that takes a worker-id, a success handler and a failure
+ handler"
+ [worker-fn benchmark worker-count]
+ (compare-and-set! (:workers benchmark) nil
+ (vec (map (fn [worker-id]
+ (worker-fn worker-id
+ #(receive-result benchmark worker-id %1)
+ #(receive-error benchmark worker-id %1)))
+ (range worker-count)))))
+
(defn create-benchmark
"Create a new Benchmark record. This encapsulates the full benchmark state"
[worker-count max-runs worker-fn]
- (let [output-ch (channel)
- recorder (create-recorder)
- workers (vec (map (fn [worker-id] (worker-fn worker-id recorder))
- (range worker-count)))]
- (receive-all output-ch (fn [_] )) ; keep the channel empty if no listeners
- (Benchmark. (ref :stopped)
- max-runs (ref 0) workers
- recorder output-ch (atom nil))))
-
-(def client-type
- (if (= "aleph" (System/getenv "PARBENCH_CLIENT"))
- :aleph
- :ning))
-
-(println "Using " client-type " client")
-
-(defn create-single-url-benchmark [url concurrency requests]
- (let [worker-fn (partial create-single-url-worker client-type url)
+ (let [recorder (create-recorder)
+ benchmark (Benchmark. (ref :stopped)
+ max-runs
+ (ref 0) ; run-count
+ (atom nil) ; workers
+ recorder
+ (channel) ; output ch
+ (atom nil))] ; broadcast-task
+ (create-workers-for-benchmark worker-fn benchmark worker-count)
+ benchmark))
+
+(defn create-single-url-benchmark
+ "Create a new benchmark. You must call start on this to begin"
+ [url concurrency requests]
+ (let [worker-fn (partial create-single-url-worker :ning url)
benchmark (create-benchmark concurrency requests worker-fn)]
benchmark))
@@ -13,8 +13,7 @@
(enqueue (.success res-ch)
{:status status :content-type content-type})))
(onThrowable [e]
- (log/error e "We hit a throwable in Ning")
- (enqueue res-ch e))))
+ (enqueue (.error res-ch) e))))
(def method-prep-map
{:get (memfn prepareGet url)
@@ -29,7 +28,7 @@
(let [client (AsyncHttpClient.)]
(fn this
([request]
- (this request -1))
+ (this request 90000))
([{:keys [method url]} timeout]
(let [req-id (swap! id inc)
result (result-channel)
View
@@ -5,14 +5,14 @@
(defn record-avg-runtime-by-start-time [stats {:keys [req-start runtime]}]
(update-in stats
- [:avg-runtime-by-start-time (int (/ req-start 1000))]
+ [:avg-runtime-by-start-time (long (/ req-start 1000))]
(fn [bucket]
(let [rcount (+ 1 (get bucket :count 0))
total (+ runtime (get bucket :total 0))]
(merge bucket
{:count rcount
:total total
- :avg (int (/ total rcount))})))))
+ :avg (long (/ total rcount))})))))
(defn record-runtime [stats {:keys [runtime]}]
(update-in stats [:runtimes] #(conj %1 runtime)))
@@ -60,19 +60,23 @@
(record-end [this]
(compare-and-set! ended-at nil (System/currentTimeMillis)))
- (record-result [this worker-id data]
- (send stats
- (fn [statsd]
- (reduce
- (fn [v stat-fn] (stat-fn v data))
- statsd
- [record-avg-runtime-by-start-time
- record-runtime
- record-response-code-count
- record-run-succeeded]))))
+ (record-result
+ [this worker-id data]
+ (send stats
+ (fn [statsd]
+ (try
+ (reduce
+ (fn [v stat-fn] (stat-fn v data))
+ statsd
+ [record-avg-runtime-by-start-time
+ record-runtime
+ record-response-code-count
+ record-run-succeeded])
+ (catch Exception e
+ (.printStackTrace e))))))
(record-error [this worker-id err]
- (send stats increment-keys :runs-failed)))
+ (send stats increment-keys :runs-failed)) )
(defn- empty-stats []
{:started-at nil
@@ -5,69 +5,65 @@
[clojure.tools.logging :as log])
(:use [parbench.utils :only [send-bench-msg]]
noir-async.utils
- lamina.core))
+ lamina.core)
+ (:import java.util.concurrent.Executors))
+
+; Run callbacks in a cached thread pool for maximum throughput
+(def callback-pool (Executors/newCachedThreadPool))
(defprotocol Workable
"A worker aware of global job state"
+ (handle-success [this run-id req-start results])
+ (handle-error [this run-id req-start err])
(work [this] [this run-id] "Execute the job")
(exec-runner [this run-id] "Execute the runner associated with this worker"))
-(def tot-requests (atom 0))
-(def open-requests (atom 0))
-(def failures (atom 0))
+(defrecord UrlWorker [state url worker-id client succ-callback err-callback]
+ Workable
-(set-interval 1000
- (fn []
- (println
- " T: " tot-requests
- " O: " open-requests
- " F: " failures)))
+ (handle-success [this run-id req-start response]
+ (.submit callback-pool
+ #(succ-callback
+ (let [req-end (System/currentTimeMillis)]
+ {:worker-id worker-id
+ :run-id run-id
+ :req-start req-start
+ :req-end req-end
+ :runtime (- req-end req-start)
+ :response response})))
+ (work this (inc run-id)))
-(defrecord UrlWorker [state url worker-id client output-ch]
- Workable
+ (handle-error [this run-id req-start err]
+ (.submit callback-pool #(err-callback err))
+ (work this (inc run-id)))
(exec-runner [this run-id]
- (swap! tot-requests inc)
- (swap! open-requests inc)
(let [req-start (System/currentTimeMillis)
ch (client {:method :get :url url} 2000)]
- (on-success ch
- (fn [res]
- (swap! open-requests dec)
- (send-bench-msg output-ch :worker-result
- (let [req-end (System/currentTimeMillis)]
- {:worker-id worker-id
- :run-id run-id
- :req-start req-start
- :req-end req-end
- :runtime (- req-end req-start)
- :response res}))
- (work this (inc run-id))))
- (on-error ch
- (fn [err]
- (swap! failures inc)
- (println "STACK!")
- (.printStackTrace err)
- (send-bench-msg output-ch :worker-error err)
- (work this (inc run-id))))))
+ (on-success ch (partial handle-success this req-start run-id))
+ (on-error ch (partial handle-error this req-start run-id))))
- (work [this]
- (work this 0))
+ (work
+ [this]
+ (compare-and-set! state :initialized :started)
+ (work this 0))
- (work [this run-id]
- (compare-and-set! state :initialized :started)
- (when (= @state :started)
- (exec-runner this run-id))))
+ (work
+ [this run-id]
+ (when (= @state :started)
+ (exec-runner this run-id))))
(def aleph-client (atom nil))
(def ning-client (ning-http/http-client {}))
-(defn create-single-url-worker [client-type url worker-id recorder]
+(defn create-single-url-worker
+ [client-type url worker-id succ-callback err-callback]
(compare-and-set! aleph-client nil (aleph-http/http-client {:url url}))
(let [client (if (= :aleph client-type) @aleph-client
- ning-client)]
+ ning-client)]
(UrlWorker. (atom :initialized)
url
worker-id
client
- (channel))))
+ succ-callback
+ err-callback)))
View
@@ -14,7 +14,8 @@
(defn send-bench-msg
"Enqueue a message of the format {:dtype data-type :data data}
on channel ch. This uses the io! macro since its assumed bench
- messages will always hit clients"
+ messages
+will always hit clients"
[ch data-type data]
(io! (enqueue ch {:dtype data-type :data data})))
@@ -16,7 +16,7 @@
(def test-page-count (atom 0))
(defpage-async "/test" {} conn
- (set-timeout 500
+ (set-timeout 1
(fn []
(respond conn (str "Test Response #" (swap! test-page-count inc))))))

0 comments on commit 8833efb

Please sign in to comment.