/
dispatch.cljs
66 lines (51 loc) · 1.71 KB
/
dispatch.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
(ns sailing-study-guide.dispatch
(:require-macros [cljs.core.async.macros :refer [go go-loop]])
(:require [cljs.core.async :refer [chan mult tap put! <! >! pub sub unsub close!]]))
(defonce default-buffer-size 5)
(defonce *dispatcher-logging-enabled* true)
(defonce ^:private dispatch-chan (chan default-buffer-size))
(defonce ^:private dispatch-mult (mult dispatch-chan))
(defonce ^:private dispatch-pub-chan (chan default-buffer-size))
(defonce ^:private dispatch-pub (pub dispatch-pub-chan #(:tag %)))
(tap dispatch-mult dispatch-pub-chan)
(defn- retrieve-message [payload]
(when payload
(:message payload)))
(defn register [tag]
(let [c (chan)]
(sub dispatch-pub tag c)))
(defn unregister [tag chan]
(unsub dispatch-pub tag chan)
(close! chan))
(defn whenever [tag cb]
(let [c (register tag)]
(go-loop [payload (<! c)]
(if payload
(do
;; (println "Processing mesg in " payload)
(cb (retrieve-message payload))
(recur (<! c)))
(do
(println "Leaving loop for " c)
(close! c))))
c))
(defn dispatch! [tagortags message]
(let [tags (if (sequential? tagortags) tagortags [tagortags])]
(doseq [tag tags]
(go
(>! dispatch-chan {:tag tag :message message})
(println "Put!")))))
;; Start logger
(when *dispatcher-logging-enabled*
(defonce ^:private dispatch-logger-chan (chan))
(tap dispatch-mult dispatch-logger-chan)
(go-loop []
(println "Logged: " (pr-str (<! dispatch-logger-chan)))
(recur)))
(comment
(defonce tags [:answer-chosen])
(defonce payload "message")
(dispatch! tags payload)
;; (dispatch! [:answer-unchosen "foo"])
;; (retrieve! bus)
)