-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcore_test.clj
340 lines (314 loc) · 15 KB
/
core_test.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
(ns mysql-queue.core-test
(:require [clojure.test :refer :all]
[clojure.java.jdbc :as sql]
[clojure.set :as clj-set]
[clojure.string :as string]
[mysql-queue.core :refer :all]
[mysql-queue.queries :as queries]))
;; Install a JVM-wide default handler so that exceptions escaping any thread
;; are printed to *out* instead of disappearing silently.
(let [warn-on-uncaught (reify Thread$UncaughtExceptionHandler
                         (uncaughtException [_this _thread error]
                           (println "WARNING!!! Uncaught exception in core async:")
                           (println error)))]
  (Thread/setDefaultUncaughtExceptionHandler warn-on-uncaught))
(def db-conn
  "JDBC connection spec used by every test in this namespace: a MySQL
  database named clj_mysql_queue on localhost:3306, accessed as root with an
  empty password and SSL disabled."
  {:subprotocol "mysql"
   :subname     "//localhost:3306/clj_mysql_queue?useSSL=false"
   :user        "root"
   :password    ""})
(defn delete-scheduled-jobs-by-name!
  "Deletes every row of the scheduled_jobs table whose name column equals
  job-name, using the connection spec db-conn. Returns whatever sql/delete!
  returns."
  [db-conn job-name]
  (let [where-clause ["name = ?" job-name]]
    (sql/delete! db-conn :scheduled_jobs where-clause)))
(defn count-jobs
  "Returns the number of rows currently in the jobs table, read through the
  connection spec db-conn."
  [db-conn]
  (let [count-query ["SELECT COUNT(*) AS c FROM jobs"]
        ;; The query yields a single row; pull its `c` column out.
        first-count (fn [rows] (:c (first rows)))]
    (sql/query db-conn count-query {:result-set-fn first-count})))
