forked from riemann/riemann
-
Notifications
You must be signed in to change notification settings - Fork 0
/
graphite.clj
72 lines (66 loc) · 3.08 KB
/
graphite.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
(ns riemann.transport.graphite
(:import [org.jboss.netty.util CharsetUtil]
[org.jboss.netty.channel Channels]
[org.jboss.netty.handler.codec.oneone OneToOneDecoder]
[org.jboss.netty.handler.codec.string StringDecoder StringEncoder]
[org.jboss.netty.handler.codec.frame
DelimiterBasedFrameDecoder
Delimiters])
(:use [riemann.transport.tcp :only [tcp-server]]
[clojure.string :only [split]]))
(defn decode-graphite-line
  "Decode a single line of the graphite plaintext protocol into an event map.

  A line carries three whitespace-separated fields:

    * the metric name
    * the metric value (possibly NaN)
    * the timestamp

  Arguments:
    line      - the line to decode, without its line terminator
    parser-fn - nil, or a function of the decoded event map whose return
                value is merged onto that map; useful to derive extra keys
                (host, refined service name, tags) from known metric name
                patterns

  Returns:
    * a map with :service (String), :metric (Float) and :time (Long),
      merged with the result of parser-fn when one is given
    * nil when the metric value is NaN, in any letter case
    * the map {:ok :true :service \"exception\"} when the value or the
      timestamp cannot be parsed, or when parser-fn throws

  Fields may be separated by any run of whitespace; leading and trailing
  whitespace is ignored, and so are fields beyond the third."
  [line parser-fn]
  ;; Trim before splitting: a #"\s+" split would otherwise produce an empty
  ;; leading field for lines that start with whitespace and shift every
  ;; other field by one position.
  (let [[service metric timestamp] (split (.trim ^String line) #"\s+")]
    ;; Discard NaN whatever its case. Float. happily parses "NaN", so a
    ;; case-sensitive check against "nan" lets NaN metrics through.
    ;; equalsIgnoreCase returns false for a nil metric, so short lines fall
    ;; through to the parse below and end up in the catch clause.
    (when-not (.equalsIgnoreCase "nan" ^String metric)
      (try
        (let [res {:service service
                   :metric  (Float. ^String metric)
                   :time    (Long. ^String timestamp)}]
          (if parser-fn
            (merge res (parser-fn res))
            res))
        ;; Best-effort: a malformed line becomes a sentinel event instead of
        ;; an exception propagating to the caller.
        ;; NOTE(review): :true is a keyword, not the boolean true. It is kept
        ;; as-is because callers may match on this exact map - confirm.
        (catch Exception e
          {:ok :true :service "exception"})))))
(defn graphite-frame-decoder
  "Returns a zero-argument factory function. Every call to that factory
  builds a new OneToOneDecoder proxy whose decode method hands the incoming
  message, together with parser-fn, to decode-graphite-line and returns the
  result.

  parser-fn is captured by the closure and passed through unchanged."
  [parser-fn]
  (fn make-decoder []
    (proxy [OneToOneDecoder] []
      ;; Only the message is used; the context and channel are ignored.
      (decode [ctx chan msg]
        (decode-graphite-line msg parser-fn)))))
(defn graphite-handler
  "Applies every stream found under (:streams core), in order, to the
  message carried by e. The message is obtained by calling .getMessage on e
  once per stream. Returns nil."
  [core e]
  (loop [remaining (seq (:streams core))]
    (when remaining
      ((first remaining) (.getMessage e))
      (recur (next remaining)))))
(defn graphite-server
  "Calls tcp-server with graphite-specific defaults and returns its result.

  The defaults are :host \"127.0.0.1\", :port 2003, :handler
  graphite-handler and a :pipeline-factory built here; any key present in
  opts overrides the corresponding default. (:parser-fn opts) is handed to
  graphite-frame-decoder each time a pipeline is built.

  With no arguments, behaves as if called with an empty option map."
  ([] (graphite-server {}))
  ([opts]
     (let [pipeline-factory
           (fn []
             (let [pipeline (Channels/pipeline)]
               ;; Handlers are appended in this order: line framing, string
               ;; decoding/encoding, then graphite line decoding.
               (.addLast pipeline "framer"
                         (DelimiterBasedFrameDecoder.
                          1024 ;; hard-coded first constructor argument
                          (Delimiters/lineDelimiter)))
               (.addLast pipeline "string-decoder"
                         (StringDecoder. CharsetUtil/UTF_8))
               (.addLast pipeline "string-encoder"
                         (StringEncoder. CharsetUtil/UTF_8))
               ;; graphite-frame-decoder returns a factory; invoke it so
               ;; each pipeline gets its own decoder instance.
               (.addLast pipeline "graphite-decoder"
                         ((graphite-frame-decoder (:parser-fn opts))))
               pipeline))
           defaults {:host             "127.0.0.1"
                     :port             2003
                     :pipeline-factory pipeline-factory
                     :handler          graphite-handler}]
       (tcp-server (merge defaults opts)))))