|
57 | 57 |
|
58 | 58 | (defn <! |
59 | 59 | "takes a val from port. Must be called inside a (go ...) block. Will |
60 | | - return nil if closed. Will park if nothing is available." |
| 60 | + return nil if closed. Will park if nothing is available. |
| 61 | + Returns true unless port is already closed" |
61 | 62 | [port] |
62 | 63 | (assert nil "<! used not in (go ...) block")) |
63 | 64 |
|
|
76 | 77 | (dispatch/run #(fn1 val))))) |
77 | 78 | nil))) |
78 | 79 |
|
79 | | -(defn- nop []) |
| 80 | +(defn- nop [_]) |
| 81 | +(def ^:private fhnop (fn-handler nop)) |
80 | 82 |
|
81 | 83 | (defn >! |
82 | 84 | "puts a val into port. nil values are not allowed. Must be called |
83 | | - inside a (go ...) block. Will park if no buffer space is available." |
| 85 | + inside a (go ...) block. Will park if no buffer space is available. |
| 86 | + Returns true unless port is already closed." |
84 | 87 | [port val] |
85 | 88 | (assert nil ">! used not in (go ...) block")) |
86 | 89 |
|
|
89 | 92 | complete. nil values are not allowed. Will throw if closed. If |
90 | 93 | on-caller? (default true) is true, and the put is immediately |
91 | 94 | accepted, will call fn0 on calling thread. Returns nil." |
92 | | - ([port val] (put! port val nop)) |
93 | | - ([port val fn0] (put! port val fn0 true)) |
94 | | - ([port val fn0 on-caller?] |
95 | | - (let [ret (impl/put! port val (fn-handler fn0))] |
96 | | - (when (and ret (not= fn0 nop)) |
| 95 | + ([port val] |
| 96 | + (if-let [ret (impl/put! port val fhnop)] |
| 97 | + @ret |
| 98 | + true)) |
| 99 | + ([port val fn1] (put! port val fn1 true)) |
| 100 | + ([port val fn1 on-caller?] |
| 101 | + (if-let [retb (impl/put! port val (fn-handler fn1))] |
| 102 | + (let [ret @retb] |
97 | 103 | (if on-caller? |
98 | | - (fn0) |
99 | | - (dispatch/run fn0))) |
100 | | - nil))) |
| 104 | + (fn1 ret) |
| 105 | + (dispatch/run #(fn1 ret))) |
| 106 | + ret) |
| 107 | + true))) |
101 | 108 |
|
102 | 109 | (defn close! |
103 | 110 | ([port] |
|
150 | 157 | wport (when (vector? port) (port 0)) |
151 | 158 | vbox (if wport |
152 | 159 | (let [val (port 1)] |
153 | | - (impl/put! wport val (alt-handler flag #(fret [nil wport])))) |
| 160 | + (impl/put! wport val (alt-handler flag #(fret [% wport])))) |
154 | 161 | (impl/take! port (alt-handler flag #(fret [% port]))))] |
155 | 162 | (if vbox |
156 | 163 | (channels/box [@vbox (or wport port)]) |
|
163 | 170 |
|
164 | 171 | (defn alts! |
165 | 172 | "Completes at most one of several channel operations. Must be called |
166 | | - inside a (go ...) block. ports is a vector of channel endpoints, which |
167 | | - can be either a channel to take from or a vector of |
| 173 | + inside a (go ...) block. ports is a vector of channel endpoints, |
| 174 | + which can be either a channel to take from or a vector of |
168 | 175 | [channel-to-put-to val-to-put], in any combination. Takes will be |
169 | 176 | made as if by <!, and puts will be made as if by >!. Unless |
170 | 177 | the :priority option is true, if more than one port operation is |
171 | 178 | ready a non-deterministic choice will be made. If no operation is |
172 | 179 | ready and a :default value is supplied, [default-val :default] will |
173 | 180 | be returned, otherwise alts! will park until the first operation to |
174 | 181 | become ready completes. Returns [val port] of the completed |
175 | | - operation, where val is the value taken for takes, and nil for puts. |
| 182 | + operation, where val is the value taken for takes, and a |
| 183 | + boolean (true unless already closed, as per put!) for puts. |
176 | 184 |
|
177 | 185 | opts are passed as :key val ... Supported options: |
178 | 186 |
|
|
197 | 205 | (reify |
198 | 206 | impl/Channel |
199 | 207 | (close! [_] (impl/close! ch)) |
| 208 | + (closed? [_] (impl/closed? ch)) |
200 | 209 |
|
201 | 210 | impl/ReadPort |
202 | 211 | (take! [_ fn1] |
|
214 | 223 | ret))) |
215 | 224 |
|
216 | 225 | impl/WritePort |
217 | | - (put! [_ val fn0] (impl/put! ch val fn0)))) |
| 226 | + (put! [_ val fn1] (impl/put! ch val fn1)))) |
218 | 227 |
|
219 | 228 | (defn map> |
220 | 229 | "Takes a function and a target channel, and returns a channel which |
|
228 | 237 | (take! [_ fn1] (impl/take! ch fn1)) |
229 | 238 |
|
230 | 239 | impl/WritePort |
231 | | - (put! [_ val fn0] |
232 | | - (impl/put! ch (f val) fn0)))) |
| 240 | + (put! [_ val fn1] |
| 241 | + (impl/put! ch (f val) fn1)))) |
233 | 242 |
|
234 | 243 |
|
235 | 244 |
|
|
241 | 250 | (reify |
242 | 251 | impl/Channel |
243 | 252 | (close! [_] (impl/close! ch)) |
| 253 | + (closed? [_] (impl/closed? ch)) |
244 | 254 |
|
245 | 255 | impl/ReadPort |
246 | 256 | (take! [_ fn1] (impl/take! ch fn1)) |
247 | 257 |
|
248 | 258 | impl/WritePort |
249 | | - (put! [_ val fn0] |
| 259 | + (put! [_ val fn1] |
250 | 260 | (if (p val) |
251 | | - (impl/put! ch val fn0) |
252 | | - (channels/box nil))))) |
| 261 | + (impl/put! ch val fn1) |
| 262 | + (channels/box (not (impl/closed? ch))))))) |
253 | 263 |
|
254 | 264 | (defn remove> |
255 | 265 | "Takes a predicate and a target channel, and returns a channel which |
|
290 | 300 | (let [val (<! in)] |
291 | 301 | (if (nil? val) |
292 | 302 | (close! out) |
293 | | - (let [vals (f val)] |
294 | | - (doseq [v vals] |
295 | | - (>! out v)) |
296 | | - (recur)))))) |
| 303 | + (do (doseq [v (f val)] |
| 304 | + (>! out v)) |
| 305 | + (when-not (impl/closed? out) |
| 306 | + (recur))))))) |
297 | 307 |
|
298 | 308 | (defn mapcat< |
299 | 309 | "Takes a function and a source channel, and returns a channel which |
|
327 | 337 |
|
328 | 338 | (defn pipe |
329 | 339 | "Takes elements from the from channel and supplies them to the to |
330 | | - channel. By default, the to channel will be closed when the |
331 | | - from channel closes, but can be determined by the close? |
332 | | - parameter." |
| 340 | + channel. By default, the to channel will be closed when the from |
| 341 | + channel closes, but can be determined by the close? parameter. Will |
| 342 | + stop consuming the from channel if the to channel closes" |
| 343 | + |
333 | 344 | ([from to] (pipe from to true)) |
334 | 345 | ([from to close?] |
335 | 346 | (go-loop [] |
336 | 347 | (let [v (<! from)] |
337 | 348 | (if (nil? v) |
338 | 349 | (when close? (close! to)) |
339 | | - (do (>! to v) |
| 350 | + (when (>! to v) |
340 | 351 | (recur))))) |
341 | 352 | to)) |
342 | 353 |
|
|
354 | 365 | (let [tc (chan t-buf-or-n) |
355 | 366 | fc (chan f-buf-or-n)] |
356 | 367 | (go-loop [] |
357 | | - (let [v (<! ch)] |
358 | | - (if (nil? v) |
359 | | - (do (close! tc) (close! fc)) |
360 | | - (do (>! (if (p v) tc fc) v) |
361 | | - (recur))))) |
| 368 | + (let [v (<! ch)] |
| 369 | + (if (nil? v) |
| 370 | + (do (close! tc) (close! fc)) |
| 371 | + (when (>! (if (p v) tc fc) v) |
| 372 | + (recur))))) |
362 | 373 | [tc fc]))) |
363 | 374 |
|
364 | 375 | (defn reduce |
|
385 | 396 | ([ch coll] (onto-chan ch coll true)) |
386 | 397 | ([ch coll close?] |
387 | 398 | (go-loop [vs (seq coll)] |
388 | | - (if vs |
389 | | - (do (>! ch (first vs)) |
390 | | - (recur (next vs))) |
391 | | - (when close? |
392 | | - (close! ch)))))) |
| 399 | + (if (and vs (>! ch (first vs))) |
| 400 | + (recur (next vs)) |
| 401 | + (when close? |
| 402 | + (close! ch)))))) |
393 | 403 |
|
394 | 404 |
|
395 | 405 | (defn to-chan |
|
420 | 430 |
|
421 | 431 | Items received when there are no taps get dropped. |
422 | 432 |
|
423 | | - If a tap put throws an exception, it will be removed from the mult." |
| 433 | + If a tap puts to a closed channel, it will be removed from the mult." |
424 | 434 | [ch] |
425 | 435 | (let [cs (atom {}) ;;ch->close? |
426 | 436 | m (reify |
|
433 | 443 | (untap-all* [_] (reset! cs {}) nil)) |
434 | 444 | dchan (chan 1) |
435 | 445 | dctr (atom nil) |
436 | | - done #(when (zero? (swap! dctr dec)) |
437 | | - (put! dchan true))] |
| 446 | + done (fn [_] (when (zero? (swap! dctr dec)) |
| 447 | + (put! dchan true)))] |
438 | 448 | (go-loop [] |
439 | 449 | (let [val (<! ch)] |
440 | 450 | (if (nil? val) |
|
443 | 453 | (let [chs (keys @cs)] |
444 | 454 | (reset! dctr (count chs)) |
445 | 455 | (doseq [c chs] |
446 | | - (try |
447 | | - (put! c val done) |
448 | | - (catch js/Object e |
449 | | - (swap! dctr dec) |
450 | | - (untap* m c)))) |
| 456 | + (when-not (put! c val done) |
| 457 | + (swap! dctr dec) |
| 458 | + (untap* m c))) |
451 | 459 | ;;wait for all |
452 | 460 | (when (seq chs) |
453 | 461 | (<! dchan)) |
|
541 | 549 | (do (when (nil? v) |
542 | 550 | (swap! cs dissoc c)) |
543 | 551 | (recur (calc-state))) |
544 | | - (do (when (or (solos c) |
545 | | - (and (empty? solos) (not (mutes c)))) |
546 | | - (>! out v)) |
| 552 | + (if (or (solos c) |
| 553 | + (and (empty? solos) (not (mutes c)))) |
| 554 | + (when (>! out v) |
| 555 | + (recur state)) |
547 | 556 | (recur state))))) |
548 | 557 | m)) |
549 | 558 |
|
|
634 | 643 | (close! (muxch* m))) |
635 | 644 | (let [topic (topic-fn val) |
636 | 645 | m (get @mults topic)] |
637 | | - (when m |
638 | | - (try |
639 | | - (>! (muxch* m) val) |
640 | | - (catch js/Object e |
641 | | - (swap! mults dissoc topic)))) |
| 646 | + (when-not (>! (muxch* m) val) |
| 647 | + (swap! mults dissoc topic)) |
642 | 648 | (recur))))) |
643 | 649 | p))) |
644 | 650 |
|
|
0 commit comments