;; async.clj
(ns clojure.more.async
(:require
[clojure.core.async :as a]))
(defn produce
  "Feeds `ch` with the results of calling the zero-argument function `f`
  over and over, one call per item.
  Feeding stops as soon as `f` returns a falsey value or a put fails
  (`>!` returns false once `ch` has been closed).
  When `close?` is truthy (the default) `ch` is closed once feeding stops.
  Returns the go block's channel, which closes when feeding has stopped.
  Modelled on clojure.core.async/onto-chan: similar to
  (onto-chan ch (repeatedly f)) but without the intermediate seq."
  ([ch f] (produce ch f true))
  ([ch f close?]
   (a/go
     (loop []
       ;; Ask `f` for the next item on every pass through the loop.
       (let [item (f)]
         (if (and item (a/>! ch item))
           (recur)
           (when close?
             (a/close! ch))))))))
(defn produce-blocking
  "Like `produce`, but runs the loop on its own thread via `a/thread`
  and uses the blocking put `>!!`, so `f` is allowed to block.
  Calls the zero-argument function `f` repeatedly and puts each result on
  `ch` until `f` returns a falsey value or the put fails because `ch` is
  closed.
  When `close?` is truthy (the default) `ch` is closed once the loop ends.
  Returns the channel created by `a/thread`, which closes when the loop
  has finished."
  ;; The default arity must delegate to this function, not to `produce`;
  ;; otherwise a blocking `f` would run inside a go block.
  ([ch f] (produce-blocking ch f true))
  ([ch f close?]
   (a/thread
     (loop [v (f)]
       (if (and v (a/>!! ch v))
         (recur (f))
         (when close?
           (a/close! ch)))))))
(defn produce-bound-blocking
  "Like `produce-blocking`, but calls `pre` and `post` on the producing
  thread.
  `pre` is called with no arguments before the loop; the value it returns
  is passed to every call of `f` and finally to `post`.
  `post` is called after the loop ends, and also when `f` or the put
  throws, so that whatever `pre` set up is always released. In the failure
  case the original exception is rethrown after `post` has run, and `ch`
  is not closed.
  Useful for non thread safe objects which throw upon being accessed from
  different threads.
  Returns the channel created by `a/thread`; on normal completion it
  receives the value returned by `post` (if non-nil) and then closes."
  [ch f close? pre post]
  (a/thread
    (let [pv (pre)]
      (try
        (loop [v (f pv)]
          (if (and v (a/>!! ch v))
            (recur (f pv))
            (when close?
              (a/close! ch))))
        (catch Throwable t
          ;; Still run the cleanup on this thread; a failure inside `post`
          ;; must not hide the exception that caused it.
          (try
            (post pv)
            (catch Throwable t2
              (.addSuppressed t t2)))
          (throw t)))
      ;; Normal completion: the result of `post` is the thread's result.
      (post pv))))
(defn consume
  "Takes values repeatedly from `ch` inside a go block and applies `f` to
  each of them for its side effects.
  The opposite of `produce`.
  Stops consuming values when the channel is closed, i.e. when the take
  returns nil. A `false` value is a regular item and is passed to `f`.
  Returns the go block's channel, which closes when consumption stops."
  [ch f]
  (a/go-loop [v (a/<! ch)]
    ;; Only nil signals a closed channel; `false` is a legal value.
    (when (some? v)
      (f v)
      (recur (a/<! ch)))))
(defn consume?
  "Takes values repeatedly from `ch` inside a go block and applies `f` to
  each of them.
  Recurs only when `f` returns a non false-y value.
  The opposite of `produce`.
  Also stops when the channel is closed, i.e. when the take returns nil.
  A `false` value is a regular item and is passed to `f`.
  Returns the go block's channel, which closes when consumption stops."
  [ch f]
  (a/go-loop [v (a/<! ch)]
    ;; Only nil signals a closed channel; `f` decides whether to go on.
    (when (and (some? v) (f v))
      (recur (a/<! ch)))))
(defn consume-blocking
  "Takes values repeatedly from `ch` on its own thread (`a/thread`, using
  the blocking take `<!!`) and applies `f` to each of them for its side
  effects.
  The opposite of `produce-blocking`.
  Stops consuming values when the channel is closed, i.e. when the take
  returns nil. A `false` value is a regular item and is passed to `f`.
  Returns the channel created by `a/thread`, which closes when
  consumption stops."
  [ch f]
  (a/thread
    (loop [v (a/<!! ch)]
      ;; Only nil signals a closed channel; `false` is a legal value.
      (when (some? v)
        (f v)
        (recur (a/<!! ch))))))
(defn consume-blocking?
  "Takes values repeatedly from `ch` on its own thread (`a/thread`, using
  the blocking take `<!!`) and applies `f` to each of them.
  Recurs only when `f` returns a non false-y value.
  The opposite of `produce-blocking`.
  Also stops when the channel is closed, i.e. when the take returns nil.
  A `false` value is a regular item and is passed to `f`.
  Returns the channel created by `a/thread`, which closes when
  consumption stops."
  [ch f]
  (a/thread
    (loop [v (a/<!! ch)]
      ;; Only nil signals a closed channel; `f` decides whether to go on.
      (when (and (some? v) (f v))
        (recur (a/<!! ch))))))
(defn split*
  "Routes values from the input channel `ch` to output channels.
  `f` maps a value v to a key k, and `m` maps keys to channels; every v
  taken from `ch` is put on the channel (get m (f v)).
  (get m (f v)) must be non-nil for every v: when the lookup yields
  nothing an Exception is thrown inside the go block.
  When `ch` is closed, every channel in `m` is closed.
  Routing also stops, without closing anything, as soon as a put fails
  because the target channel is closed.
  Returns the go block's channel."
  ([f ch m]
   (a/go-loop []
     (let [v (a/<! ch)]
       (if (some? v)
         (let [out (get m (f v))]
           ;; A missing route is an error here, not a reason to drop v.
           (when-not out
             (throw (Exception. "Channel does not exist")))
           (when (a/>! out v)
             (recur)))
         ;; Input exhausted: propagate the close to all outputs.
         (doseq [c (vals m)]
           (a/close! c)))))))
(defn split-maybe
  "Routes values from the input channel `ch` to output channels.
  `f` maps a value v to a key k, and `m` maps keys to channels; every v
  taken from `ch` is put on the channel (get m (f v)).
  If (f v) has no channel in `m`, the value is dropped and routing
  continues with the next value.
  When `ch` is closed, every channel in `m` is closed.
  Routing also stops, without closing anything, as soon as a put fails
  because the target channel is closed.
  Returns the go block's channel."
  ([f ch m]
   (a/go-loop []
     (let [v (a/<! ch)]
       (if (some? v)
         (let [out (get m (f v))]
           ;; Keep going when there is no route (drop v) or the put worked.
           (when (or (not out) (a/>! out v))
             (recur)))
         ;; Input exhausted: propagate the close to all outputs.
         (doseq [c (vals m)]
           (a/close! c)))))))