-
Notifications
You must be signed in to change notification settings - Fork 5
/
core.clj
334 lines (280 loc) · 9.74 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
(ns tea-time.core
"Clocks and scheduled tasks. Provides functions for getting the current time
and running functions (Tasks) at specific times and periods. Includes a
threadpool for task execution, controlled by (start!) and (stop!)."
(:import [java.util.concurrent ConcurrentSkipListSet]
[java.util.concurrent.locks LockSupport])
(:require [clojure.stacktrace :refer [print-stack-trace]]
[clojure.tools.logging :refer [warn info]]))
(defprotocol Task
  (succ [task]
    "Returns the task that should be scheduled after this one has run, or nil
    when there is nothing further to schedule.")
  (run [task]
    "Performs this task's work.")
  (cancel! [task]
    "Marks this task as cancelled."))
(defprotocol Deferrable
  (defer! [this delay]
    "Moves a task to a new time, given as a delay in seconds from now.")
  (defer-micros! [this delay]
    "Moves a task to a new time, given as a delay in microseconds from now."))
;; The clock implementation ;;;;;;;;;;;;;;;;;;;;;
(defn real-unix-time-micros
  "Time since the unix epoch, in microseconds. Derived from
  System/currentTimeMillis, so the value always lands on a whole millisecond."
  ^long
  []
  (let [millis (System/currentTimeMillis)]
    (* millis 1000)))
(defn real-linear-time-micros
  "The current time on a linear scale with no fixed epoch, counted in
  microseconds and taken from System/nanoTime. Unlike unix time, which can
  pause, skip, or flow backwards, this advances mostly monotonically at close
  to physical time: one second per second.

  Returns a long. Only differences between two readings are meaningful."
  ^long
  []
  ; quot keeps the division in primitive long arithmetic and truncates toward
  ; zero, exactly like (long (/ n 1000)) did, but without building an
  ; intermediate Ratio on every clock read.
  (quot (System/nanoTime) 1000))
(defn micros->seconds
  "Turns a quantity of microseconds into seconds. The result is a double."
  ^double
  [t]
  (let [micros-per-second 1000000.0]
    (/ t micros-per-second)))
(defn seconds->micros
  "Turns a quantity of seconds into microseconds. The result is coerced to a
  long, so any fraction of a microsecond is dropped."
  ^long
  [t]
  (-> t
      (* 1000000)
      long))
(defn real-unix-time
  "The current unix epoch time in seconds, as a double. Computed from
  real-unix-time-micros, which reads System/currentTimeMillis."
  ; The primitive return hint belongs on the argument vector only. A ^double
  ; placed on the var name is evaluated by def and ends up as the function
  ; clojure.core/double in the var's :tag metadata, which is not a valid type
  ; hint and can break callers that use the result in interop calls.
  ^double
  []
  (micros->seconds (real-unix-time-micros)))
(defn real-linear-time
  "Linear-timescale time in seconds, as a double. Computed from
  real-linear-time-micros, which reads System/nanoTime."
  ^double
  []
  (-> (real-linear-time-micros)
      micros->seconds))
;; The clock API ;;;;;;;;;;;;;;;;;;;;;;;;;
; The four vars below are the clock entry points used by the rest of this
; namespace (for example unix-micros->linear-micros, next-tick and run-tasks!
; call them instead of the real-* functions directly). Each one initially
; holds the matching real-* function.
; NOTE(review): none of these vars is declared ^:dynamic, so `binding` cannot
; rebind them; presumably "rebindable" means with-redefs or alter-var-root --
; confirm.
(def unix-time-micros
"Rebindable alias for real-unix-time-micros"
real-unix-time-micros)
(def linear-time-micros
"Rebindable alias for real-linear-time-micros"
real-linear-time-micros)
(def unix-time
"Rebindable alias for real-unix-time"
real-unix-time)
(def linear-time
"Rebindable alias for real-linear-time"
real-linear-time)
;; More conversions ;;;;;;;;;;;;;;;;;;;;;
(defn unix-micros->linear-micros
  "Approximately translates an instant on the unix timescale (microseconds)
  into the corresponding instant on the linear timescale (microseconds), by
  applying the instant's offset from the current unix time to the current
  linear time."
  ^long
  [^long unix]
  (let [linear-now (linear-time-micros)
        unix-now   (unix-time-micros)
        offset     (- unix unix-now)]
    (+ linear-now offset)))
;; Global state ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
; TODO: pull this stuff out into some sort of configurable Scheduler datatype,
; and provide a global default?
; Counter behind task-id: every call to task-id increments this atom and
; returns the new value.
(def max-task-id
(atom 0))
; The task queue. The comparator orders entries by :t first and falls back to
; :id when two entries have the same :t. Because the set treats entries that
; compare as equal as duplicates, two entries with the same :t and :id cannot
; both be present.
(def ^ConcurrentSkipListSet tasks
"Scheduled operations."
(ConcurrentSkipListSet.
(fn [a b] (compare [(:t a) (:id a)]
[(:t b) (:id b)]))))
; How many worker threads start! creates.
(def thread-count
"Number of threads in the threadpool"
4)
; Upper bound on a single park in run-tasks!, in microseconds (10000 micros is
; 10 milliseconds). stop! also derives its polling sleep from this value.
(def park-interval-micros
"Time we might sleep when nothing is scheduled, in micros."
10000)
; Holds the collection of worker threads. start! fills it, stop! resets it to
; an empty vector, and both lock on this atom while doing so.
(def threadpool
(atom []))
; Flag checked by run-tasks! on every loop iteration and by schedule! before
; it unparks a worker.
(def running
"Whether the threadpool is currently supposed to be alive."
(atom false))
;; Scheduling guts ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn ceil
  "Rounds x up to a whole number. Integer inputs are returned untouched;
  anything else goes through Math/ceil and is then coerced to a long."
  [x]
  (if-not (integer? x)
    (long (Math/ceil x))
    x))
