forked from andrewvc/engulf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
recorder.clj
96 lines (82 loc) · 2.96 KB
/
recorder.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
(ns parbench.recorder
(:require [clojure.tools.logging :as log])
(:use [parbench.utils :only [median percentiles increment-keys]]
lamina.core))
(defn record-avg-runtime-by-start-time [stats {:keys [req-start runtime]}]
(update-in stats
[:avg-runtime-by-start-time (long (/ req-start 1000))]
(fn [bucket]
(let [rcount (+ 1 (get bucket :count 0))
total (+ runtime (get bucket :total 0))]
(merge bucket
{:count rcount
:total total
:avg (long (/ total rcount))})))))
(defn record-runtime [stats {:keys [runtime]}]
(update-in stats [:runtimes] #(conj %1 runtime)))
(defn record-response-code-count [stats {{resp-code :status} :response}]
(update-in stats [:response-code-counts] increment-keys resp-code))
(defn record-run-succeeded [stats data]
(increment-keys stats :runs-succeeded :runs-total))
(defn runtime-agg-stats [{:keys [runtimes runs-total]} started-at ended-at]
(let [runtime (- ended-at started-at)
sorted-runtimes (vec (sort runtimes))]
{:runtime runtime
:runs-sec (/ runs-total (/ runtime 1000))
:median-runtime (median sorted-runtimes)
:runtime-percentiles (percentiles sorted-runtimes)}))
(defprotocol Recordable
"Recording protocol"
(processed-stats [this])
(record-start [this])
(record-end [this])
(record-result [this worker-id data])
(record-error [this worker-id err]))
(defrecord StandardRecorder [started-at ended-at stats]
Recordable
(processed-stats [this]
(let [statsd @stats]
;(println (runtime-agg-stats this statsd))
(merge
(runtime-agg-stats statsd
@started-at
(or @ended-at (System/currentTimeMillis)))
(select-keys statsd
[:runs-total :runs-succeeded :runs-failed
:avg-runtime-by-start-time
:response-code-counts]))))
(record-start [this]
(compare-and-set! started-at nil (System/currentTimeMillis)))
(record-end [this]
(compare-and-set! ended-at nil (System/currentTimeMillis)))
(record-result
[this worker-id data]
(send stats
(fn [statsd]
(try
(reduce
(fn [v stat-fn] (stat-fn v data))
statsd
[record-avg-runtime-by-start-time
record-runtime
record-response-code-count
record-run-succeeded])
(catch Exception e
(.printStackTrace e))))))
(record-error [this worker-id err]
(send stats increment-keys :runs-failed)) )
(defn- empty-stats []
{:started-at nil
:ended-at nil
:runtime nil
:runs-sec nil
:runs-total 0
:runs-succeeded 0
:runs-failed 0
:response-code-counts {}
:avg-runtime-by-start-time {}
:runtimes []})
(defn create-recorder []
(StandardRecorder. (atom nil)
(atom nil)
(agent (empty-stats))))