-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.clj
454 lines (406 loc) · 15 KB
/
task.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
(ns halfling.task
(:import (clojure.lang IMeta IPending IBlockingDeref IDeref)
(java.util.concurrent Future)
(java.io Writer)))
(declare task
run
run-async
wait
mapply
then
recover
get!
get-or-else
fulfilled?
broken?
done?
executed?
attempt
task?
execute
execute-par
remap
do-tasks)
(def ^:const serial :serial)
(def ^:const parallel :parallel)
(def ^:private ^:const successful :success)
(def ^:private ^:const failed :failed)
(deftype Task [exec future actions recovery]
IMeta (meta [_] {:type Task})
IDeref (deref [this] (get! this))
IBlockingDeref (deref [this timeout else] (get! this timeout else)))
(defmethod print-method Task [tsk ^Writer writer]
(letfn [(write! [status value]
(.write writer (str "#Task"
{:executed? (executed? tsk)
:status status
:value value})))]
(cond
(fulfilled? tsk) (write! successful (get! tsk))
(broken? tsk) (write! failed (get! tsk))
:else (write! :pending nil))))
(defrecord Result [status value])
(defn- is-task? [task fn]
(assert (task? task) (str "Input to `" fn "` must be a `Task`")))
(defn- all-tasks? [tasks fn]
(assert (every? task? tasks) (str "All values provided to `" fn "` must be `Task`s")))
(defn const-future
"Wraps a value in a completed `Future` object."
{:added "1.0.0"}
[value]
(reify
IDeref
(deref [_] value)
IBlockingDeref
(deref [_ _ _] value)
IPending
(isRealized [_] true)
Future
(get [_] value)
(get [_ _ _] value)
(isDone [_] true)
(isCancelled [_] false)
(cancel [_ _] false)))
(defn- task?
"Returns `true` if `thing` is of type `Task`, `false` otherwise."
{:added "1.0.0"}
[thing] (= Task (type thing)))
(defn- purely
"Given any value `a`, returns a task containing it."
{:added "1.0.0"}
[a] (Task. serial a [] nil))
(defn- pure
"Given a `Result`, returns a `task` containing it."
{:added "1.0.0"}
[^Result result] (purely (const-future result)))
(defn- fail
"Returns a failed `Result`."
{:added "1.0.0"}
[error]
(Result. failed error))
(defn- succeed
"Returns a successful `Result`."
{:added "1.0.0"}
[value]
(Result. successful value))
(defn- failure?
"Returns `true` if `result` is failed, `false` otherwise."
{:added "1.0.0"}
[^Result result]
(= :failed (:status result)))
(defn- success?
"Returns `true` if `result` is successful, `false` otherwise."
{:added "1.0.0"}
[^Result result]
(= :success (:status result)))
(defn- peer
"Returns the `Result` of `task`, which contains the value and whether
it was successful or not. Blocks `task` until the task is realised.
If you want the concrete value of `Result`, see `get!`.
Note: Doesn't run the task!"
{:added "1.0.0"}
[^Task task] (is-task? task "peer")
@(.future task))
(defn- remap
"Changes the attributes of `task` as specified by
the mappings in `map-f`.
`map-f` may contain the following:
{:future a function applied on the current future, that returns a new one
:actions a function applied on the current collection of actions, that returns a new one
:exec a value from #{:serial, :parallel}
:recovery a recovery function}
Returns a new task containing the change specified in `map-f`."
{:added "1.0.0"}
[map-f ^Task task] (is-task? task "remap")
(let [f (:future map-f identity)
g (:actions map-f identity)
exec (:exec map-f (.exec task))
recovery (:recovery map-f (.recovery task))]
(Task. exec (f (.future task)) (g (.actions task)) recovery)))
(defmacro attempt
"Safely runs a `body` in a `try` block
and captures its outcome in a `Result`.
In case of a success, the result will look like:
{:status :success
:value [<result of computation>]}
In case of a failure, the result will look like:
{:status :failure
:value [<throwable/exception object>]}"
{:added "1.0.0"
:revised "1.2.0"}
[& body]
`(try (succeed (do ~@body))
(catch Exception e# (fail e#))))
(defmacro task
"Takes and number of expressions or actions and
returns a task that will lazily evaluate them."
{:added "1.0.0"}
[& actions]
`(Task. serial
(const-future ~(succeed nil))
[(fn [x#] ~(cons 'do actions))]
nil))
(defn- execute
"Executes a `task` blockingly until it finishes.
Returns a `Result` of that execution."
{:added "1.0.0"}
[^Task task] (is-task? task "execute")
(letfn [(recoverable? [result] (and (failure? result) (.recovery task)))
(parallel-task? [tsk] (and (task? tsk) (= parallel (.exec tsk))))]
(loop [result (peer task)
actions (.actions task)]
(let [[f & fs] actions
value (:value result)
recover (.recovery task)]
(cond
(recoverable? result) (execute (halfling.task/task (recover value)))
(failure? result) result
(parallel-task? value) (recur (execute-par value) actions)
(task? value) (recur (execute value) actions)
(nil? f) result
:else (recur (attempt (f value)) fs))))))
(defn- execute-par
"A `task` executed by `execute-par` will contain as payload a collection
of other tasks that are to be executed in parallel.
Tries to execute that payload in parallel, whilst blocking the current thread
until they finish.
Returns a `Result` of that execution."
{:added "1.0.0"}
[^Task task] (is-task? task "execute-par")
(letfn [(error [tasks] (->> tasks (filter broken?) (first) (get!)))
(recoverable? [tasks] (and (some broken? tasks) (.recovery task)))
(recover [tasks] (halfling.task/task ((.recovery task) (error tasks))))]
(let [[compose & actions] (.actions task)
tasks (->> (get! task)
(mapv run-async)
(mapv wait))]
(cond (every? fulfilled? tasks)
(->> tasks
(mapv get!)
(apply compose)
(succeed)
(pure)
(remap {:actions (constantly actions)})
(run)
(peer))
(recoverable? tasks) (peer (run (recover tasks)))
:else (fail (error tasks))))))
(defn success
"Given some `value`, returns a realised successful task containing the given `value`"
{:added "1.0.0"}
[value] (pure (succeed value)))
(defn failure
"Given some string `message`, returns a realised failed task containing an error with the given `message`"
{:added "1.0.0"}
[message] (pure (fail (Exception. message))))
(defn failure-t
"Given a proper error `Throwable`, returns a realised failed task containing it."
{:added "1.2.0"}
[^Throwable throwable] (pure (fail throwable)))
(defn done?
"Returns `true` if `task` has been realised, `false` otherwise.
Note: Doesn't check if the task has been run.
An un-run task is still considered to be realised.
For both checks, take a look at `executed?`."
{:added "1.0.0"}
[^Task task] (is-task? task "done?")
(realized? (.future task)))
(defn executed?
"Returns `true` if `task` has been realised and run, `false` otherwise."
{:added "1.0.0"}
[^Task task] (is-task? task "executed?")
(and (done? task)
(empty? (.actions task))))
(defn fulfilled?
"Returns `true` if `task` has been realised and was successful, `false` otherwise.
Note: Doesn't check if a task has been run.
An un-run task is still considered to be realised."
{:added "1.0.0"}
[^Task task] (is-task? task "fulfilled?")
(and (done? task)
(success? (peer task))))
(defn broken?
"Returns `true` if `task` has been realised and failed, `false` otherwise.
Note: Doesn't check if a task has been run.
An un-run task is still considered to be realised."
{:added "1.0.0"}
[^Task task] (is-task? task "broken?")
(and (done? task)
(failure? (peer task))))
(defn wait
"Blocks thread until `task` has been realised."
{:added "1.0.0"}
([^Task task] (is-task? task "wait")
(remap {:future #(const-future @%)} task))
([^Task task timeout else] (is-task? task "wait")
(remap {:future #(const-future (deref % timeout (succeed else)))} task)))
(defn then
"Lazily applies a function `f` on the value of `task` only
in case it is successful. `f` is allowed to return both simple
values or other tasks.
The following law applies:
(then (task 1) #(inc %)) == (then (task 1) #(task (inc %))"
{:added "1.0.0"}
[^Task task f] (is-task? task "then")
(remap {:actions #(conj % f)} task))
(defmacro then-do
"A version of `then` where the result of `task` is ignored.
Lazily runs the `body` after the `task` ignoring the `task`s return.
Mainly thought for sequential side-effects.
The following law applies:
(-> (task 1) (then-do (println 12))) == (do-tasks [_ (task 1) _ (println 12)])"
{:added "1.0.0"}
[^Task task & body]
`(then ~task (fn [x#] (do ~@body))))
(defn recover
"Recovers a failed `task` with function `f`.
`f` has as argument the `Throwable` error object that occurred during execution.
`f` may either return a simple value or another task.
Note:
In the case of parallel tasks, the `f` will have as an argument the `Throwable` object of
the first execution that failed."
{:added "1.0.0"}
[^Task task f] (is-task? task "recover")
(remap {:recovery f} task))
(defmacro recover-as [^Task task value]
"Like `recover` but instead of a function, receives a single value
with which it will recover the task.
Ignores any errors from previous executions."
{:added "1.2.0"}
`(recover ~task (fn [e#] ~value)))
(defmacro do-tasks
"A `let`-like construct that allows working with
tasks as if they were successfully realised.
Binds single tasks to names in a `let`-like fashion and
considers the names as being the realised values of those tasks.
In addition, it also supports non-task expressions. These will
automatically be lifted to a `task`.
Example:
(do-tasks [a (task 1)
_ (println a)
b (task 2)]
(+ a b))
As of 1.2.0 also supports syntax for `recover` and `recover-as`:
(do-tasks [a (task 1)
_ (println a)
b (task 2)
:recover #(.getMessage %)]
(+ a b))
...
(do-tasks [a (task 1)
_ (println a)
b (task 2)
:recover-as -1]
(+ a b))"
{:added "1.0.0"
:revised "1.2.0"}
[bindings & body]
(let [bindings# (partition 2 bindings)
recovery-f# (some (fn [[n# f#]] (when (= :recover n#) f#)) bindings#)
recovery-v# (some (fn [[n# v#]] (when (= :recover-as n#) v#)) bindings#)
task# (->> bindings#
(remove (fn [[n# _]]
(or (= :recover n#)
(= :recover-as n#))))
(reduce concat)
(destructure)
(partition 2)
(reverse)
(reduce
(fn [expr [name# binding#]]
`(then (task ~binding#) (fn [~name#] ~expr)))
(cons 'do body)))]
(cond
recovery-f# `(recover ~task# ~recovery-f#)
recovery-v# `(recover-as ~task# ~recovery-v#)
:else task#)))
(defn get!
"Returns the value of `task`. Blocks `task` until it is realised.
If `task` is successful, returns its value.
If it is failed, returns a collection of `Throwable` error objects.
Note: Doesn't run the task!"
{:added "1.0.0"}
([^Task task] (is-task? task "get!")
(-> (wait task) (.future) (deref) (:value)))
([^Task task timeout else] (is-task? task "get!")
(-> (wait task timeout else) (.future) (deref) (:value))))
(defn get-or-else
"Returns the value of `task` if it was successful,
returns `else` otherwise. Blocks `task` until it is realised.
Note: Doesn't run the task!"
{:added "1.0.0"}
([^Task task else] (is-task? task "get-or-else")
(let [result (-> (wait task) (.future) (deref))]
(if (failure? result) else (:value result))))
([^Task task timeout else] (is-task? task "get-or-else")
(let [result (-> (wait task timeout else) (.future) (deref))]
(if (failure? result) else (:value result)))))
(defn mapply
"Takes all the values of `tasks` and lazily applies a
function `f` on them if they were successful.
If one task fails, the total result will be a failure.
`tasks` will be run in parallel.
`f` is allowed to return both simple values and other tasks.
The arity of `f` is equal to the amount of `tasks`.
The following law applies:
(arity f) == (count tasks)"
{:added "1.0.0"}
[f & tasks] (all-tasks? tasks "mapply")
(remap {:exec parallel} (-> (vec tasks) (succeed) (pure) (then f))))
(defn zip
"Takes the values of `tasks` and aggregates them to a vector if they were successful.
If one task fails, the total result will be a failure.
`tasks` will be run in parallel."
{:added "1.0.0"}
[& tasks] (all-tasks? tasks "zip")
(apply (partial mapply vector) tasks))
(defn sequenced-par
"Takes a collection of tasks and lazily transforms it in a task containing
a collection of all the values of those tasks.
If one task fails, the total result will be a failure.
`tasks` will be run in parallel."
{:added "1.0.0"
:revised "1.3.0"}
[tasks] (all-tasks? tasks "sequenced-par")
(let [inside-out (apply zip tasks)]
(cond
(set? tasks) (then inside-out set)
(list? tasks) (then inside-out seq)
:else inside-out)))
(defn sequenced
"Takes a collection of tasks and lazily transforms it in a task containing
a collection of all the values of those tasks.
If one task fails, the total result will be a failure.
`tasks` will be run sequentially."
{:added "1.0.0"}
[tasks] (all-tasks? tasks "sequenced")
(let [inside-out (reduce
(fn [current next]
(do-tasks [acc current
result next]
(conj acc result))) (task []) tasks)]
(cond
(set? tasks) (then inside-out set)
(list? tasks) (then inside-out seq)
:else inside-out)))
(defn run
"Runs `task` synchronously.
Returns another task containing the result of that execution.
Note: Parallel tasks will be run in parallel,
but the current thread will be blocked until all tasks realise."
{:added "1.0.0"}
[task] (is-task? task "run")
(let [execution (.exec task)]
(if (= serial execution)
(pure (execute task))
(pure (execute-par task)))))
(defn run-async
"Runs `task` asynchronously.
Returns another task containing the result of that execution.
Note: Returns immediately. Parallel tasks will be run in parallel."
{:added "1.0.0"}
[task] (is-task? task "run-async")
(let [execution (.exec task)]
(if (= serial execution)
(purely (future (execute task)))
(purely (future (execute-par task))))))