/
worker.clj
144 lines (128 loc) · 4.68 KB
/
worker.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
(ns clj-faktory.worker
(:require [cheshire.core :as cheshire]
[clj-faktory.client :as client]
[clj-faktory.protocol.transit :as transit]
[clojure.java.shell :as shell]
[clojure.string :as string]
[clojure.tools.logging :as log]
[crypto.random :as random])
(:import [java.net InetAddress]
[java.util.concurrent Executors ScheduledThreadPoolExecutor ThreadFactory TimeUnit]))
(def registered-jobs
  "Atom wrapping a map from job type to handler function. Starts out empty."
  (atom {}))
(def daemon-thread-factory
  "ThreadFactory that wraps each runnable in a new Thread flagged as a daemon
  before handing it back (the thread is not started here)."
  (reify ThreadFactory
    (newThread [_ task]
      (let [t (Thread. task)]
        (.setDaemon t true)
        t))))
(defn- encode-transit-args
  "Returns job with :args replaced by a one-element vector containing the
  result of `transit/write` on the original args, and with
  [:custom :args-encoding] set to \"transit\"."
  [job]
  (let [packed [(transit/write (:args job))]]
    (-> job
        (assoc :args packed)
        (assoc-in [:custom :args-encoding] "transit"))))
(defn- decode-transit-args
  "Inverse of the transit encoding step: when the job is marked with
  [:custom :args-encoding] \"transit\", replaces :args with `transit/read`
  applied to the first element of :args. Any other job is returned untouched."
  [job]
  (let [encoding (get-in job [:custom :args-encoding])]
    (cond-> job
      (= encoding "transit")
      (assoc :args (transit/read (first (:args job)))))))
(defn transit-args?
  "Returns true when args are not equal to themselves after being written to a
  JSON string and parsed back with cheshire."
  [args]
  (let [round-tripped (-> args
                          cheshire/generate-string
                          cheshire/parse-string)]
    (not= args round-tripped)))
(defn- hostname
  "Best-effort lookup of the local host name. Asks the JVM first; if that
  throws or yields nil, falls back to the trimmed output of the `hostname`
  shell command."
  []
  (let [from-jvm (try
                   (.getHostName (InetAddress/getLocalHost))
                   ;; Lookup failure is deliberately ignored; we fall through
                   ;; to the shell command below.
                   (catch Exception _ nil))]
    (if from-jvm
      from-jvm
      (when-some [result (shell/sh "hostname")]
        (when-some [out (:out result)]
          (string/trim out))))))
(defn- worker-info
  "Builds the identity map for a worker: a fresh :wid from `(random/hex 12)`,
  the local :hostname, and :v 2."
  []
  (let [wid  (random/hex 12)
        host (hostname)]
    {:wid      wid
     :hostname host
     :v        2}))
(defn- keep-alive
  "Schedules a recurring heartbeat for worker id `wid` over `conn` on the
  scheduled executor `pool`. The first beat fires after `heartbeat`
  milliseconds and each later one `heartbeat` milliseconds after the previous
  beat finished. Returns the ScheduledFuture from the executor.

  Exceptions from `client/beat` are caught and logged. An exception escaping a
  task passed to scheduleWithFixedDelay suppresses every subsequent execution,
  so without the catch a single failed beat would silently stop heartbeats for
  the rest of the worker's life."
  [conn pool wid heartbeat]
  (.scheduleWithFixedDelay pool
                           (fn []
                             (try
                               (client/beat conn wid)
                               (catch Exception e
                                 ;; A failure after the pool was shut down is
                                 ;; expected during stop; don't log it.
                                 (when-not (.isShutdown pool)
                                   (log/warn e "Heartbeat failed; retrying on next tick")))))
                           heartbeat
                           heartbeat
                           TimeUnit/MILLISECONDS))
(defn connect
  "Creates a client connection for `uri` and calls `.connect` on it, returning
  whatever `.connect` returns. With one argument, a freshly generated
  worker-info map is used."
  ([uri]
   (connect uri (worker-info)))
  ([uri worker-info]
   (let [connection (client/connection uri worker-info)]
     (.connect connection))))
(defn- run-work-loop
  "Body of a single worker thread. Opens its own connection, then repeatedly
  fetches a job from the worker's queues, looks up the handler registered for
  the job's type, applies it to the job's args, and ACKs the job. Any Throwable
  raised while decoding or handling the job is logged and reported with
  `client/fail`. The loop ends once the worker's ::stopped? atom is true.

  The connection opened here is closed when the loop exits, whether normally
  or because an exception (e.g. from `client/fetch`) propagated out."
  [worker]
  (let [conn   (connect (get-in worker [::opts :uri]) (::info worker))
        queues (get-in worker [::opts :queues])]
    (try
      (loop []
        (when-not @(::stopped? worker)
          (when-let [{:keys [jid] :as raw-job} (client/fetch conn queues)]
            (try
              ;; Decode inside the try so a malformed payload fails this job
              ;; instead of killing the worker thread.
              (let [job (decode-transit-args raw-job)]
                (if-let [handler-fn (get @registered-jobs (keyword (:jobtype job)))]
                  (do (apply handler-fn (:args job))
                      (client/ack conn jid))
                  (throw (Exception. "No handler job type"))))
              (catch Throwable e
                ;; Pass the throwable separately so the stack trace is logged.
                (log/warn e (str "Job " jid " failed"))
                (client/fail conn jid e)))))
        (when-not @(::stopped? worker)
          (recur)))
      (finally
        ;; Same close idiom that `stop` uses for the worker's other connections.
        (.close @conn)))))
