Jean Niklas L'orange edited this page Jun 10, 2016 · 4 revisions

Conduit Examples

Extensions to clojure.core

These functions are transducer variants of existing functions in clojure.core that do not have a transducer variant.

(defn replicating
  "A transducer which emits n values for every value sent into it."
  [n]
  (conduit
   (while true
     (let [val (await)]
       (dotimes [_ n]
         (yield val))))))
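
For comparison, the described behaviour can be sketched with clojure.core alone. This is only a reference to check results against, not how the conduit is built; the name `replicating-ref` is made up for this example.

```clojure
;; Hypothetical reference helper: a plain clojure.core transducer that
;; repeats every input value n times.
(defn replicating-ref
  [n]
  (mapcat #(repeat n %)))

(into [] (replicating-ref 2) [:a :b :c])
;; => [:a :a :b :b :c :c]
```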

(defn partitioning
  "Like partition, but as a transducer.

  Could probably be a bit more efficient."
  ([n]
   (partitioning n n))
  ([n step]
   (conduit
    (loop [acc []]
      (let [chunk (loop [acc' acc]
                    (if (= (count acc') n)
                      acc'
                      (recur (conj acc' (await)))))]
        (yield chunk)
        (recur (vec (drop step chunk))))))))
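
As a reminder of what the sequence version does, here is clojure.core's `partition` on small inputs. The conduit above should presumably produce the same chunks (as vectors) when `step` is at most `n`; that correspondence is an assumption drawn from the docstring, not something checked here.

```clojure
;; Chunks of 3, incomplete trailing chunk dropped.
(partition 3 (range 8))
;; => ((0 1 2) (3 4 5))

;; Chunks of 3, advancing one element at a time (a sliding window).
(partition 3 1 (range 5))
;; => ((0 1 2) (1 2 3) (2 3 4))
```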

(defn butlasting
  "Like butlast, but as a transducer."
  ([] (butlasting 1))
  ([n]
   (conduit
    (let [queue
          (loop [queue clojure.lang.PersistentQueue/EMPTY
                 i 0]
            (if (= i n)
              queue
              (let [val (await)]
                (recur (conj queue val)
                       (inc i)))))]
      (loop [queue queue]
        (let [val (await)]
          ;; received a new value, so we can emit an old one
          (yield (peek queue))
          (recur (conj (pop queue) val))))))))
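
The sequence functions this mirrors behave as follows. The zero-argument arity corresponds to `butlast`, and the `n` arity appears to correspond to `drop-last`; the latter is an inference from the code above rather than a documented claim.

```clojure
(butlast [1 2 3 4])
;; => (1 2 3)

(drop-last 2 [1 2 3 4])
;; => (1 2)

;; Fewer elements than n: nothing is emitted.
(drop-last 5 [1 2 3 4])
;; => ()
```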

Simple Moving Average

Moving averages can be used to smooth out fluctuations in a trend, and can be useful to hide "spiky" values. One use case is to compute the "average" number of active customers by sampling every minute and then averaging over, say, 5 minutes. This may give you a more interesting number for trends.

If we have a sequence of such samples of the shape

{:time timestamp
 :customers integer}

we can create a conduit that reads a key, computes the moving average for the key, then attaches it back to the map we got in:

{:time timestamp
 :customers integer
 :customer-moving-average double}

We'll use a simple moving average, and to avoid some common issues we will center the results. That is, with a 5 unit window, we use the values from the elements with timestamps 1-5 to compute the moving average for the element with timestamp 3. Some of the first values cannot have a moving average, so we just drop them (the same happens to the last few values, as the example output at the end shows). The output also lags slightly behind the input, since we need data from "future" elements.
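
To make the centering concrete, here is a minimal sketch on plain sequences using only clojure.core. It is a reference to compare results against, not the streaming approach this page builds; the name `centered-sma-ref` is made up for this example.

```clojure
;; Hypothetical reference helper: slide a full window over the input and
;; attach the window's mean to the element in the middle of the window.
;; Elements at either end that never sit in the middle are dropped.
(defn centered-sma-ref
  [window-size read-key write-key coll]
  (let [half (quot (dec window-size) 2)]
    (for [window (partition window-size 1 coll)]
      (assoc (nth window half)
             write-key (/ (double (reduce + (map #(get % read-key) window)))
                          window-size)))))

(centered-sma-ref 5 :val :sma (map (fn [v] {:val v}) (range 10)))
;; => ({:val 2, :sma 2.0} {:val 3, :sma 3.0} {:val 4, :sma 4.0}
;;     {:val 5, :sma 5.0} {:val 6, :sma 6.0} {:val 7, :sma 7.0})
```

Unlike a transducer, this version needs the whole window as a sequence and recomputes the sum for every window, which is what the running sum and queues below avoid.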

This is not as easy as one would hope, but fortunately we can split it into three parts that make the computation manageable:

  1. We begin by reading the values we cannot emit a moving average for: (window size - 1) / 2 of them. We still need them to compute the sum, so we retain both the sum so far and a queue of the numbers we've read.
  2. Now we read the values we can emit a moving average for, but do not yet have enough data to compute it: (window size + 1) / 2 of them. We still update the sum and put each number into the queue, but we also store the actual elements in a second queue.
  3. Finally we have enough data to yield values. The first one can be emitted straight away, so we do that. We then read a value, update the queues and the sum, and recur.
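
The number of elements read in the first two steps can be sketched as below, assuming an odd window size. The helper name `phase-sizes` and its keys are made up for this example.

```clojure
;; Hypothetical helper: how many elements steps 1 and 2 read before the
;; first moving average can be emitted. Together they fill one window.
(defn phase-sizes
  [window-size]
  {:initial  (quot (dec window-size) 2)   ; step 1: read, never emitted
   :retained (quot (inc window-size) 2)}) ; step 2: read and kept for emitting

(phase-sizes 5)
;; => {:initial 2, :retained 3}
```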

The queue we will use is the hidden nugget clojure.lang.PersistentQueue, which ships with Clojure and works as you would expect with pop, peek and conj.
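
For readers who have not used it before, a short sketch of the three operations (the var name `q` is only for illustration):

```clojure
;; There is no literal syntax for queues; start from the EMPTY instance.
(def q (conj clojure.lang.PersistentQueue/EMPTY 1 2 3))

(peek q)
;; => 1, the oldest element

(seq (pop q))
;; => (2 3), the oldest element is removed

(seq (conj (pop q) 4))
;; => (2 3 4), new elements are added at the back
```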

The final result is this rather large and complex conduit:

(defn simple-moving-average
  "Returns a transducer that is a simple moving average over the entries given.
  `read-key` is the key whose value is averaged, and `write-key` is the key
  that the moving average is attached to."
  [window-size read-key write-key]
  {:pre [(pos? window-size) (odd? window-size)]}
  (let [read-initial (/ (dec window-size) 2)]
    (conduit
     ;; Read "initial" values, the ones we will not emit a sliding window for
     (let [[init-sum init-sum-window]
           (loop [sum 0
                  sum-window clojure.lang.PersistentQueue/EMPTY
                  i 0]
             (if (= read-initial i)
               [sum sum-window]
               (let [val-key (get (await) read-key)]
                 (recur (+ sum val-key)
                        (conj sum-window val-key)
                        (inc i)))))
           ;; Read and retain these values, as we can emit a moving average for
           ;; them once we have enough data
           [sum sum-window value-window]
           (loop [sum init-sum
                  sum-window init-sum-window
                  value-window clojure.lang.PersistentQueue/EMPTY
                  i 0]
             (if (= i (inc read-initial))
               [sum sum-window value-window]
               (let [val (await)
                     val-key (get val read-key)]
                 (recur (+ sum val-key)
                        (conj sum-window val-key)
                        (conj value-window val)
                        (inc i)))))]
       ;; Now we can emit, and then read as a "usual" conduit
       (loop [sum sum
              sum-window sum-window
              value-window value-window]
         (yield (assoc (peek value-window)
                       write-key (/ (double sum) (double window-size))))
         (let [val (await)
               val-key (get val read-key)
               new-sum (- (+ sum val-key) (peek sum-window))]
           (recur new-sum
                  (conj (pop sum-window) val-key)
                  (conj (pop value-window) val))))))))

Imagine the effort it would take to create this one with the basic transducer interface!

Now we can of course play around with the conduit:

user=> (sequence (simple-moving-average 5 :val :sma)
                 (map (fn [v] {:val v}) (range 10)))
({:val 2, :sma 2.0} {:val 3, :sma 3.0} {:val 4, :sma 4.0}
 {:val 5, :sma 5.0} {:val 6, :sma 6.0} {:val 7, :sma 7.0})

user=> (def randoms (let [r (java.util.Random. 42)]
                      (repeatedly 10 #(.nextInt r 100))))

user=> randoms
(30 63 48 84 70 25 5 18 19 93)

user=> (sequence (simple-moving-average 5 :val :sma)
                 (map (fn [v] {:val v}) randoms))
({:val 48, :sma 59.0} {:val 84, :sma 58.0} {:val 70, :sma 46.4}
 {:val 25, :sma 40.4} {:val 5, :sma 27.4} {:val 18, :sma 32.0})