Skip to content

Commit

Permalink
streams/ddt: fix some edge case bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
aphyr committed Feb 24, 2013
1 parent ef931fb commit 4930a95
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 64 deletions.
89 changes: 52 additions & 37 deletions src/riemann/streams.clj
Expand Up @@ -501,6 +501,56 @@
(ref-set state nil))
))))

(defn ddt-real
"(ddt) in real time."
[n & children]
(let [state (atom (list nil)) ; Events at t3, t2, and t1.
swap (fn swap []
(let [[_ e2 e1] (swap! state
(fn [[e3 e2 e1 :as state]]
; If no events have come in this
; interval, we preserve the last event
; in both places, which means we emit
; zeroes.
(if e3
(list nil e3 e2)
(list nil e2 e2))))]
(when (and e1 e2)
(let [dt (- (:time e2) (:time e1))
out (merge e2
(if (zero? dt)
{:time (unix-time)
:metric 0}
(let [diff (/ (- (:metric e2)
(:metric e1))
dt)]
{:time (unix-time)
:metric diff})))]
(call-rescue out children)))))
poller (periodically-until-expired n swap)]

(fn stream [event]
(when (:metric event)
(swap! state (fn [[most-recent & more]] (cons event more))))
(poller event))))

(defn ddt-events
"(ddt) between each pair of events."
[& children]
(let [prev (ref nil)]
(fn stream [event]
(when-let [m (:metric event)]
(let [prev-event (dosync
(let [prev-event (deref prev)]
(ref-set prev event)
prev-event))]
(when prev-event
(let [dt (- (:time event) (:time prev-event))]
(when-not (zero? dt)
(let [diff (/ (- m (:metric prev-event)) dt)]
(call-rescue (assoc event :metric diff) children))))))))))


(defn ddt
"Differentiate metrics with respect to time. With no args, emits an event for
each one received, but with metric equal to the difference between the
Expand All @@ -509,43 +559,8 @@
seconds instead, until expired. Skips events without metrics."
[& args]
(if (number? (first args))
; Emit a differential every n seconds
(do
(let [[n & children] args
prev (ref nil)
most-recent (ref nil)
swap (fn []
(let [[a b] (dosync
(let [prev-event (deref prev)
last-event (deref most-recent)]
(ref-set prev last-event)
[prev-event last-event]))]
(when (and a b)
(let [dt (- (:time b) (:time a))]
(when-not (zero? dt)
(let [diff (/ (- (:metric b) (:metric a))
dt)]
(call-rescue (assoc b :metric diff) children)))))))
poller (periodically-until-expired n swap)]
(fn [event]
(when (:metric event)
(dosync (ref-set most-recent event)))
(poller event))))

; Emit a differential for every event
(do
(let [prev (ref nil)]
(fn [event]
(when-let [m (:metric event)]
(let [prev-event (dosync
(let [prev-event (deref prev)]
(ref-set prev event)
prev-event))]
(when prev-event
(let [dt (- (:time event) (:time prev-event))]
(when-not (zero? dt)
(let [diff (/ (- m (:metric prev-event)) dt)]
(call-rescue (assoc event :metric diff) args))))))))))))
(apply ddt-real args)
(apply ddt-events args)))

(defn rate
"Take the sum of every event over interval seconds and divide by the interval
Expand Down
67 changes: 40 additions & 27 deletions test/riemann/test/streams.clj
Expand Up @@ -690,34 +690,47 @@
{:metric -3 :time 4}]))

(deftest ddt-interval-test
; Quick burst without crossing interval
(is (= (map :metric (run-stream-intervals
(ddt 0.1)
[{:metric 1} nil {:metric 2} nil {:metric 3}]))
[]))

; 1 event per interval
(let [t0 (unix-time)]
(is (= (map :metric (run-stream-intervals
(ddt 0.1)
[{:metric -1 :time t0} 0.1
{:metric 0 :time (+ 1/10 t0)} 0.1
{:metric -5 :time (+ 2/10 t0)} 0.1]))
[10 -50])))
(testing "Quick burst without crossing interval"
(reset-time!)
(is (= (map :metric (run-stream-intervals
(ddt 0.1)
[{:metric 1} nil
{:metric 2} nil
{:metric 3}]))
[])))

(testing "1 event per interval"
(reset-time!)
(test-stream-intervals
(ddt 1)
; Note that the swap occurs just prior to events at time 1.
[{:time 0 :metric -1} 99/100
{:time 1 :metric 0} 1
{:time 2 :metric -5} 1]
[{:time 1 :metric 1}
{:time 2 :metric -5}]))

(reset-time!)

; n events per interval
(let [t0 (unix-time)]
(is (= (map :metric (run-stream-intervals
(ddt 0.1)
[{:metric -1 :time t0} 0.01 ; counts
{:metric 100 :time (+ 1/20 t0)} 0.05
{:metric 1 :time (+ 2/20 t0)} 0.05
{:metric nil :time (+ 3/20 t0)} 0.05
{:metric -3 :time (+ 4/20 t0)} 0.05]))
[20 -40])))
)
(testing "n events per interval"
(reset-time!)
(test-stream-intervals
(ddt 1)
[{:time 0 :metric -1} 1/100
{:time 1/2 :metric 100} 1/2 ; Ignored
{:time 2/2 :metric 1} 1/2
{:time 3/2 :metric nil} 1/2 ; Ignored
{:time 4/2 :metric -3} 1/2]
[{:time 2/2 :metric 2}
{:time 4/2 :metric -4}]))

(testing "emits zeroes when no events arrive in an interval"
(reset-time!)
(test-stream-intervals (ddt 2)
[{:time 0 :metric 0} 1
{:time 1 :metric 1} 2
{:time 3 :metric 2} 3]
[{:time 2 :metric 1}
{:time 4 :metric 1/2}
{:time 6 :metric 0}])))

(deftest rate-slow-even
(let [output (ref [])
Expand Down

0 comments on commit 4930a95

Please sign in to comment.