Skip to content
Browse files

Move cloud-scoping to rate/mean/max stat calculations.

  • Loading branch information...
1 parent 24e2b8f commit 3d0e9e4eb487a5161a94f8b5f127147d63a7be81 @technomancy technomancy committed Oct 2, 2012
Showing with 39 additions and 38 deletions.
  1. +39 −38 src/pulse/def.clj
View
77 src/pulse/def.clj
@@ -21,41 +21,49 @@
0
(float (/ (coll-sum c) n)))))
+(def ^:dynamic *cloud*)
+
+(defn cloud-scoped-pred [pred-fn cloud]
+ (fn [evt]
+ (and (= (:cloud evt) cloud) (pred-fn evt))))
+
(defn max [time-buffer pred-fn val-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
[(util/millis) nil])
- :receive-apply
+ :receive-apply
(fn [[window-start window-max :as window] evt]
(if-not (pred-fn evt)
window
(let [val (val-fn evt)]
(cond
- (nil? window-max) [window-start val]
- (> val window-max) [window-start val]
- :else window))))
- :receive-emit
+ (nil? window-max) [window-start val]
+ (> val window-max) [window-start val]
+ :else window))))
+ :receive-emit
(fn [receive-buffer]
receive-buffer)
- :merge-init
+ :merge-init
(fn []
[])
- :merge-apply
+ :merge-apply
(fn [windows [window-start window-max :as window]]
(conj windows window))
- :merge-emit
+ :merge-emit
(fn [windows]
(let [now (util/millis)
recent-windows (filter (fn [[window-start _]] (>= window-start (- now (* 1000 time-buffer)))) windows)
recent-values (->> windows (map (fn [[_ window-max]] window-max)) (filter #(not (nil? %))))
recent-max (or (and (seq recent-values) (apply clojure.core/max recent-values)) 0)]
- [recent-windows recent-max]))})
+ [recent-windows recent-max]))}))
(defn mean [time-buffer pred-fn val-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
[(util/millis) 0 0])
- :receive-apply
+ :receive-apply
(fn [[window-start window-count window-sum :as receive-buffer] evt]
(if-not (pred-fn evt)
receive-buffer
@@ -65,48 +73,49 @@
(prn :fn "mean" :at "nil-val" :msg (:msg evt))
receive-buffer)
[window-start (inc window-count) (+ window-sum val)]))))
- :receive-emit
+ :receive-emit
(fn [receive-buffer]
receive-buffer)
- :merge-init
+ :merge-init
(fn []
[])
- :merge-apply
+ :merge-apply
(fn [windows window]
(conj windows window))
- :merge-emit
+ :merge-emit
(fn [windows]
(let [now (util/millis)
recent-windows (filter (fn [[window-start _ _]] (>= window-start (- now (* 1000 time-buffer)))) windows)
recent-count (coll-sum (map (fn [[_ window-count _]] window-count) recent-windows))
recent-sum (coll-sum (map (fn [[_ _ window-sum]] window-sum) recent-windows))
recent-mean (double (if (zero? recent-count) 0 (/ recent-sum recent-count)))]
- [recent-windows recent-mean]))})
+ [recent-windows recent-mean]))}))
(defn rate [time-unit time-buffer pred-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
[(util/millis) 0])
- :receive-apply
+ :receive-apply
(fn [[window-start window-count] event]
[window-start (if (pred-fn event) (inc window-count) window-count)])
- :receive-emit
+ :receive-emit
(fn [[window-start window-count]]
[window-start (util/millis) window-count])
- :merge-init
+ :merge-init
(fn []
[])
- :merge-apply
+ :merge-apply
(fn [windows window]
(conj windows window))
- :merge-emit
+ :merge-emit
(fn [windows]
(let [now (util/millis)
recent-windows (filter (fn [[window-start _ _]] (>= window-start (- now (* 1000 time-buffer) 1000))) windows)
complete-windows (filter (fn [[window-start _ _]] (< window-start (- now 1000))) recent-windows)
complete-count (coll-sum (map (fn [[_ _ window-count]] window-count) complete-windows))
complete-rate (double (/ complete-count (/ time-buffer time-unit)))]
- [recent-windows complete-rate]))})
+ [recent-windows complete-rate]))}))
(defn per-second [pred-fn]
(rate 1 10 pred-fn))
@@ -246,20 +255,12 @@
(str cloud "." (name stat-name))))
(defmacro defstat [stat-name stat-body]
- (let [body-sym (gensym)]
- `(let [~body-sym ~stat-body]
- ~@(for [cloud (conf/clouds)
- :let [scoped-name (scope-stat cloud stat-name)
- scoped-var-name (symbol (string/replace scoped-name "." "-"))]]
- `(do
- (def ~scoped-var-name (assoc ~body-sym
- :name ~(str (conf/graphite-prefix) scoped-name)
- :pred-fn (fn [evt#]
- (when (> (rand) 0.95)
- (log :at "defstat/pred-fn" :cloud (:cloud evt#)))
- (and (= (:cloud evt#) ~cloud)
- ((:pred-fn ~body-sym) evt#)))))
- (swap! all conj ~scoped-var-name))))))
+ `(do
+ ~@(for [cloud (conf/clouds)
+ :let [scoped-name (scope-stat cloud stat-name)
+ scoped-var-name (symbol (string/replace scoped-name "." "-"))]]
+ `(swap! all conj (assoc (binding [*cloud* ~cloud] ~stat-body)
+ :name ~(str (conf/graphite-prefix) scoped-name))))))
(defn kv? [m k v]
(= (k m) v))

0 comments on commit 3d0e9e4

Please sign in to comment.
Something went wrong with that request. Please try again.