(defn task-id
  "Allocates a fresh task ID by incrementing the max-task-id counter, and
  returns the incremented value."
  []
  (swap! max-task-id inc))
(defn next-tick
  "Finds the first tick strictly after `now` on a grid of period `dt` that
  passes through `anchor`; that is, the returned time differs from anchor by
  an exact multiple of dt.

  With two arguments, now is taken from (linear-time), so anchor and dt must
  be in seconds. With three arguments any unit works, provided anchor, dt and
  now all share it."
  ([anchor dt]
   (next-tick anchor dt (linear-time)))
  ([anchor dt now]
   (let [into-period (mod (- now anchor) dt)
         remaining   (- dt into-period)]
     (+ now remaining))))
; Look at all these bang! methods! Mutability is SO EXCITING!
(defn reset-tasks!
  "Empties the task queue. Nothing is run or cancelled; the queued tasks are
  simply discarded."
  []
  (.clear tasks))
(defn poll-task!
  "Takes the first task off the queue and returns it. Returns nil when the
  queue is empty."
  []
  (.pollFirst tasks))
(defn schedule-sneaky!
  "Puts a task on the queue without waking any thread, and returns the task."
  [task]
  ; doto hands the task back after the queue insertion has run.
  (doto task
    (->> (.add tasks))))
(defn schedule!
  "Schedule a task. May awaken a thread from the threadpool to investigate.
  Returns the task.

  The task is always queued. The wake-up is best-effort: it happens only
  while the pool is flagged as running and actually contains threads."
  [task]
  (schedule-sneaky! task)
  (when @running
    ; Read the pool once and tolerate it being empty. start! raises the
    ; running flag before it stores the threads, and stop! clears the pool
    ; after the flag may already have been read here, so running can be true
    ; while the pool is empty. rand-nth on an empty collection would throw.
    (let [pool @threadpool]
      (when (seq pool)
        (LockSupport/unpark (rand-nth pool)))))
  task)