(defn queue-size
  "Returns the number of rows currently in the scheduled_jobs table, read
  through the connection spec db-conn."
  [db-conn]
  (sql/query db-conn
             ["SELECT COUNT(*) AS c FROM scheduled_jobs"]
             ;; Single-row result: take the first row, then its `c` column.
             {:result-set-fn #(-> % first :c)}))
(defn setup-db
  "Test fixture: calls initialize! on db-conn, then invokes the wrapped test
  function f and returns its result."
  [f]
  (initialize! db-conn)
  (f))
(defn clean-up
  "Test fixture: removes scheduled jobs named \"test-foo\", \"slow-job\" and
  \"quick-job\" (in that order) from the database, then invokes the wrapped
  test function f. The clean-up happens before the test, not after it."
  [f]
  (doseq [job-name ["test-foo" "slow-job" "quick-job"]]
    (delete-scheduled-jobs-by-name! db-conn job-name))
  (f))
;; setup-db wraps the whole namespace run once (:once fixture).
(use-fixtures :once setup-db)
;; clean-up wraps every individual test (:each fixture).
(use-fixtures :each clean-up)
(defmacro with-worker
  "Binds bound-name to the value of expr, evaluates body, and always calls
  (stop bound-name 2) afterwards, even when body throws.

  The binding vector must contain exactly two forms, [bound-name expr];
  anything else fails the precondition at macro-expansion time.
  NOTE(review): the 2 passed to stop is presumably a quit timeout in
  seconds - confirm against mysql-queue.core/stop."
  [[bound-name expr :as args] & body]
  ;; :pre must be a vector of condition forms. A bare list would be treated as
  ;; three separate conditions (`=`, `2`, `(count args)`), all truthy, so the
  ;; check would never fire.
  {:pre [(= 2 (count args))]}
  `(let [~bound-name ~expr]
     (try
       ~@body
       (finally
         (stop ~bound-name 2)))))
(defn check-in-atom
  "Returns a new atom holding an empty vector. A watch registered under the
  key :test delivers true to success-promise whenever the atom changes to a
  value whose elements, taken as a set, equal expected-set."
  [expected-set success-promise]
  (let [check-ins (atom [])
        on-change (fn [_key _ref _old current]
                    (when (= (set current) expected-set)
                      (deliver success-promise true)))]
    (add-watch check-ins :test on-change)
    check-ins))
;; Schedules 100 :test-foo jobs and expects a worker with one consumer thread
;; to run every one of them within 15 seconds.
(deftest job-processing-test
  (let [num-jobs 100
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        ;; Vector of executed job ids; success? is delivered once the ids seen
        ;; so far, as a set, equal expected-set.
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          (Thread/sleep 10)
                          (swap! check-ins conj id)
                          [:done args])}
        _ (dotimes [n num-jobs]
            (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))]
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 1
                              :err-fn #(deliver exception %)
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 0.5)]
      (is (deref success? 15000 false)
          (str "Failed to process " num-jobs " test jobs in 15 seconds.\n"
               ;; check-ins holds a vector; clojure.set/difference is only
               ;; defined for sets (it may use `contains?`, which tests
               ;; indices on a vector), so convert before diffing.
               "Missing job IDs: " (clj-set/difference expected-set (set @check-ins)) "\n"
               "Exception?: " (deref exception 0 "nope")))
      (is (= num-jobs (count @check-ins))
          "The number of executed jobs doesn't match the number of jobs queued."))))
;; Same scenario as a single-consumer run, but the worker is started with
;; :prefetch 4 and four consumer threads; all 100 jobs must still run within
;; 15 seconds and the executed count must equal the scheduled count.
(deftest parallel-job-processing-test
  (let [num-jobs 100
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        ;; Vector of executed job ids; success? is delivered once the ids seen
        ;; so far, as a set, equal expected-set.
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          (Thread/sleep 10)
                          (swap! check-ins conj id)
                          [:done args])}
        _ (dotimes [n num-jobs]
            (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))]
    (with-worker [wrk (worker db-conn
                              jobs
                              :prefetch 4
                              :num-consumer-threads 4
                              :err-fn #(deliver exception %)
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 0.5)]
      (is (deref success? 15000 false)
          (str "Failed to process " num-jobs " test jobs in 15 seconds.\n"
               ;; clojure.set/difference expects sets; check-ins is a vector,
               ;; so convert it to get a correct "missing" report.
               "Missing job IDs: " (clj-set/difference expected-set (set @check-ins)) "\n"
               "Exception?: " (deref exception 0 "nope")))
      (is (= num-jobs (count @check-ins))
          "The number of executed jobs doesn't match the number of jobs queued."))))
;; One slow job is scheduled ahead of five quick jobs. The slow job blocks (up
;; to 2 seconds) until every quick job has checked in and only then checks in
;; itself, so the test can pass only if quick jobs are executed while the slow
;; job is still running.
(deftest unbalanced-parallel-job-processing-test
  (let [num-slow-jobs 1
        num-quick-jobs 5
        expected-slow-set (->> num-slow-jobs range (map inc) (into #{}))
        expected-quick-set (->> num-quick-jobs range (map inc) (into #{}))
        quick-success? (promise)
        slow-success? (promise)
        exception (promise)
        slow-check-ins (check-in-atom expected-slow-set slow-success?)
        quick-check-ins (check-in-atom expected-quick-set quick-success?)
        jobs {:quick-job (fn [status {id :id :as args}]
                           (swap! quick-check-ins conj id)
                           [:done args])
              :slow-job (fn [status {id :id :as args}]
                          ;; Check in only if all quick jobs finished in time.
                          (when (deref quick-success? 2000 false)
                            (swap! slow-check-ins conj id))
                          [:done args])}
        _ (dotimes [n num-slow-jobs]
            (schedule-job db-conn :slow-job :begin {:id (inc n)} (java.util.Date.)))
        _ (dotimes [n num-quick-jobs]
            (schedule-job db-conn :quick-job :begin {:id (inc n)} (java.util.Date.)))]
    (with-worker [wrk (worker db-conn
                              jobs
                              :prefetch 3
                              :num-consumer-threads 2
                              :err-fn #(deliver exception %)
                              :max-scheduler-sleep-interval 0.1)]
      (is (deref slow-success? 2000 false)
          (str "Failed to process 1 slow job and " num-quick-jobs
               " quick jobs in 2 seconds.\n"
               ;; Both check-in atoms hold vectors; clojure.set/difference is
               ;; only defined for sets, so convert before diffing.
               "Missing slow job IDs: " (clj-set/difference expected-slow-set
                                                            (set @slow-check-ins)) "\n"
               "Missing quick job IDs: " (clj-set/difference expected-quick-set
                                                             (set @quick-check-ins)) "\n"
               "Exception?: " (deref exception 0 "nope")))
      (is (= num-slow-jobs (count @slow-check-ins))
          "The number of executed slow jobs doesn't match the number of jobs queued.")
      (is (= num-quick-jobs (count @quick-check-ins))
          "The number of executed quick jobs doesn't match the number of jobs queued."))))
;; Runs two workers against the same database at the same time. Together they
;; must execute all 100 scheduled jobs within 15 seconds, and the total number
;; of executions must equal the number of jobs scheduled.
(deftest distributed-job-processing-test
  (let [num-jobs 100
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        ;; Shared by both workers: vector of executed job ids.
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          (Thread/sleep 10)
                          (swap! check-ins conj id)
                          [:done args])}
        _ (dotimes [n num-jobs]
            (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))]
    (with-worker [wrk-1 (worker db-conn
                                jobs
                                :prefetch 4
                                :num-consumer-threads 2
                                :err-fn #(deliver exception %)
                                :max-scheduler-sleep-interval 0.5
                                :max-recovery-sleep-interval 0.5)]
      (with-worker [wrk-2 (worker db-conn
                                  jobs
                                  :prefetch 4
                                  :num-consumer-threads 2
                                  :err-fn #(deliver exception %)
                                  :max-scheduler-sleep-interval 0.5
                                  :max-recovery-sleep-interval 0.5)]
        (is (deref success? 15000 false)
            (str "Failed to process " num-jobs " test jobs in 15 seconds.\n"
                 ;; clojure.set/difference expects sets; check-ins is a
                 ;; vector (and may contain duplicates), so convert it.
                 "Missing job IDs: " (clj-set/difference expected-set (set @check-ins)) "\n"
                 "Exception?: " (deref exception 0 "nope")))
        (is (= num-jobs (count @check-ins))
            "The number of executed jobs doesn't match the number of jobs queued.")))))
;; For each of the 100 scheduled jobs a matching row is also inserted directly
;; into the jobs table via queries/insert-job<!. A worker started with
;; :recovery-threshold-mins 0 must then check in every job id within 15
;; seconds, with exactly one execution per id.
;; NOTE(review): the directly inserted rows presumably simulate jobs that were
;; picked up but never finished - confirm against mysql-queue.queries.
(deftest stuck-job-processing-test
  (let [num-jobs 100
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          (Thread/sleep 10)
                          (swap! check-ins conj id)
                          [:done args])}
        _ (dotimes [n num-jobs]
            (let [scheduled-id (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.))]
              (queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {:id (inc n)}) 1 (java.util.Date.))))]
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 1
                              :err-fn #(deliver exception %)
                              :recovery-threshold-mins 0
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 0.5)]
      (is (deref success? 15000 false)
          (str "Failed to process " num-jobs " test jobs in 15 seconds.\n"
               ;; check-ins is a vector and may hold duplicates if a job runs
               ;; twice; clojure.set/difference needs a real set to report
               ;; the missing ids correctly.
               "Missing job IDs: " (clj-set/difference expected-set (set @check-ins)) "\n"
               "Exception?: " (deref exception 0 "nope")))
      (is (= num-jobs (count @check-ins))
          "The number of executed jobs doesn't match the number of jobs queued."))))
;; Inserts one job row with an attempt count of 5 and verifies that, after a
;; worker has run for a second, the row is gone from the jobs table and no
;; longer reported by select-n-stuck-jobs. The job handler throws if called.
(deftest stuck-job-max-attempts-test
  (let [;; Handlers are invoked with two arguments (see the other tests in this
        ;; file), so the guard must accept two; a zero-arity #(...) literal
        ;; would raise an ArityException and hide the intended message.
        jobs {:test-foo (fn [_ _]
                          (throw (Exception. "This job should not have been executed, because it reached max attempts.")))}
        scheduled-id (schedule-job db-conn :test-foo :begin {} (java.util.Date. 0))]
    (queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {}) 5 (java.util.Date. 0))
    ;; Sanity check: the inserted row is visible before the worker starts.
    (is (= 1 (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
    (is (= 1 (count-jobs db-conn)))
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 1
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 0.5)]
      ;; Give the worker time to notice and clean up the row.
      (Thread/sleep 1000)
      (is (zero? (count (queries/select-n-stuck-jobs db-conn ultimate-job-states ["test-foo"] [0] 5 5))))
      (is (zero? (count-jobs db-conn))))))
;; The first execution of the job sleeps for 10 seconds while the worker is
;; configured with a 1-second job timeout. After 3 seconds the handler must
;; have been entered exactly twice.
(deftest job-timeout-test
  (let [attempt (atom 0)
        jobs {:test-foo (fn [_ _]
                          ;; Decide on the value returned by swap! itself, so
                          ;; the increment and the "is this the first attempt?"
                          ;; check cannot be separated by another thread's
                          ;; increment.
                          (when (= 1 (swap! attempt inc))
                            (Thread/sleep 10000))
                          [:done {}])}]
    (schedule-job db-conn :test-foo :begin {} (java.util.Date.))
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 1
                              ; run scheduler every 0.5s
                              :max-scheduler-sleep-interval 0.5
                              ; terminate jobs that take over 1 second
                              :job-timeout-mins (/ 1 60))]
      (Thread/sleep 3000)
      (is (= 2 @attempt)))))
;; Two jobs each take 1500 ms once unlocked. The with-worker body unlocks them
;; and returns after 1000 ms, so the jobs are still running when with-worker
;; calls stop. Both jobs must nevertheless have checked in by the time stop
;; returns.
(deftest graceful-shutdown-test
  (let [num-jobs 2
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        ;; Jobs block on this promise until the test delivers it.
        lock (promise)
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          @lock
                          (Thread/sleep 1500)
                          (swap! check-ins conj id)
                          [:done args])}
        _ (dotimes [n num-jobs]
            (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))]
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 2
                              :err-fn #(deliver exception %)
                              :recovery-threshold-mins 0
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 0.5)]
      (deliver lock :unlocked)
      (Thread/sleep 1000))
    ;; The worker has been stopped at this point; only a token 10 ms wait.
    (is (deref success? 10 false)
        (str "Failed to finish " num-jobs " test jobs taking 1500ms with 2s quit timeout.\n"
             ;; clojure.set/difference expects sets; check-ins is a vector.
             "Missing job IDs: " (clj-set/difference expected-set (set @check-ins)) "\n"
             "Exception?: " (deref exception 0 "nope")))
    (is (= num-jobs (count @check-ins))
        "The number of executed jobs doesn't match the number of jobs queued.")))
;; Exercises (status wrk) at three points in a worker's life: right after
;; start-up, right after 100 regular jobs plus one directly inserted job row
;; have been published, and after everything has been processed.
(deftest status-test
  (let [num-jobs 100
        expected-set (->> num-jobs range (map inc) (into #{}))
        success? (promise)
        exception (promise)
        check-ins (check-in-atom expected-set success?)
        jobs {:test-foo (fn [status {id :id :as args}]
                          (Thread/sleep 20)
                          (swap! check-ins conj id)
                          [:done args])}]
    (with-worker [wrk (worker db-conn
                              jobs
                              :num-consumer-threads 1
                              :err-fn #(deliver exception %)
                              :max-scheduler-sleep-interval 0.5
                              :max-recovery-sleep-interval 2)]
      ; Initial status
      (let [{{:keys [scheduled-jobs jobs]} :db-queue
             :keys [consumers prefetched-jobs]}
            (status wrk)]
        (is (= 1 (count consumers)))
        (is (zero? (:overdue scheduled-jobs)))
        (is (zero? (:total scheduled-jobs)))
        (is (zero? (:stuck jobs)))
        (is (zero? (:total jobs)))
        (is (empty? prefetched-jobs)))
      ; Publishing jobs
      (dotimes [n num-jobs]
        (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))
      ; Publishing one stuck job
      (let [scheduled-id (schedule-job db-conn :test-foo :begin {:id 1} (java.util.Date. 0))]
        (queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {:id 1}) 1 (java.util.Date. 0)))
      ; Status after publishing
      ; (the worker is already consuming, hence ranges instead of exact counts;
      ; no `:as status` binding here - it was unused and shadowed the
      ; `status` function)
      (let [{{:keys [scheduled-jobs jobs]} :db-queue
             :keys [consumers]}
            (status wrk)]
        (is (= 1 (count consumers)))
        (is (<= 1 (:overdue scheduled-jobs) 101))
        (is (<= 1 (:total scheduled-jobs) 101))
        (is (<= 0 (:stuck jobs) 1))
        (is (<= 0 (:total jobs) 50)))
      (is (deref success? 15000 false) "Failed to process jobs in time. Test results below depend on this.")
      ; Extra sleep time to let stuck job processing finish
      (Thread/sleep 1000)
      ; Status after finished
      (let [{{:keys [scheduled-jobs jobs]} :db-queue
             :keys [consumers recent-jobs recent-jobs-stats]}
            (status wrk)]
        (is (= 1 (count consumers)))
        (is (zero? (:overdue scheduled-jobs)))
        (is (zero? (:total scheduled-jobs)))
        (is (zero? (:stuck jobs)))
        (is (zero? (:total jobs)))
        (is (= 50 (count recent-jobs)))
        (is (= 50 (get-in recent-jobs-stats [:job-types :test-foo])))
        ; 100 regular jobs + 1 stuck job
        (is (= 101 (:jobs-executed (first consumers))))))))