Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add cloud-scoped pred checks to a few stat primitives we missed earlier.

  • Loading branch information...
commit ecf3593d93a5dd4f1615cd109be374658f3023e7 1 parent 075e17f
@technomancy technomancy authored
Showing with 44 additions and 41 deletions.
  1. +44 −41 src/pulse/def.clj
View
85 src/pulse/def.clj
@@ -126,31 +126,32 @@
(rate 60 70 pred-fn))
(defn rate-by-key [time-unit time-buffer pred-fn key-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
[(util/millis) {}])
- :receive-apply
+ :receive-apply
(fn [[window-start window-counts] event]
[window-start (if (pred-fn event) (util/update window-counts (str (key-fn event)) safe-inc) window-counts)])
- :receive-emit
+ :receive-emit
(fn [[window-start window-counts]]
[window-start (util/millis) window-counts])
- :merge-init
+ :merge-init
(fn []
[])
- :merge-apply
+ :merge-apply
(fn [windows window]
(conj windows window))
- :merge-emit
+ :merge-emit
(fn [windows]
- (let [now (util/millis)
- recent-windows (filter (fn [[window-start _ _]] (>= window-start (- now (* 1000 time-buffer) 1000))) windows)
- complete-windows (filter (fn [[window-start _ _]] (< window-start (- now 1000))) recent-windows)
- complete-counts (apply merge-with + (map (fn [[_ _ window-counts]] window-counts) complete-windows))
- complete-sorted-counts (sort-by (fn [[k kc]] (- kc)) complete-counts)
- complete-high-counts (take 10 complete-sorted-counts)
- complete-rates (map (fn [[k kc]] [k (double (/ kc (/ time-buffer time-unit)))]) complete-high-counts)]
- [recent-windows complete-rates]))})
+ (let [now (util/millis)
+ recent-windows (filter (fn [[window-start _ _]] (>= window-start (- now (* 1000 time-buffer) 1000))) windows)
+ complete-windows (filter (fn [[window-start _ _]] (< window-start (- now 1000))) recent-windows)
+ complete-counts (apply merge-with + (map (fn [[_ _ window-counts]] window-counts) complete-windows))
+ complete-sorted-counts (sort-by (fn [[k kc]] (- kc)) complete-counts)
+ complete-high-counts (take 10 complete-sorted-counts)
+ complete-rates (map (fn [[k kc]] [k (double (/ kc (/ time-buffer time-unit)))]) complete-high-counts)]
+ [recent-windows complete-rates]))}))
(defn per-second-by-key [pred-fn key-fn]
(rate-by-key 1 10 pred-fn key-fn))
@@ -159,78 +160,80 @@
(rate-by-key 60 70 pred-fn key-fn))
(defn rate-unique [time-buffer pred-fn key-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
[(util/millis) #{}])
- :receive-apply
+ :receive-apply
(fn [[window-start window-hits] event]
[window-start (if (pred-fn event) (conj window-hits (key-fn event)) window-hits)])
- :receive-emit
+ :receive-emit
(fn [window]
window)
- :merge-init
+ :merge-init
(fn []
[])
- :merge-apply
+ :merge-apply
(fn [windows window]
(conj windows window))
- :merge-emit
+ :merge-emit
(fn [windows]
- (let [now (util/millis)
- recent-windows (filter (fn [[window-start _]] (>= window-start (- now (* 1000 time-buffer)))) windows)
- recent-hits (apply set/union (map (fn [[_ window-hits]] window-hits) recent-windows))
- recent-count (count recent-hits)]
- [recent-windows recent-count]))})
+ (let [now (util/millis)
+ recent-windows (filter (fn [[window-start _]] (>= window-start (- now (* 1000 time-buffer)))) windows)
+ recent-hits (apply set/union (map (fn [[_ window-hits]] window-hits) recent-windows))
+ recent-count (count recent-hits)]
+ [recent-windows recent-count]))}))
(defn per-minute-unique [pred-fn key-fn]
(rate-unique 60 pred-fn key-fn))
(defn last [pred-fn val-fn]
- {:receive-init
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
(fn []
nil)
- :receive-apply
+ :receive-apply
(fn [last-val event]
(if (pred-fn event)
(val-fn event)
last-val))
- :receive-emit
+ :receive-emit
(fn [last-val]
last-val)
- :merge-init
+ :merge-init
(fn []
nil)
- :merge-apply
+ :merge-apply
(fn [last-val received]
(or received last-val))
- :merge-emit
+ :merge-emit
(fn [last-val]
- [last-val last-val])})
+ [last-val last-val])}))
(defn last-agg [recent-interval pred-fn part-fn val-fn agg-fn]
- {:receive-init
- (fn []
- {})
- :receive-apply
+ (let [pred-fn (cloud-scoped-pred pred-fn *cloud*)]
+ {:receive-init
+ (fn [] {})
+ :receive-apply
(fn [last-timed-vals evt]
(if (pred-fn evt)
(assoc last-timed-vals (part-fn evt) [(util/millis) (val-fn evt)])
last-timed-vals))
- :receive-emit
+ :receive-emit
(fn [last-timed-vals]
last-timed-vals)
- :merge-init
+ :merge-init
(fn []
{})
- :merge-apply
+ :merge-apply
(fn [last-timed-vals received]
(merge last-timed-vals received))
- :merge-emit
+ :merge-emit
(fn [last-timed-vals]
(let [now (util/millis)
recent-timed-vals (into {} (filter (fn [[_ [last-time _]]] (< (- now last-time) (* (or recent-interval 300) 1000))) last-timed-vals))
recent-agg (agg-fn (map (fn [[_ [_ last-val]]] last-val) recent-timed-vals))]
- [recent-timed-vals recent-agg]))})
+ [recent-timed-vals recent-agg]))}))
(defn last-mean [recent-interval pred-fn part-fn val-fn]
(last-agg recent-interval pred-fn part-fn val-fn coll-mean))
Please sign in to comment.
Something went wrong with that request. Please try again.