-
Notifications
You must be signed in to change notification settings - Fork 0
/
channels.clj
53 lines (48 loc) · 1.43 KB
/
channels.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
(ns conquerant.channels
(:require [conquerant.core :as c]
[conquerant.internals :as ci])
(:import [java.util.concurrent ArrayBlockingQueue BlockingQueue]))
(defn ^BlockingQueue chan
  "Creates a channel, represented as an `ArrayBlockingQueue`.
  Called with no arguments the channel has room for a single element;
  otherwise it has room for `capacity` elements."
  ([]
   (new ArrayBlockingQueue 1))
  ([capacity]
   (new ArrayBlockingQueue capacity)))
(defn take!
  "Returns a `c/promise` that will be completed with the next value
  received from ch.

  ch is polled without blocking inside a task handed to `ci/schedule`.
  While ch is empty the promise is resolved with a fresh `take!` promise,
  so the poll is retried until a value arrives.

  Emptiness is detected with `nil?` rather than truthiness: a
  `BlockingQueue` never holds nil, so `.poll` returns nil only when the
  queue is empty. A queued `false` is therefore a real value and is
  delivered instead of being removed from ch and discarded."
  [^BlockingQueue ch]
  (c/promise [resolve]
    (ci/schedule
      #(let [x (.poll ch)]
         (resolve (if (nil? x)
                    ;; nothing available yet: chain onto another attempt
                    (take! ch)
                    x))))))
(defn put!
  "Offers x to ch from a task handed to `ci/schedule` and returns a
  `c/promise` for the outcome. The promise resolves to true when the
  offer is accepted; when ch has no room it is resolved with a fresh
  `put!` promise, so the offer is attempted again."
  [^BlockingQueue ch x]
  (c/promise [resolve]
    (ci/schedule
      (fn []
        (let [accepted? (.offer ch x)]
          (resolve (if accepted?
                     true
                     (put! ch x))))))))
(defn- alts-from
  "Polls the channel at index i of the vector chans once, on a task handed
  to `ci/schedule`. Resolves to [ch x] when a value is available, otherwise
  chains onto a poll of the next index (wrapping around to 0)."
  [chans i]
  (c/promise [resolve]
    (ci/schedule
      #(let [^BlockingQueue ch (nth chans i)
             x (.poll ch)]
         (resolve (if (nil? x)
                    ;; empty: move on to the next channel, round-robin
                    (alts-from chans (mod (inc i) (count chans)))
                    [ch x]))))))

(defn alts!
  "Returns a `c/promise` that will resolve to [ch x],
  where ch is the first chan out of chans to give a value,
  and x is the value received from ch.

  chans are polled one at a time in round-robin order, starting with the
  first. The collection is copied into a vector once and walked by index,
  so retrying does not build up nested lazy sequences while every channel
  stays empty.

  Emptiness is detected with `nil?`, so a queued `false` is returned as a
  value rather than being removed from its channel and dropped.

  Throws IllegalArgumentException when chans is empty, since no channel
  could ever produce a value."
  [chans]
  (let [chans (vec chans)]
    (when (empty? chans)
      (throw (IllegalArgumentException.
               "alts! requires at least one channel")))
    (alts-from chans 0)))
(defn timeout!
  "Creates a fresh `chan` and returns it right away.
  In the background an async block awaits a promise that nothing ever
  resolves, handing timeout-ms and timeout-val to `c/await`, and then
  `put!`s whatever the await yields onto the returned chan. The intent
  is that timeout-val (`::timeout` when not supplied) shows up on the
  chan after timeout-ms."
  ([timeout-ms]
   (timeout! timeout-ms ::timeout))
  ([timeout-ms timeout-val]
   (let [never-resolved (c/promise)
         out            (chan)]
     ;; the await stays in a let binding directly inside the async block
     (c/async
       (let [outcome (c/await never-resolved timeout-ms timeout-val)]
         (put! out outcome)))
     out)))