Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

async ops wip - needs testing

commit 4b1819b8b15c260b8a01f7a73cf6aea6d2c84bf2 1 parent 697bc8b
Rich Hickey authored September 23, 2013

Showing 1 changed file with 490 additions and 3 deletions. Show diff stats Hide diff stats

  1. 493  src/main/clojure/clojure/core/async.clj
493  src/main/clojure/clojure/core/async.clj
@@ -7,6 +7,7 @@
7 7
 ;;   You must not remove this notice, or any other, from this software.
8 8
 
9 9
 (ns clojure.core.async
  10
+  (:refer-clojure :exclude [reduce into merge])
10 11
   (:require [clojure.core.async.impl.protocols :as impl]
11 12
             [clojure.core.async.impl.channels :as channels]
12 13
             [clojure.core.async.impl.buffers :as buffers]
@@ -20,9 +21,11 @@
20 21
            [java.util.concurrent.locks Lock]
21 22
            [java.util.concurrent Executors Executor]))
22 23
 
  24
+(alias 'core 'clojure.core)
  25
+
23 26
 (set! *warn-on-reflection* true)
24 27
 
25  
-(defn- fn-handler
  28
+(defn fn-handler
26 29
   [f]
27 30
   (reify
28 31
    Lock
@@ -252,11 +255,11 @@
252 255
         opts (filter opt? clauses)
253 256
         clauses (remove opt? clauses)
254 257
         [clauses bindings]
255  
-        (reduce
  258
+        (core/reduce
256 259
          (fn [[clauses bindings] [ports expr]]
257 260
            (let [ports (if (vector? ports) ports [ports])
258 261
                  [ports bindings]
259  
-                 (reduce
  262
+                 (core/reduce
260 263
                   (fn [[ports bindings] port]
261 264
                     (if (vector? port)
262 265
                       (let [[port val] port
@@ -380,3 +383,487 @@
380 383
   the body when completed."
381 384
   [& body]
382 385
   `(thread-call (fn [] ~@body)))
  386
+
  387
+;;;;;;;;;;;;;;;;;;;; ops ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
  388
+
  389
+(defn map<
  390
+  "Takes a function and a source channel, and returns a channel which
  391
+  contains the values produced by applying f to each value taken from
  392
+  the source channel"
  393
+  [f ch]
  394
+  (reify
  395
+   impl/Channel
  396
+   (close! [_] (impl/close! ch))
  397
+
  398
+   impl/ReadPort
  399
+   (take! [_ fn1]
  400
+     (let [ret
  401
+       (impl/take! ch
  402
+         (reify
  403
+          Lock
  404
+          (lock [_] (.lock ^Lock fn1))
  405
+          (unlock [_] (.unlock ^Lock fn1))
  406
+          
  407
+          impl/Handler
  408
+          (active? [_] (impl/active? fn1))
  409
+          (lock-id [_] (impl/lock-id fn1))
  410
+          (commit [_]
  411
+           (let [f1 (impl/commit fn1)]
  412
+             #(f1 (if (nil? %) nil (f %)))))))]
  413
+       (if (and ret (not (nil? @ret)))
  414
+         (channels/box (f @ret))
  415
+         ret)))
  416
+   
  417
+   impl/WritePort
  418
+   (put! [_ val fn0] (impl/put! ch val fn0))))
  419
+
  420
+(defn map>
  421
+  "Takes a function and a target channel, and returns a channel which
  422
+  applies f to each value before supplying it to the target channel."
  423
+  [f ch]
  424
+  (reify
  425
+   impl/Channel
  426
+   (close! [_] (impl/close! ch))
  427
+   
  428
+   impl/ReadPort
  429
+   (take! [_ fn1] (impl/take! ch fn1))
  430
+   
  431
+   impl/WritePort
  432
+   (put! [_ val fn0]
  433
+    (impl/put! ch (f val) fn0))))
  434
+
  435
+(defn filter>
  436
+  "Takes a predicate and a target channel, and returns a channel which
  437
+  supplies only the values for which the predicate returns true to the
  438
+  target channel."
  439
+  [p ch]
  440
+  (reify
  441
+   impl/Channel
  442
+   (close! [_] (impl/close! ch))
  443
+
  444
+   impl/ReadPort
  445
+   (take! [_ fn1] (impl/take! ch fn1))
  446
+   
  447
+   impl/WritePort
  448
+   (put! [_ val fn0]
  449
+    (if (p val)
  450
+      (impl/put! ch val fn0)
  451
+      (channels/box nil)))))
  452
+
  453
+(defn remove>
  454
+  "Takes a predicate and a target channel, and returns a channel which
  455
+  supplies only the values for which the predicate returns false to the
  456
+  target channel."
  457
+  [p ch]
  458
+  (filter> (complement p) ch))
  459
+
  460
+(defmacro go-loop
  461
+  "Like (go (loop ...))"
  462
+  [bindings & body]
  463
+  `(go (loop ~bindings ~@body)))
  464
+
  465
+(defn filter<
  466
+  "Takes a predicate and a source channel, and returns a channel which
  467
+  contains only the values taken from the source channel for which the
  468
+  predicate returns true. The channel will be created by default, or
  469
+  can be supplied. By default the channel will close when the source
  470
+  channel closes, but can be determined by the close? parameter."
  471
+  ([p ch] (filter< p ch (chan)))
  472
+  ([p ch out] (filter< p ch out true))
  473
+  ([p ch out close?]
  474
+     (go-loop []
  475
+       (let [val (<! ch)]
  476
+         (if (nil? val)
  477
+           (when close? (close! out))
  478
+           (do (when (p val)
  479
+                 (>! out val))
  480
+               (recur)))))
  481
+     out))
  482
+
  483
+(defn remove<
  484
+  "Takes a predicate and a source channel, and returns a channel which
  485
+  contains only the values taken from the source channel for which the
  486
+  predicate returns false.
  487
+
  488
+  The out channel will be created by default, or
  489
+  can be supplied. By default the channel will close when the source
  490
+  channel closes, but can be determined by the close? parameter."
  491
+  ([p ch] (filter< (complement p) ch))
  492
+  ([p ch out] (filter< (complement p) ch out))
  493
+  ([p ch out close?] (filter< (complement p) ch out close?)))
  494
+
  495
+(defn mapcat<
  496
+  "Takes a function and a source channel, and returns a channel which
  497
+  contains the values in each collection produced by applying f to
  498
+  each value taken from the source channel. f must return a
  499
+  collection.
  500
+
  501
+  The out channel will be created by default, or can be supplied. By
  502
+  default the channel will close when the source channel closes, but
  503
+  can be determined by the close? parameter."
  504
+  ([f in] (mapcat< f in (chan)))
  505
+  ([f in out] (mapcat< f in out true))
  506
+  ([f in out close?]
  507
+     (go-loop []
  508
+       (let [val (<! in)]
  509
+         (if (nil? val)
  510
+           (when close? (close! out))
  511
+           (let [vals (f val)]
  512
+             (doseq [v vals]
  513
+               (>! out val))
  514
+             (recur)))))
  515
+     out))
  516
+
  517
+(defn mapcat>
  518
+  "Takes a function and a target channel, and returns a channel which
  519
+  applies f to each value put, then supplies each element of the result
  520
+  to the target channel. f must return a collection.
  521
+
  522
+  The in channel will be created by default, or can be supplied. By
  523
+  default the target channel will be closed when the source channel
  524
+  closes, but can be determined by the close? parameter."
  525
+  
  526
+  ([f out] (mapcat> (chan) out))
  527
+  ([f in out] (mapcat< in out) in)
  528
+  ([f in out closing?] (mapcat< in out closing?) in))
  529
+
  530
+(defn pipe
  531
+  "Takes elements from the from channel and supplies them to the to
  532
+  channel. By default the to channel will be closed when the
  533
+  from channel closes, but can be determined by the close?
  534
+  parameter."
  535
+  ([from to] (pipe from to true))
  536
+  ([from to close?]
  537
+     (go-loop []
  538
+      (let [v (<! from)]
  539
+        (if (nil? v)
  540
+          (when close? (close! to))
  541
+          (do (>! to v)
  542
+              (recur)))))
  543
+     to))
  544
+
  545
+(defn merge
  546
+  "Takes two channels and returns a third channel which contains all
  547
+  values taken from the first two.
  548
+
  549
+  The out channel will be created by default, or can be supplied. By
  550
+  default the channel will close after both source channels have
  551
+  closed, but can be determined by the close? parameter."
  552
+  ([c1 c2] (merge c1 c2 (chan)))
  553
+  ([c1 c2 out] (merge c1 c2 out true))
  554
+  ([c1 c2 out close?]
  555
+   (let [out (chan)]
  556
+     (go-loop [cs [c1 c2]]
  557
+       (if (pos? (count cs))
  558
+         (let [[v c] (alts! cs)]
  559
+           (if (nil? v)
  560
+             (recur (filterv #(not= c %) cs))
  561
+             (do (>! out v)
  562
+                 (recur cs))))
  563
+         (when close? (close! out))))
  564
+     out)))
  565
+
  566
+(defn split
  567
+  "Takes a predicate and a source channel and returns a vector of two
  568
+  channels, the first of which will contain the values for which the
  569
+  predicate returned true, the second those for which it returned
  570
+  false.
  571
+
  572
+  The out channels will be created by default, or can be supplied. By
  573
+  default the channels will close after the source channel has
  574
+  closed, but can be determined by the close? parameter."
  575
+  ([p ch] (split p ch (chan) (chan)))
  576
+  ([p ch truec falsec] (split p ch truec falsec true))
  577
+  ([p ch tc fc close?]
  578
+     (go-loop []
  579
+       (let [v (<! ch)]
  580
+         (if (nil? v)
  581
+           (when close? (close! tc) (close! fc))
  582
+           (do (>! (if (p v) tc fc) v)
  583
+               (recur)))))
  584
+     [tc fc]))
  585
+
  586
+(defn reduce
  587
+  "f should be a function of 2 arguments. Returns a channel containing
  588
+  the single result of applying f to init and the first item from the
  589
+  channel, then applying f to that result and the 2nd item, etc. If
  590
+  the channel closes without yielding items, returns init and f is not
  591
+  called. ch must close before reduce produces a result."
  592
+  [f init ch]
  593
+  (go-loop [ret init]
  594
+    (let [v (<! ch)]
  595
+      (if (nil? v)
  596
+        ret
  597
+        (recur (f ret v))))))
  598
+
  599
+(defn into
  600
+  "Returns a channel containing the single (collection) result of the
  601
+  items taken from the channel conjoined to the supplied
  602
+  collection. ch must close before into produces a result."
  603
+  [coll ch]
  604
+  (reduce conj coll ch))
  605
+
  606
+(defn- bounded-count
  607
+  "Returns the smaller of n or the count of coll, without examining
  608
+  more than n items if coll is not counted"
  609
+  [n coll]
  610
+  (if (counted? coll)
  611
+    (min n (count coll))
  612
+    (loop [i 0 s (seq coll)]
  613
+      (if (and s (< i n))
  614
+        (recur (inc i) (next s))
  615
+        i))))
  616
+
  617
+(defn onto-chan
  618
+  "Puts the contents of coll into the supplied channel.
  619
+
  620
+  By default the channel will be closed after the items are copied,
  621
+  but can be determined by the close? parameter.
  622
+
  623
+  Returns a channel which will close after the items are copied."
  624
+  ([ch coll] (onto-chan ch coll true))
  625
+  ([ch coll close?]
  626
+     (go-loop [vs (seq coll)]
  627
+       (if vs
  628
+         (do (>! ch (first vs))
  629
+             (recur (next vs)))
  630
+         (when close?
  631
+           (close! ch))))))
  632
+
  633
+(defn to-chan
  634
+  "Creates and returns a channel which contains the contents of coll,
  635
+  closing when exhausted."
  636
+  [coll]
  637
+  (let [ch (chan (bounded-count 100 coll))]
  638
+    (onto-chan ch coll)
  639
+    ch))
  640
+
  641
+(defprotocol Mux
  642
+  (muxch* [_]))
  643
+
  644
+(defprotocol Mult
  645
+  (tap* [m ch close?])
  646
+  (untap* [m ch])
  647
+  (untap-all* [m]))
  648
+
  649
+(defn mult
  650
+  "Creates and returns a mult(iple) of the supplied channel. Channels
  651
+  containing copies of the channel can be created with 'tap', and
  652
+  detached with 'untap'"
  653
+  [ch]
  654
+  (let [cs (atom {}) ;;ch->close?
  655
+        m (reify
  656
+           Mux
  657
+           (muxch* [_] ch)
  658
+           
  659
+           Mult
  660
+           (tap* [_ ch close?] (swap! cs assoc ch close?) nil)
  661
+           (untap* [_ ch] (swap! cs dissoc ch) nil)
  662
+           (untap-all* [_] (reset! cs {}) nil))]
  663
+    (go-loop []
  664
+     (let [val (<! ch)]
  665
+       (if (nil? val)
  666
+         (doseq [[c close?] @cs]
  667
+           (when close? (close! c)))
  668
+         (do (doseq [c (keys @cs)]
  669
+               (try
  670
+                 (put! c val)
  671
+                 (catch Exception e
  672
+                   (untap* m c))))
  673
+             (recur)))))
  674
+    m))
  675
+
  676
+(defn tap
  677
+  "Copies the mult source onto the supplied channel.
  678
+
  679
+  By default the channel will be closed when the source closes,
  680
+  but can be determined by the close? parameter."
  681
+  ([mult ch] (tap mult ch true))
  682
+  ([mult ch close?] (tap* mult ch close?) ch))
  683
+
  684
+(defn untap
  685
+  "Disconnects a target channel from a mult"
  686
+  [mult ch]
  687
+  (untap* mult ch))
  688
+
  689
+(defn untap-all
  690
+  "Disconnects all target channels from a mult"
  691
+  [mult] (untap-all* mult))
  692
+
  693
+(defprotocol Mix
  694
+  (admix* [m ch])
  695
+  (unmix* [m ch])
  696
+  (unmix-all* [m])
  697
+  (toggle* [m state-map])
  698
+  (solo-mode* [m mode]))
  699
+
  700
+(defn mix
  701
+  "Creates and returns a mix of one or more input channels which will
  702
+  be put on the supplied out channel. Input sources can be added to
  703
+  the mix with 'admix', and removed with 'unmix'. A mix supports
  704
+  soloing, muting and pausing multiple inputs atomically using
  705
+  'toggle', and can solo using either muting or pausing as determined
  706
+  by 'solo-mode'.
  707
+
  708
+  Each channel can have zero or more boolean modes set via 'toggle':
  709
+
  710
+  :solo - when true, only this (ond other soloed) channel(s) will appear
  711
+          in the mix output channel. :mute and :pause states of soloed
  712
+          channels are ignored. If solo-mode is :mute, non-soloed
  713
+          channels are muted, if :pause, non-soloed channels are
  714
+          paused.
  715
+
  716
+  :mute - muted channels will have their contents consumed but not included in the mix
  717
+  :pause - paused channels will not have their contents consumed (and thus also not included in the mix)
  718
+"
  719
+  [out]
  720
+  (let [cs (atom {}) ;;ch->attrs-map
  721
+        solo-modes #{:mute :pause}
  722
+        attrs (conj solo-modes :solo)
  723
+        solo-mode (atom :mute)
  724
+        change (chan)
  725
+        changed #(put! change true)
  726
+        pick (fn [attr chs]
  727
+               (reduce-kv
  728
+                   (fn [ret c v]
  729
+                     (if (attr v)
  730
+                       (conj ret c)
  731
+                       ret))
  732
+                   #{} chs))
  733
+        calc-state (fn []
  734
+                     (let [chs @cs
  735
+                           mode @solo-mode
  736
+                           solos (pick :solo chs)
  737
+                           pauses (pick :pause chs)]
  738
+                       {:solos solos
  739
+                        :mutes (pick :mute chs)
  740
+                        :reads (conj
  741
+                                (if (and (= mode :pause) (not (empty? solos)))
  742
+                                  (vec solos)
  743
+                                  (vec (remove pauses (keys chs))))
  744
+                                change)}))
  745
+        m (reify
  746
+           Mux
  747
+           (muxch* [_] out)
  748
+           Mix
  749
+           (admix* [_ ch] (swap! cs assoc ch {}) (changed))
  750
+           (unmix* [_ ch] (swap! cs dissoc ch) (changed))
  751
+           (unmix-all* [_] (reset! cs {}) (changed))
  752
+           (toggle* [_ state-map] (swap! cs merge-with merge state-map) (changed))
  753
+           (solo-mode* [_ mode]
  754
+             (assert (solo-modes mode) (str "mode must be one of: " solo-modes))
  755
+             (reset! solo-mode mode)
  756
+             (changed)))]
  757
+    (go-loop [{:keys [solos mutes reads] :as state} (calc-state)]
  758
+      (let [[v c] (alts! reads)]
  759
+        (if (or (nil? v) (= c change))
  760
+          (do (when (nil? v)
  761
+                (swap! cs dissoc c))
  762
+              (recur (calc-state)))
  763
+          (do (when (or (solos c)
  764
+                        (and (empty? solos) (not (mutes c))))
  765
+                (>! out v))
  766
+            (recur state)))))
  767
+    m))
  768
+
  769
+(defn admix
  770
+  "Adds ch as an input to the mix"
  771
+  [mix ch]
  772
+  (admix* mix ch))
  773
+
  774
+(defn unmix
  775
+  "Removes ch as an input to the mix"
  776
+  [mix ch]
  777
+  (unmix* mix ch))
  778
+
  779
+(defn unmix-all
  780
+  "removes all inputs from the mix"
  781
+  [mix]
  782
+  (unmix-all* mix))
  783
+
  784
+(defn toggle
  785
+  "Atomically sets the state(s) of one or more channels in a mix. The
  786
+  state map is a map of channels -> channel-state-map. A
  787
+  channel-state-map is a map of attrs -> boolean, where attr is one or
  788
+  more of :mute, :pause or :solo. Any states supplied are merged with
  789
+  the current state.
  790
+
  791
+  Note that channels can be added to a mix via toggle, which can be
  792
+  used to add channels in a particular (e.g. paused) state."
  793
+  [mix state-map]
  794
+  (toggle* mix state-map))
  795
+
  796
+(defn solo-mode
  797
+  "Sets the solo mode of the mix. mode must be one of :mute or :pause"
  798
+  [mix mode]
  799
+  (solo-mode* mix mode))
  800
+
  801
+(defprotocol Pub
  802
+  (sub* [p v ch close?])
  803
+  (unsub* [p v ch])
  804
+  (unsub-all* [p] [p v]))
  805
+
  806
+(defn pub
  807
+  "Creates and returns a pub(lication) of the supplied channel,
  808
+  partitioned into topics by the topic-fn. topic-fn will be applied to
  809
+  each value on the channel and the result will determine the topic on
  810
+  which that value will be put. Channels can be subscribed to receive
  811
+  copies of topics using 'sub', and unsubscribed using 'unsub'. Each
  812
+  partition will be handled by an internal mult on a deidicated
  813
+  channel. By default these internal channels will be created
  814
+  via (chan), but a chan-fn can be supplied which creates channels
  815
+  with desired properties.
  816
+
  817
+  Note that each topic is handled asynchronously, i.e. if a channel is
  818
+  subscribed to more than one topic it should not expect them to be
  819
+  interleaved identically with the source."
  820
+  ([ch topic-fn] (pub ch topic-fn chan))
  821
+  ([ch topic-fn chan-fn]
  822
+     (let [mults (atom {}) ;;dval->mult
  823
+           ensure-mult (fn [v]
  824
+                         (or (get @mults v)
  825
+                             (get (swap! mults #(if (% v) % (assoc % v (mult (chan-fn))))) v)))
  826
+           p (reify
  827
+              Mux
  828
+              (muxch* [_] ch)
  829
+              
  830
+              Pub
  831
+              (sub* [p v ch close?]
  832
+                    (let [m (ensure-mult v)]
  833
+                      (tap m ch close?)))
  834
+              (unsub* [p v ch]
  835
+                      (when-let [m (get @mults v)]
  836
+                        (untap m ch)))
  837
+              (unsub-all* [_] (reset! mults {}))
  838
+              (unsub-all* [_ v] (swap! mults dissoc v)))]
  839
+       (go-loop []
  840
+         (let [val (<! ch)]
  841
+           (if (nil? val)
  842
+             (doseq [m (vals @mults)]
  843
+               (close! (muxch* m)))
  844
+             (let [m (get @mults val)]
  845
+               (when m
  846
+                 (try
  847
+                   (put! (muxch* m) val)
  848
+                   (catch Exception e
  849
+                     (swap! mults dissoc val))))
  850
+               (recur))))))))
  851
+
  852
+(defn sub
  853
+  "Subscribes a channel to a topic of a pub.
  854
+
  855
+  By default the channel will be closed when the source closes,
  856
+  but can be determined by the close? parameter."
  857
+  ([p topic ch] (sub p topic ch true))
  858
+  ([p topic ch close?] (sub* p topic ch close?)))
  859
+
  860
+(defn unsub
  861
+  "Unsubscribes a channel from a topic of a pub"
  862
+  [p topic ch]
  863
+  (unsub* p topic ch))
  864
+
  865
+(defn unsub-all
  866
+  "Unsubscribes all channels from a pub, or a topic of a pub"
  867
+  ([p] (unsub-all* p))
  868
+  ([p topic] (unsub-all* p topic)))
  869
+

1 note on commit 4b1819b

lgrapenthin

This line overwrites the supplied chan out and should be removed.

lgrapenthin

Currently throws. Should probably be (swap! cs (partial merge-with core/merge) state-map)

lgrapenthin

Is there a reason why this takes exactly two and not an arbitrary number of channels?

Logan Linn

great additions :+1:

Please sign in to comment.
Something went wrong with that request. Please try again.