-
Notifications
You must be signed in to change notification settings - Fork 0
/
internals.clj
87 lines (77 loc) · 3.06 KB
/
internals.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
(ns conquerant.internals
(:import [java.util.concurrent CompletableFuture CompletionStage Executor Executors ExecutorService ForkJoinPool]
java.util.function.Function))
(defonce ^:dynamic *executor*
(ForkJoinPool/commonPool))
(defonce ^:dynamic *timeout-scheduler*
(Executors/newSingleThreadExecutor))
(defn complete [^CompletableFuture promise val]
(.complete promise val))
(defn ^CompletableFuture promise* [f]
(let [p (CompletableFuture.)
reject #(.completeExceptionally p %)
resolve #(complete p %)]
(CompletableFuture/runAsync #(try
(f resolve reject)
(catch Throwable e
(reject e)))
*executor*)
p))
(defn promise? [v]
(instance? CompletionStage v))
(defn bind [^CompletionStage p callback]
(let [binds (clojure.lang.Var/getThreadBindingFrame)
func (reify Function
(apply [_ v]
(clojure.lang.Var/resetThreadBindingFrame binds)
(callback v)))]
(.thenComposeAsync p ^Function func ^Executor *executor*)))
(defn then
([p f]
(bind p (fn promise-wrap [in]
(let [out (f in)]
(if (promise? out)
out
(promise* out))))))
([p f timeout-ms timeout-val]
(let [promise (CompletableFuture.)
start-time-millis (System/currentTimeMillis)]
(.submit ^ExecutorService *timeout-scheduler*
^Runnable #(try
(let [spent-ms (- (System/currentTimeMillis)
start-time-millis)]
(complete promise (deref p
(max 0 (- timeout-ms spent-ms))
timeout-val)))
(catch Throwable e
(.completeExceptionally promise e))))
(then promise f))))
(defn attempt [callback]
(promise* (fn [resolve reject]
(let [result (callback)]
(if (promise? result)
(then result resolve)
(resolve result))))))
(defmacro ado [& body]
`(attempt (fn []
~@body)))
(defmacro alet [bindings & body]
(if (not-any? identity
(for [expr (->> bindings rest (take-nth 2))]
(and (coll? expr)
(= 'await (first expr)))))
`(let ~bindings ~@body)
(->> (partition 2 bindings)
reverse
(reduce (fn [acc [l r]]
(if (and (coll? r)
(symbol? (first r))
(not= "." (subs (name (first r)) 0 1)))
(if (= 'await (first r))
(let [[_ expr & timeout] r]
`(then ~expr
(fn [~l] ~acc)
~@timeout))
`(let [~l ~r] ~acc))
`(let [~l ~r] ~acc)))
`(ado ~@body)))))