-
Notifications
You must be signed in to change notification settings - Fork 2
/
core.clj
142 lines (123 loc) · 5.03 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
(ns clj-faktory.core
(:require [clojure.tools.logging :as log]
[cheshire.core :as cheshire]
[crypto.random :as random]
[pool.core :as pool]
[clj-faktory.protocol.transit :as transit]
[clj-faktory.socket :as socket])
(:import [java.util.concurrent Executors ScheduledThreadPoolExecutor ThreadFactory TimeUnit]))
(def ^:private registered-jobs (atom {}))
(defn- encode-transit-args [job]
(-> job
(update :args (comp vector transit/write))
(assoc-in [:custom :args-encoding] "transit")))
(defn- decode-transit-args [job]
(if (= (get-in job [:custom :args-encoding]) "transit")
(update job :args (comp transit/read first))
job))
(defn transit-args? [args]
(not= args (cheshire/parse-string (cheshire/generate-string args))))
(defn- send-command [conn-pool command]
(socket/with-conn [conn conn-pool]
(socket/send-command conn command)))
(defn fail [conn-pool jid e]
(send-command conn-pool [:fail {:jid jid
:message (.getMessage e)
:errtype (str (class e))
:backtrace (map #(.toString %) (.getStackTrace e))}]))
(defn beat [conn-pool wid]
(send-command conn-pool [:beat {:wid wid}]))
(defn ack [conn-pool jid]
(send-command conn-pool [:ack {:jid jid}]))
(defn fetch [conn-pool queues]
(send-command conn-pool (cons :fetch queues)))
(defn push [conn-pool job]
(send-command conn-pool [:push job]))
(defn perform-async
([{:keys [prio-pool]} job-type args opts]
(if (contains? @registered-jobs job-type)
(let [jid (random/hex 12)
job (cond-> (merge {:jid jid
:jobtype job-type
:args args
:queue "default"
:retry 25
:backtrace 10}
opts)
(transit-args? args) encode-transit-args)]
(push prio-pool job)
jid)
(throw (Exception. "Job type has not been registered"))))
([worker-manager job-type args]
(perform-async worker-manager job-type args {})))
(defn info [{:keys [conn-pool]}]
(send-command conn-pool [:info]))
(defn- run-work-loop [{:keys [conn-pool queues]} n]
(loop []
(log/debug "Worker" n "checking in")
(let [{:keys [jid] :as job} (decode-transit-args (fetch conn-pool queues))
[result e] (when job
(try
(if-let [handler-fn (get @registered-jobs (keyword (:jobtype job)))]
(apply handler-fn (:args job))
(throw (Exception. "No handler job type")))
[:success]
(catch InterruptedException e
[:stopped e])
(catch Throwable e
[:failure e])))]
(case result
:success (do (ack conn-pool jid)
(recur))
:failure (do (fail conn-pool jid e)
(recur))
:stopped (fail conn-pool jid e)
(recur)))))
(defn- keep-alive [{:keys [conn-pool prio-pool wid]} heartbeat]
(let [thread-factory (reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable)
(.setDaemon true))))]
(.scheduleWithFixedDelay (ScheduledThreadPoolExecutor. 1 thread-factory)
(fn []
(log/debug "❤❤❤")
(beat prio-pool wid)
(beat conn-pool wid))
heartbeat
heartbeat
TimeUnit/MILLISECONDS)))
(defn stop [{:keys [conn-pool prio-pool worker-pool :as worker-manager]}]
(try
(when-not (.awaitTermination worker-pool 2000 TimeUnit/MILLISECONDS)
(.shutdownNow worker-pool))
(catch InterruptedException e
(log/debug e)
(.shutdownNow worker-pool))
(finally
(pool/close conn-pool)
(pool/close prio-pool)))
worker-manager)
(defn start [{:keys [worker-pool concurrency] :as worker-manager}]
(dotimes [n concurrency]
(.submit worker-pool #(run-work-loop worker-manager n)))
worker-manager)
(def conn-pool socket/conn-pool)
(defn register-job [job-type handler-fn]
(if (keyword? job-type)
(swap! registered-jobs assoc job-type handler-fn)
(throw (Exception. "Job type must be a keyword"))))
(defn worker-manager
([conn-pool {:keys [concurrency
heartbeat
queues] :or {concurrency 10
heartbeat 10000
queues ["default"]}}]
(let [worker-pool (Executors/newFixedThreadPool concurrency)]
(keep-alive conn-pool heartbeat)
(merge conn-pool
{:worker-pool worker-pool
:concurrency concurrency
:heartbeat heartbeat
:queues queues})))
([conn-pool]
(worker-manager conn-pool {})))