Skip to content
This repository has been archived by the owner on Feb 12, 2022. It is now read-only.

Commit

Permalink
Allow customizable watcher interval.
Browse files Browse the repository at this point in the history
  • Loading branch information
technomancy committed Mar 20, 2012
1 parent ec9c51e commit 209286b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 16 deletions.
1 change: 1 addition & 0 deletions .env.sample
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ MERGER_COUNT=2
LEIN_NO_DEV=y
GRAPHITE_URL=https://graphite.example.com
GRAPHITE_PERIOD=3600
WATCHER_INTERVAL=2500
1 change: 1 addition & 0 deletions src/pulse/conf.clj
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
(defn graphite-period [] (env! "GRAPHITE_PERIOD"))
(defn publish-threads [] (Integer/parseInt (env! "PUBLISH_THREADS")))
(defn merger-count [] (Integer/parseInt (env! "MERGER_COUNT")))
(defn watcher-interval [] (Integer/parseInt (or (env "WATCHER_INTERVAL") 1000)))

(defn api-password []
(second (str/split (:auth (util/url-parse (api-url))) #":")))
39 changes: 23 additions & 16 deletions src/pulse/queue.clj
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
(ns pulse.queue
(:require [pulse.util :as util]
[pulse.log :as log])
[pulse.log :as log]
[pulse.conf :as conf])
(:refer-clojure :exclude (take))
(:import java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.atomic.AtomicLong))

(defn init [size]
[(ArrayBlockingQueue. size) (AtomicLong. 0) (AtomicLong. 0) (AtomicLong. 0)])

(defn offer [[^ArrayBlockingQueue queue ^AtomicLong pushed _ ^AtomicLong dropped] elem]
(defn offer [[^ArrayBlockingQueue queue ^AtomicLong pushed _
^AtomicLong dropped] elem]
(if (.offer queue elem)
(.getAndIncrement pushed)
(.getAndIncrement dropped)))
Expand All @@ -18,28 +20,33 @@
(.getAndIncrement popped)
elem))

(defn stats [[^ArrayBlockingQueue queue ^AtomicLong pushed ^AtomicLong popped ^AtomicLong dropped]]
(defn stats [[^ArrayBlockingQueue queue ^AtomicLong pushed
^AtomicLong popped ^AtomicLong dropped]]
[(.size queue) (.get pushed) (.get popped) (.get dropped)])

(defn log [& data]
(apply log/log :ns "queue" data))

(defn watcher [queue queue-name start pushed-prev popped-prev dropped-prev]
(let [elapsed (- (util/millis) start)
[depth pushed popped dropped] (stats queue)
push-rate (- pushed @pushed-prev)
pop-rate (- popped @popped-prev)
drop-rate (- dropped @dropped-prev)]
(swap! pushed-prev (constantly pushed))
(swap! popped-prev (constantly popped))
(swap! dropped-prev (constantly dropped))
(log :fn "init-watcher" :name queue-name :depth depth
:push-rate push-rate :pushed pushed
:pop-rate pop-rate :popped popped
:drop-rate drop-rate :dropped dropped)))

(defn init-watcher [queue queue-name]
(log :fn "init-watcher" :name queue-name)
(let [start (util/millis)
pushed-prev (atom 0)
popped-prev (atom 0)
dropped-prev (atom 0)]
(util/spawn-tick 1000 (fn []
(let [elapsed (- (util/millis) start)
[depth pushed popped dropped] (stats queue)
push-rate (- pushed @pushed-prev)
pop-rate (- popped @popped-prev)
drop-rate (- dropped @dropped-prev)]
(swap! pushed-prev (constantly pushed))
(swap! popped-prev (constantly popped))
(swap! dropped-prev (constantly dropped))
(log :fn "init-watcher" :name queue-name :depth depth
:push-rate push-rate :pushed pushed
:pop-rate pop-rate :popped popped
:drop-rate drop-rate :dropped dropped))))))
(util/spawn-tick (conf/watcher-interval)
(partial #'watcher queue queue-name start
pushed-prev popped-prev dropped-prev))))

0 comments on commit 209286b

Please sign in to comment.