-
Notifications
You must be signed in to change notification settings - Fork 6
/
core_async.cljc
60 lines (53 loc) · 1.57 KB
/
core_async.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
(ns exoscale.interceptor.core-async
"core.async support"
(:require #?(:clj [clojure.core.async :as async]
:cljs [cljs.core.async :as async])
[exoscale.interceptor.protocols :as p]
[exoscale.interceptor.impl :as impl]))
(defn ^:no-doc exception?
  "Returns true when `e` is an instance of the platform's base error
  type: `Exception` on Clojure, `js/Error` on ClojureScript."
  [e]
  (let [error-type #?(:clj Exception
                      :cljs js/Error)]
    (instance? error-type e)))
(defn ^:no-doc channel?
  "Returns true when `x` is an instance of core.async's
  `ManyToManyChannel` (the Clojure or ClojureScript implementation,
  depending on the platform)."
  [x]
  (let [chan-type #?(:clj clojure.core.async.impl.channels.ManyToManyChannel
                     :cljs cljs.core.async.impl.channels.ManyToManyChannel)]
    (instance? chan-type x)))
(defn ^:no-doc fmap
  "Takes a single value from `ch` and passes it to `f`.

  If the value taken is itself a channel (per `channel?`), the take is
  repeated on that inner channel, so `f` only ever receives a
  non-channel value."
  [ch f]
  (letfn [(on-value [v]
            (if-not (channel? v)
              (f v)
              ;; nested channel: unwrap one more level before calling f
              (fmap v f)))]
    (async/take! ch on-value)))
(defn ^:no-doc offer!
  "Delivers `x` to `ch` and returns `ch`.

  A non-nil `x` is offered to the channel with `async/offer!`. A nil
  `x` closes the channel instead, because nil cannot be put on a
  core.async channel.

  Only nil triggers the close: `false` is a legal channel value and is
  offered like any other value."
  [ch x]
  ;; `some?` rather than plain truthiness so that `false` is delivered
  ;; instead of being mistaken for \"no value\".
  (if (some? x)
    (async/offer! ch x)
    (async/close! ch))
  ch)
;; AsyncContext implementation for core.async channels.
;;
;; Both methods return a fresh promise channel that receives the
;; outcome once a value has been taken from `ch` (nested channels are
;; unwrapped by `fmap`).
(extend-protocol p/AsyncContext
  #?(:cljs cljs.core.async.impl.channels.ManyToManyChannel
     :clj clojure.core.async.impl.channels.ManyToManyChannel)
  ;; then: applies `f` to the value taken from `ch` (whatever it is,
  ;; exception values included) and delivers the result.
  (then [ch f]
    (let [out-ch (async/promise-chan)]
      (fmap ch (fn [v] (offer! out-ch (f v))))
      out-ch))
  ;; catch: when the value taken from `ch` is an exception, delivers
  ;; `(f exception)`; any other value is passed through untouched.
  (catch [ch f]
    (let [out-ch (async/promise-chan)]
      (fmap ch
            (fn [v]
              ;; Call the handler with exactly one argument. Threading
              ;; `v` into a `(f v)` form via `cond->` would invoke it as
              ;; `(f v v)` and fail for single-arity handlers.
              (offer! out-ch
                      (if (exception? v)
                        (f v)
                        v))))
      out-ch)))
(defn execute
  "Runs the chain like `exoscale.interceptor/execute`, but always
  returns a core.async channel.

  The returned promise channel is offered whatever value
  `impl/execute` hands to its callbacks; the same function is passed
  for both callback arguments. With two arguments, `interceptors` are
  first enqueued on `ctx` via `impl/enqueue`."
  ([ctx]
   (let [result-ch (async/promise-chan)
         deliver   (fn [v] (async/offer! result-ch v))]
     (impl/execute ctx deliver deliver)
     result-ch))
  ([ctx interceptors]
   (-> ctx
       (impl/enqueue interceptors)
       execute)))