(defn register-job
  "Associates `handler-fn` with the keyword `job-type` in `registered-jobs` and
  returns the updated registry map. Throws an Exception when `job-type` is not
  a keyword."
  [job-type handler-fn]
  (when-not (keyword? job-type)
    (throw (Exception. "Job type must be a keyword")))
  (swap! registered-jobs assoc job-type handler-fn))
(defn perform-async
  "Pushes a job of `job-type` with `args` over the worker's ::conn and returns
  the generated job id. `opts` is merged over the defaults (queue \"default\",
  retry 25, backtrace 10). When `transit-args?` reports true for `args`, the
  job's args are transit-encoded before pushing. Throws an Exception if
  `job-type` has not been registered."
  ([worker job-type args]
   (perform-async worker job-type args {}))
  ([worker job-type args opts]
   (when-not (contains? @registered-jobs job-type)
     (throw (Exception. "Job type has not been registered")))
   (let [jid      (random/hex 12)
         defaults {:jid       jid
                   :jobtype   job-type
                   :args      args
                   :queue     "default"
                   :retry     25
                   :backtrace 10}
         base-job (merge defaults opts)
         job      (if (transit-args? args)
                    (encode-transit-args base-job)
                    base-job)]
     (client/push (::conn worker) job)
     jid)))
(defn info
  "Returns the result of `client/info` called on the worker's ::conn."
  [worker]
  (-> worker
      ::conn
      client/info))
(defn worker
  "Builds a worker map for the server at `uri`. `opts` is merged over the
  defaults {:concurrency 10, :queues [\"default\"], :heartbeat 10000}
  (heartbeat is in milliseconds).

  Opens two connections with the same worker info: ::conn, which
  `perform-async` and `info` use, and ::beat-conn, which is dedicated to the
  heartbeat scheduled here on a single daemon thread. Also creates a fixed
  thread pool of size :concurrency for the work loops. Nothing is submitted to
  that pool here.

  Returns a map with ::info, ::opts (including :uri), ::conn, ::beat-pool,
  ::beat-conn, ::work-pool and a ::stopped? atom initialised to false."
  [uri opts]
  (let [{:keys [concurrency heartbeat]
         :as opts} (merge {:concurrency 10
                           :queues ["default"]
                           :heartbeat 10000} opts)
        info (worker-info)
        conn (connect uri info)
        beat-pool (ScheduledThreadPoolExecutor. 1 daemon-thread-factory)
        beat-conn (connect uri info)
        work-pool (Executors/newFixedThreadPool concurrency)]
    ;; Heartbeats go over their own connection so the scheduler thread never
    ;; shares ::conn with caller threads using perform-async / info.
    (keep-alive beat-conn beat-pool (:wid info) heartbeat)
    {::info info
     ::opts (assoc opts :uri uri)
     ::conn conn
     ::beat-pool beat-pool
     ::beat-conn beat-conn
     ::work-pool work-pool
     ::stopped? (atom false)}))
(defn stop
  "Stops `worker` and returns it. Sets the ::stopped? flag, shuts down the
  heartbeat pool and the work pool, and waits up to 10 seconds for the work
  pool to terminate, throwing an Exception if it does not. If the calling
  thread is interrupted while waiting, the work pool is shut down again and
  the interrupt flag is restored. ::conn and ::beat-conn are closed in every
  case."
  [{:keys [::work-pool ::beat-pool ::beat-conn ::conn] :as worker}]
  (log/debug "Stopping worker")
  (let [stopped-flag (::stopped? worker)]
    (try
      ;; Flip the flag first so work loops stop picking up new jobs.
      (reset! stopped-flag true)
      (.shutdownNow beat-pool)
      (.shutdown work-pool)
      (.shutdownNow work-pool)
      (let [terminated? (.awaitTermination work-pool 10000 TimeUnit/MILLISECONDS)]
        (when-not terminated?
          (throw (Exception. "Could not shut down work pool properly"))))
      (catch InterruptedException _
        (.shutdownNow work-pool)
        ;; Re-assert the interrupt so the caller can observe it.
        (.interrupt (Thread/currentThread)))
      (finally
        (.close @conn)
        (.close @beat-conn))))
  worker)
(defn start
  "Submits one work loop per configured :concurrency slot to the worker's
  ::work-pool and returns the worker. Throws an Exception if the worker's
  ::stopped? flag is already set."
  [worker]
  (when @(::stopped? worker)
    (throw (Exception. "A stopped worker cannot be started again")))
  (let [loop-count (long (get-in worker [::opts :concurrency]))]
    (doseq [_ (range loop-count)]
      (.submit (::work-pool worker) #(run-work-loop worker))))
  worker)