;; Task datatypes ;;;;;;;;;;;;;;;;;;;;;;;
; A task that fires a single time. f is the function to call, t is the :t
; value the queue sorts on, and cancelled is an atom holding a boolean flag.
(defrecord Once [id f ^long t cancelled]
  Task
  ; A one-shot task never has a follow-up.
  (succ [_] nil)
  ; Calls f unless the task has been cancelled.
  (run [_]
    (if @cancelled
      nil
      (f)))
  (cancel! [_]
    (reset! cancelled true)))
; A repeating task. f is the function to call, t is the time of this
; occurrence and interval the spacing between occurrences (every! supplies
; both in microseconds). deferred-t is an atom holding either nil or a time to
; which the task has been deferred, and cancelled is an atom holding a boolean
; flag.
(defrecord Every [id f ^long t ^long interval deferred-t cancelled]
  Task
  ; Returns the next occurrence of this task, or nil once cancelled. A pending
  ; deferral takes precedence over the regular t + interval schedule.
  (succ [this]
    (when-not @cancelled
      ; Take and clear the pending deferral in one atomic step. Reading the
      ; atom and resetting it separately would silently discard a defer! that
      ; lands between the two operations.
      (let [pending (loop []
                      (let [seen @deferred-t]
                        (if (compare-and-set! deferred-t seen nil)
                          seen
                          (recur))))]
        (assoc this :t (or pending (+ t interval))))))
  ; Calls f, unless the task is cancelled or a deferral is pending; in the
  ; deferred case this occurrence is skipped and succ picks the deferred time.
  (run [this]
    (when-not (or @deferred-t @cancelled) (f)))
  (cancel! [this]
    (reset! cancelled true))
  Deferrable
  ; Defers by `delay` seconds; returns the new time converted to seconds.
  (defer! [this delay]
    (micros->seconds (defer-micros! this (seconds->micros delay))))
  ; Defers by `delay` microseconds from the current linear time; returns the
  ; new time in microseconds.
  (defer-micros! [this delay]
    (reset! deferred-t (+ (linear-time-micros) delay))))
(defn at-linear-micros!
  "Schedules f as a one-shot task at instant t, given in microseconds on the
  linear timescale. Returns the scheduled task."
  [t f]
  (-> (Once. (task-id) f t (atom false))
      schedule!))
(defn at-unix-micros!
  "Schedules f as a one-shot task at instant t, given in microseconds on the
  unix timescale. The instant is translated to the linear timescale before
  scheduling, so it may behave oddly across leap seconds."
  [t f]
  (-> t
      unix-micros->linear-micros
      (at-linear-micros! f)))
(defn at-unix!
  "Schedules f as a one-shot task at instant t, given in seconds on the unix
  timescale. The instant is translated to the linear timescale before
  scheduling, so it may behave oddly across leap seconds."
  [t f]
  (-> t
      seconds->micros
      (at-unix-micros! f)))
(defn after!
  "Schedules f as a one-shot task to run once `delay` seconds have passed,
  measured from the current linear time. Returns the scheduled task."
  [delay f]
  (let [id        (task-id)
        due       (+ (linear-time-micros) (seconds->micros delay))
        cancelled (atom false)]
    (schedule! (Once. id f due cancelled))))
(defn every!
  "Schedules f as a repeating task that fires every `interval` seconds. The
  first run happens after `delay` seconds; without a delay it is due
  immediately. A negative delay fails an assertion. Returns the scheduled
  task."
  ([interval f]
   (every! interval 0 f))
  ([interval delay f]
   (assert (not (neg? delay)))
   (let [id         (task-id)
         first-run  (+ (linear-time-micros) (seconds->micros delay))
         period     (seconds->micros interval)
         deferred-t (atom nil)
         cancelled  (atom false)]
     (schedule! (Every. id f first-run period deferred-t cancelled)))))
