Yury edited this page Sep 22, 2017 · 8 revisions

Pubbing and Subbing

(It helps to have a REPL with access to core.async open while reading this.)

(use 'clojure.core.async)

Publishing and subscribing is an oft-used metaphor for managing asynchronous events.

The pub function takes a channel and a topic function and returns a publication:

(def input-chan (chan))
(def our-pub (pub input-chan :msg-type))

A publication is not a channel:

(put! our-pub {:msg-type :greeting :text "hello"})
;IllegalArgumentException No implementation of method put! found for class: clojure.core.async$pub$reify

Instead, we just push messages to the original channel:

(>!! input-chan {:msg-type :greeting :text "hello"})
;true

(Normally, with nothing drawing from input-chan, this would block. It doesn't, because in the current implementation pub starts a go block that keeps taking from the source channel, and each subscribed topic gets a go block of its own. These go blocks valiantly hang so the main thread doesn't have to, which makes values in flight inside the publication behave a bit like size 1 buffers. This is an implementation detail, however, and should not be relied upon. Still, it's good to know the underlying workings, especially if you're experimenting with buffers.)

The topic-fn is applied to all values that are passed to the publication. In the above, :msg-type is being used as a fn, and it will return :greeting. The topic-fn is used to categorize values, not modify them; a publication with inc as its topic function would pass the values unchanged.
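To make the "categorize, not modify" point concrete, here is a minimal sketch using a topic function that returns a keyword unrelated to the value's shape. The names numbers-in, parity-pub and evens-out are made up for this example, and the a/ alias is used instead of use so the snippet stands on its own:

```clojure
(require '[clojure.core.async :as a])

(def numbers-in (a/chan))

;; The topic function only labels each value as :even or :odd.
(def parity-pub (a/pub numbers-in #(if (even? %) :even :odd)))

;; A buffer of 1 lets the publication hand the value over without a waiting reader.
(def evens-out (a/chan 1))
(a/sub parity-pub :even evens-out)

(a/>!! numbers-in 4)

;; The subscriber sees the original value 4, not the topic :even.
(def received (a/<!! evens-out))
```

The topic is only ever used to pick subscribers; what arrives on evens-out is whatever was put on numbers-in.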

How do we receive messages? Here the sub function comes into play. It's called like so:

(def output-chan (chan))
(sub our-pub :greeting output-chan)
;#<ManyToManyChannel>

sub takes a publication, a topic, and a subscribing channel. The subscribing channel will receive all values from the publication for which (= (topic-fn value) topic):

(go-loop []
  (let [{:keys [text]} (<! output-chan)]
    (println text)
    (recur)))

Notice that although we put a value onto the input-chan earlier, nothing happens right after we run this go block. This is because publications will drop values that don't match a subscribed topic. Our original message was lost. But now that there is a subscribed channel, new input will be published:

(>!! input-chan {:msg-type :greeting :text "hi"})
;true
;hi
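The dropping behaviour can also be checked without relying on REPL timing. The sketch below is one way to do it, under the assumption (true of the current implementation, but not documented) that the publication handles one value at a time. The channel names and the :noise sentinel topic are hypothetical:

```clojure
(require '[clojure.core.async :as a])

(def events-in (a/chan))
(def events-pub (a/pub events-in :msg-type))

;; Nobody is subscribed to :farewell yet, so the publication takes this value
;; and discards it.
(a/>!! events-in {:msg-type :farewell :text "bye"})

;; Hypothetical sentinel on a topic that never gets a subscriber. events-in is
;; unbuffered, so this put returns only once the publication is done with "bye".
(a/>!! events-in {:msg-type :noise})

(def farewell-out (a/chan 1))
(a/sub events-pub :farewell farewell-out)

(a/>!! events-in {:msg-type :farewell :text "see you"})

(def first-seen (a/<!! farewell-out)) ; the "see you" message
(def leftover (a/poll! farewell-out)) ; nil, since "bye" was never delivered
```

Without the sentinel put, the subscription could race with the publication's handling of "bye", which is easy to miss when typing at a REPL.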

The publication will copy the input to as many channels as are subscribed; if we create a duplicate channel subscribed to the same topic:

(let [c (chan)]
  (sub our-pub :greeting c)
  (go-loop []
    (let [{:keys [text]} (<! c)]
      (println text)
      (recur))))

and then put another message on input-chan, both channels will receive it:

(>!! input-chan {:msg-type :greeting :text "hi"})
;true
;hi
;hi

It's worth noting that if a publication tries to pass a value to a channel that's not accepting values, the whole publication will block:

(def loser-chan (chan))
(sub our-pub :loser loser-chan)
(>!! input-chan {:msg-type :loser :text "I won't be accepted"})

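Careful: this call returns true and doesn't block, because the publication is still taking values from input-chan. But inside the publication, a go block is now stuck waiting for loser-chan to accept the value. In the current implementation a second >!! of a :loser value won't block the main thread either, but once the publication itself is stuck on the :loser topic, every following put to input-chan will block, whatever its topic.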

The moral of the story is: if you subscribe a channel to a publication, make sure it can accept values! You should be doing this anyway, but it's especially important when dealing with publications, because you might not have any inkling that something is wrong until some other topic is hanging.
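One way to make sure a subscriber can always accept values is to give it a dropping (or sliding) buffer, at the cost of losing messages when nobody reads fast enough. This is a sketch of that idea, not the only remedy; work-in, work-pub, idle-out and busy-out are names invented for the example:

```clojure
(require '[clojure.core.async :as a])

(def work-in (a/chan))
(def work-pub (a/pub work-in :msg-type))

;; Nobody reads idle-out during the puts below, but its dropping buffer accepts
;; every put immediately: it keeps the first 2 values and discards the rest.
(def idle-out (a/chan (a/dropping-buffer 2)))
(a/sub work-pub :idle idle-out)

(def busy-out (a/chan 1))
(a/sub work-pub :busy busy-out)

;; Five unread :idle messages would stall a publication whose subscriber was
;; an unbuffered channel. Here every put returns.
(dotimes [n 5]
  (a/>!! work-in {:msg-type :idle :n n}))

;; The other topic is unaffected.
(a/>!! work-in {:msg-type :busy :text "still flowing"})
(def busy-msg (a/<!! busy-out))
```

Whether dropping the newest values (dropping-buffer) or the oldest (sliding-buffer) is acceptable depends on the topic; if every message matters, the consumer simply has to keep up.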

Sometimes you can't do this, perhaps because the consuming operation is resource-intensive. In these cases you can create topic-specific buffers: pub accepts an optional buf-fn argument that, given a topic, returns a buffer or a number, which is then passed internally to chan. Your buf-fn might even simply be a map:

{:predict-election 100
 :flip-a-coin 3}
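As a rough illustration, the map above can be passed straight to pub as its third argument. In the sketch below jobs-in, jobs-pub and flip-out are hypothetical names, and the number of puts that succeed without a reader depends on the implementation details discussed earlier. Topics missing from the map get nil, which chan treats as "unbuffered":

```clojure
(require '[clojure.core.async :as a])

(def jobs-in (a/chan))

;; A map works as buf-fn because maps are functions of their keys.
(def jobs-pub
  (a/pub jobs-in :msg-type {:predict-election 100
                            :flip-a-coin 3}))

;; An unbuffered subscriber that nobody is reading from yet.
(def flip-out (a/chan))
(a/sub jobs-pub :flip-a-coin flip-out)

;; With the 3-slot topic buffer these four puts all return even though
;; flip-out has no reader; with the default unbuffered topic they would hang.
(dotimes [n 4]
  (a/>!! jobs-in {:msg-type :flip-a-coin :n n}))

;; Once a reader shows up, the values arrive in order.
(def drained (vec (repeatedly 4 #(:n (a/<!! flip-out)))))
```

The buffer only postpones the problem: a consumer that never catches up will still stall the publication once the topic's buffer is full.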

You can get fancier with special buffers and buffer functions if you wish:

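;; request-source is assumed to be an existing channel of request maps.
;; Neither case has a default clause, so any other :msg-type will throw.
(def pub-central
  (let [topic-fn #(case (:msg-type %)
                    :db-change :acid
                    :http-request :stateless)
        buf-fn #(case %
                  :stateless (sliding-buffer 10)
                  :acid (dropping-buffer 1000))]
    (pub request-source topic-fn buf-fn)))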

As always, the documentation and source code are available for your perusal. Happy publishing!