forked from andrewvc/engulf
/
url_worker.clj
69 lines (59 loc) · 2.44 KB
/
url_worker.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
(ns parbench.url-worker
(:require [parbench.runner :as runner]
[parbench.ning-client :as ning-http]
[aleph.http :as aleph-http]
[clojure.tools.logging :as log])
(:use [parbench.utils :only [send-bench-msg]]
noir-async.utils
lamina.core)
(:import java.util.concurrent.Executors))
; Run callbacks in a cached thread pool for maximum throughput
(def callback-pool (Executors/newCachedThreadPool))
(defprotocol Workable
"A worker aware of global job state"
(handle-success [this run-id req-start results])
(handle-error [this run-id req-start err])
(work [this] [this run-id] "Execute the job")
(exec-runner [this run-id] "Execute the runner associated with this worker"))
(defrecord UrlWorker [state url worker-id client succ-callback err-callback]
Workable
(handle-success [this run-id req-start response]
(.submit callback-pool
#(succ-callback
(let [req-end (System/currentTimeMillis)]
{:worker-id worker-id
:run-id run-id
:req-start req-start
:req-end req-end
:runtime (- req-end req-start)
:response response})))
(work this (inc run-id)))
(handle-error [this run-id req-start err]
(.submit callback-pool #(err-callback err))
(work this (inc run-id)))
(exec-runner [this run-id]
(let [req-start (System/currentTimeMillis)
ch (client {:method :get :url url} 2000)]
(on-success ch (partial handle-success this req-start run-id))
(on-error ch (partial handle-error this req-start run-id))))
(work
[this]
(compare-and-set! state :initialized :started)
(work this 0))
(work
[this run-id]
(when (= @state :started)
(exec-runner this run-id))))
(def aleph-client (atom nil))
(def ning-client (ning-http/http-client {}))
(defn create-single-url-worker
[client-type url worker-id succ-callback err-callback]
(compare-and-set! aleph-client nil (aleph-http/http-client {:url url}))
(let [client (if (= :aleph client-type) @aleph-client
ning-client)]
(UrlWorker. (atom :initialized)
url
worker-id
client
succ-callback
err-callback)))