; Worker loop executed by each pool thread (start! passes the thread's index
; as i; the body below never reads i). The loop repeats for as long as @running
; is true.
(defn run-tasks!
"While running, takes tasks from the queue and executes them when ready. Will
park the current thread when no tasks are available."
[i]
(while @running
(try
(if-let [task (poll-task!)]
; We've acquired a task.
(do
; (info "Task acquired")
(if (<= (:t task) (linear-time-micros))
; This task is ready to run
(do
;(info :task task :time (linear-time-micros))
; Run task. Anything it throws is logged here and swallowed, so a
; failing task still gets its successor scheduled below.
(try
(run task)
(catch Exception e
(warn e "Tea-Time task" task "threw"))
(catch AssertionError t
(warn t "Tea-Time task" task "threw")))
(when-let [task' (succ task)]
; Schedule the next task.
(schedule-sneaky! task')))
(do
; Not due yet: return the task to the queue.
(schedule-sneaky! task)
; Park until that task comes up next. We can't use parkUntil because
; it uses posix time, which is non-monotonic. The park time is the
; time remaining until the task minus 1000 micros (so we aim to wake
; up 1 ms early and have a better chance of executing on time),
; clamped to at least 10 micros and at most park-interval-micros,
; then converted from micros to nanos for parkNanos.
(->> (- (:t task) (linear-time-micros) 1000)
(max 10)
(min park-interval-micros)
(* 1000)
LockSupport/parkNanos))))
; No task available; park for a bit and try again.
(LockSupport/parkNanos (* 1000 park-interval-micros)))
; Outer safety net: anything thrown outside the inner try (for example by
; succ or by the queue operations) is logged and the loop carries on.
(catch Exception e
(warn e "tea-time task threw"))
(catch AssertionError t
(warn t "tea-time task threw")))))
(defn stop!
  "Stops the task threadpool. Waits for threads to exit. Repeated calls to stop
  are noops.

  Clears the running flag, polls until no pool thread is alive any more, and
  then empties the pool. The whole operation holds the lock on threadpool."
  []
  (locking threadpool
    (when @running
      (reset! running false)
      ; Poll at roughly 1/10th of the park interval, expressed in whole
      ; milliseconds and never less than 1 ms. Computing a primitive long up
      ; front keeps Thread/sleep from receiving a Ratio when
      ; park-interval-micros is not a multiple of 10000, and lets the compiler
      ; pick the sleep(long) overload without reflection.
      (let [poll-ms (long (max 1 (quot park-interval-micros 10000)))]
        (while (some #(.isAlive ^Thread %) @threadpool)
          (Thread/sleep poll-ms)))
      (reset! threadpool []))))
(defn start!
  "Starts the threadpool to execute tasks on the queue automatically. Repeated
  calls to start are noops.

  Creates thread-count threads named \"Tea-Time <i>\", each running
  (run-tasks! i) with the caller's thread bindings conveyed via bound-fn, and
  stores them in threadpool as a vector. The whole operation holds the lock on
  threadpool."
  []
  (locking threadpool
    (when-not @running
      ; Raise the flag before launching: run-tasks! loops only while @running
      ; is true, so a thread started before this would exit immediately.
      (reset! running true)
      (reset! threadpool
              ; mapv is eager. A lazy map here would neither create nor start
              ; any thread until something happened to realize the sequence,
              ; leaving already-queued tasks unexecuted in the meantime.
              (mapv (fn [i]
                      (let [^Runnable f (bound-fn [] (run-tasks! i))]
                        (doto (Thread. f (str "Tea-Time " i))
                          (.start))))
                    (range thread-count))))))
; Counter used by the with-threadpool macro: incremented on entry and
; decremented on exit. The increment that yields 1 triggers start!, and the
; decrement that yields 0 triggers stop!.
(def threadpool-users
"Number of callers who would like a threadpool open right now"
(atom 0))
(defmacro with-threadpool
  "Evaluates `body` (an implicit `do`) with the threadpool running. Callers
  are counted in threadpool-users: the caller that takes the count to 1 calls
  start!, and the caller that brings it back to 0 calls stop!, so the pool
  stays up for as long as any thread is inside `with-threadpool`. The count is
  decremented in a `finally`, so it is released even when body throws.

  Typically wrapped around a program's main entry point, so the pool lives for
  the whole run."
  [& body]
  `(try
     (when (= 1 (swap! threadpool-users inc))
       (start!))
     ~@body
     (finally
       (when (zero? (swap! threadpool-users dec))
         (stop!)))))