Permalink
Browse files

metrics emitter

  • Loading branch information...
1 parent 75d0ac7 commit 7f98f661352ccd8cca282534d7a92f6ebd3315b0 @mmcgrana mmcgrana committed Apr 25, 2012
Showing with 66 additions and 2 deletions.
  1. +1 −0 Procfile
  2. +2 −1 Readme.md
  3. +1 −0 project.clj
  4. +1 −0 src/pulse/conf.clj
  5. +57 −0 src/pulse/emitter.clj
  6. +1 −1 src/pulse/io.clj
  7. +3 −0 src/pulse/util.clj
View
@@ -5,3 +5,4 @@ merger1: lein trampoline run -m pulse.merger 1
merger2: lein trampoline run -m pulse.merger 2
merger3: lein trampoline run -m pulse.merger 3
merger4: lein trampoline run -m pulse.merger 4
+emitter: lein trampoline run -m pulse.emitter
View
@@ -59,9 +59,10 @@ $ heroku config:add SCALES_URL=... -r $DEPLOY
$ heroku config:add CANONICAL_HOST=... -r $DEPLOY
$ heroku config:add REDIS_URL=... -r $DEPLOY
$ heroku config:add AORTA_URLS=... -r $DEPLOY
+$ heroku config:add METRICS_URLS=... -r $DEPLOY
$ git push $DEPLOY master
-$ heroku scale receiver=60 merger0=1 merger1=1 merger2=1 merger3=1 merger4=1 web=5 -r $DEPLOY
+$ heroku scale receiver=60 merger0=1 merger1=1 merger2=1 merger3=1 merger4=1 web=5 emitter=1 -r $DEPLOY
```
View
@@ -4,4 +4,5 @@
[clj-json "0.3.1"]
[hiccup "0.3.8"]
[ring-basic-auth "0.1.0"]
+ [clj-http "0.4.0"]
[ring/ring-jetty-adapter "1.0.1"]])
View
@@ -17,6 +17,7 @@
(defn force-https? [] (boolean (env "FORCE_HTTPS")))
(defn scales-url [] (env! "SCALES_URL"))
(defn api-url [] (env! "API_URL"))
+(defn metrics-urls [] (str/split (env! "METRICS_URLS") #","))
(defn deploy [] (env! "DEPLOY"))
(defn cloud [] (env! "CLOUD"))
(defn canonical-host [] (env! "CANONICAL_HOST"))
View
@@ -0,0 +1,57 @@
+(ns pulse.emitter
+ (:require [clj-json.core :as json]
+ [clj-redis.client :as redis]
+ [clj-http.client :as http]
+ [pulse.conf :as conf]
+ [pulse.util :as util]
+ [pulse.log :as log]))
+
+(defn log [& data]
+ (apply log/log :ns "emitter" data))
+
+(defonce stats-buff-a
+ (atom '()))
+
+(defn post [metrics-url stats]
+ (let [{:keys [host]} (util/url-parse metrics-url)]
+ (log :fn "post" :at "start" :host host)
+ (http/post metrics-url
+ {:body (json/generate-string stats)
+ :socket-timeout 5000
+ :conn-timeout 5000
+ :content-type :json})
+ (log :fn "post" :at "finish" :host host)))
+
+(defn init-emitter []
+ (log :fn "init-emitter" :at "start")
+ (loop []
+ (let [stats-size (count @stats-buff-a)
+ stats (take-last stats-size @stats-buff-a)]
+ (if-not (zero? stats-size)
+ (do
+ (log :fn "init-emitter" :at "emit" :stats-size stats-size)
+ (swap! stats-buff-a #(drop-last stats-size %))
+ (doseq [metrics-url (conf/metrics-urls)]
+ (util/spawn #(post metrics-url stats)))
+ (log :fn "init-emitter" :at "emitted" :stats-size stats-size))
+ (log :fn "init-emitter" :at "empty")))
+ (util/sleep 500)
+ (recur)))
+
+(defn init-buffer []
+ (log :fn "init-buffer" :at "start")
+ (let [rd (redis/init {:url (conf/redis-url)})]
+ (redis/subscribe rd ["stats.merged"] (fn [_ stat-json]
+ (let [[metric value] (json/parse-string stat-json)
+ time (util/millis)
+ period 60
+ buff-size (count @stats-buff-a)]
+ (if (< buff-size 10000)
+ (swap! stats-buff-a conj {"metric" metric "value" value "time" time "period" period})
+ (log :fn "init-buffer" :at "drop" :buffer-size buff-size)))))))
+
+(defn -main []
+ (log :fn "main" :at "start")
+ (util/spawn init-buffer)
+ (util/spawn init-emitter)
+ (log :fn "main" :at "finish"))
View
@@ -30,7 +30,7 @@
(log/log :fn "bleeder" :at "connect_exception" :aorta_host host))
(catch SocketException e
(log/log :fn "bleeder" :at "socket_exception" :aorta_host host)))
- (Thread/sleep 100)
+ (util/sleep 100)
(recur))))
(defn shard-for [stat-name]
View
@@ -36,5 +36,8 @@
(defn millis []
(System/currentTimeMillis))
+(defn sleep [t]
+ (Thread/sleep t))
+
(defn update [m k f]
(assoc m k (f (get m k))))

0 comments on commit 7f98f66

Please sign in to comment.