Browse files

Shard merger processes.

  • Loading branch information...
1 parent f8057a2 commit 49e8bc5781565d5c0a5d0e029ad4f2def63f54c9 @technomancy technomancy committed with mmcgrana Feb 3, 2012
Showing with 22 additions and 6 deletions.
  1. +2 −1 Procfile
  2. +13 −1 src/pulse/io.clj
  3. +5 −3 src/pulse/merger.clj
  4. +2 −1 src/pulse/receiver.clj
View
3 Procfile
@@ -1,3 +1,4 @@
web: lein run -m pulse.web
receiver: lein run -m pulse.receiver
-merger: lein run -m pulse.merger
+merger0: lein run -m pulse.merger 0
+merger1: lein run -m pulse.merger 1
View
14 src/pulse/io.clj
@@ -5,7 +5,8 @@
[clj-redis.client :as redis])
(:import (clojure.lang LineNumberingPushbackReader)
(java.io InputStreamReader BufferedReader PrintWriter)
- (java.net Socket SocketException ConnectException)))
+ (java.net Socket SocketException ConnectException)
+ (org.apache.commons.codec.digest DigestUtils)))
(defn log [& data]
(apply log/log :ns "io" data))
@@ -32,6 +33,14 @@
(Thread/sleep 100)
(recur))))
+(defn shard-channel [[stat-name]]
+ (let [hash (DigestUtils/sha stat-name)
+ merger-count (Integer. (or (System/getenv "MERGER_COUNT") "2"))
+ shard (mod (first hash) merger-count)]
+ (str "stats.received." shard)))
+
+(alter-var-root #'shard-channel memoize)
+
(defn init-bleeders [aorta-urls apply-queue]
(log :fn "init-bleeders" :at "start")
(doseq [aorta-url aorta-urls]
@@ -48,6 +57,9 @@
(log :fn "init-publishers" :at "spawn" :chan chan :index i)
(util/spawn-loop (fn []
(let [data (queue/take publish-queue)
+ chan (if (ifn? chan)
+ (chan data)
+ chan)
data-str (try
(ser data)
(catch Exception e
View
8 src/pulse/merger.clj
@@ -36,15 +36,17 @@
[stat-def stat-state] (get stats-map stat-name)]
(stat/merge-apply stat-def stat-state pub))))))
-(defn -main []
+(defn -main [shard]
(log :fn "main" :at "start")
(let [apply-queue (queue/init 2000)
publish-queue (queue/init 100)
stats-map (init-stats def/all)]
(queue/init-watcher apply-queue "apply")
(queue/init-watcher publish-queue "publish")
- (io/init-publishers publish-queue (conf/redis-url) "stats.merged" json/generate-string (conf/publish-threads))
+ (io/init-publishers publish-queue (conf/redis-url) "stats.merged"
+ json/generate-string (conf/publish-threads))
(init-emitter stats-map publish-queue)
(init-appliers stats-map apply-queue (conf/apply-threads))
- (io/init-subscriber (conf/redis-url) "stats.received" read-string apply-queue))
+ (io/init-subscriber (conf/redis-url) (str "stats.received." shard)
+ read-string apply-queue))
(log :fn "main" :at "finish"))
View
3 src/pulse/receiver.clj
@@ -41,7 +41,8 @@
stats-states (init-stats def/all)]
(queue/init-watcher apply-queue "apply")
(queue/init-watcher publish-queue "publish")
- (io/init-publishers publish-queue (conf/redis-url) "stats.received" pr-str (conf/publish-threads))
+ (io/init-publishers publish-queue (conf/redis-url) io/shard-channel
+ pr-str (conf/publish-threads))
(init-emitter stats-states publish-queue)
(init-appliers stats-states apply-queue (conf/apply-threads))
(io/init-bleeders (conf/aorta-urls) apply-queue)

0 comments on commit 49e8bc5

Please sign in to comment.