Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

first phase of refactor to allow pluggable transports

  • Loading branch information...
commit 86c743cd2e22fdedc067d8660757909b8daa3aa4 1 parent ffbcb40
Pierre-Yves Ritschard pyr authored
1  riemann.config
View
@@ -11,5 +11,6 @@
(let [index (default :ttl 300 (update-index (index)))]
(streams
+ prn
index
))
8 src/riemann/common.clj
View
@@ -12,11 +12,14 @@
[clojure.java.io :as io])
(:use [clojure.string :only [split]]
[riemann.time :only [unix-time]]
- clojure.tools.logging
riemann.codec
gloss.core
clojure.math.numeric-tower))
+(defprotocol Match
+ (match [pred object]
+ "Does predicate describe object?"))
+
; Times
(defn time-at
"Returns the Date of a unix epoch time."
@@ -91,9 +94,6 @@
(re-find re string)))
; Matching
-(defprotocol Match
- (match [pred object] "Does predicate describe object?"))
-
(extend-protocol Match
; Regexes are matched against strings.
java.util.regex.Pattern
19 src/riemann/config.clj
View
@@ -4,13 +4,16 @@
config. Provides a default core and functions ((tcp|udp)-server, streams,
index) which modify that core."
(:require [riemann.core :as core]
- riemann.server
- riemann.repl
- riemann.index
+ [riemann.transport.tcp :as tcp]
+ [riemann.transport.udp :as udp]
+ [riemann.transport.websockets :as websockets]
+ [riemann.transport.graphite :as graphite]
+ [riemann.repl]
+ [riemann.index]
[riemann.logging :as logging]
[riemann.folds :as folds]
[riemann.pubsub :as pubsub]
- [riemann.graphite :as graphite]
+ [riemann.graphite :as graphite-client]
[clojure.tools.nrepl.server :as repl])
(:use clojure.tools.logging
riemann.client
@@ -22,7 +25,7 @@
(def ^{:doc "A default core."} core (core/core))
-(def graphite #'graphite/graphite)
+(def graphite #'graphite-client/graphite)
(defn repl-server
"Adds a new REPL server with opts to the default core."
@@ -33,7 +36,7 @@
"Add a new TCP server with opts to the default core."
[& opts]
(swap! (core :servers) conj
- (riemann.server/tcp-server core (apply hash-map opts))))
+ (tcp/tcp-server core (apply hash-map opts))))
(defn graphite-server
"Add a new Graphite TCP server with opts to the default core."
@@ -46,14 +49,14 @@
"Add a new UDP server with opts to the default core."
[& opts]
(swap! (core :servers) conj
- (riemann.server/udp-server core (apply hash-map opts))))
+ (udp/udp-server core (apply hash-map opts))))
(defn ws-server
"Add a new websockets server with opts to the default core."
[& opts]
(swap!
(core :servers) conj
- (riemann.server/ws-server core (apply hash-map opts))))
+ (websockets/ws-server core (apply hash-map opts))))
(defn streams
"Add any number of streams to the default core."
55 src/riemann/graphite.clj
View
@@ -14,32 +14,8 @@
(:use [clojure.string :only [split join replace]]
clojure.tools.logging
riemann.pool
- riemann.common)
- (:require [riemann.server :as server]))
+ riemann.common))
-(defn decode-graphite-line
- "Decode a line coming from graphite.
- Graphite uses a simple scheme where each metric is given as a CRLF delimited
- line, space split with three items:
-
- * The metric name
- * The metric value (optionally NaN)
- * The timestamp
-
- By default, decode-graphite-line will yield a simple metric with just
- a service metric and timestamp, a parser-fn can be given to it, which
- will yield a map to merge onto the result. This can be used when
- graphite metrics have known patterns that you wish to extract more
- information (host, refined service name, tags) from"
- [line parser-fn]
- (when-let [[service metric timestamp] (split line #" ")]
- (when (not= metric "nan") ;; discard nan values
- {:ok true
- :states []
- :events [(let [res {:service service
- :metric (Float. metric)
- :time (Long. timestamp)}]
- (if parser-fn (merge res (parser-fn res)) res))]})))
(defn graphite-path-basic
"Constructs a path for an event. Takes the hostname fqdn, reversed,
@@ -117,33 +93,4 @@
(.write ^OutputStreamWriter out string)
(.flush ^OutputStreamWriter out)))))))
-(defn graphite-frame-decoder
- "A closure which yields a graphite frame-decoder. Taking an argument
- which will be given to decode-graphite-line (hence the closure)"
- [parser-fn]
- (fn []
- (proxy [OneToOneDecoder] []
- (decode [context channel message]
- (decode-graphite-line message parser-fn)))))
-(defn graphite-server
- "Start a graphite-server, some bits could be factored with tcp-server.
- Only the default option map and the bootstrap change."
- ([core] (graphite-server core {}))
- ([core opts]
- (let [pipeline-factory #(doto (Channels/pipeline)
- (.addLast "framer"
- (DelimiterBasedFrameDecoder.
- 1024 ;; Will the magic ever stop ?
- (Delimiters/lineDelimiter)))
- (.addLast "string-decoder"
- (StringDecoder. CharsetUtil/UTF_8))
- (.addLast "string-encoder"
- (StringEncoder. CharsetUtil/UTF_8))
- (.addLast "graphite-decoder"
- ((graphite-frame-decoder
- (:parser-fn opts)))))]
- (server/tcp-server core (merge {:host "127.0.0.1"
- :port 2003
- :pipeline-factory pipeline-factory}
- opts)))))
22 src/riemann/index.clj
View
@@ -8,24 +8,24 @@
(:use [riemann.time :only [unix-time]])
(:import (org.cliffc.high_scale_lib NonBlockingHashMap)))
-; The index accepts states and maintains a table of the most recent state for
-; each unique [host, service]. It can be searched for states matching a query.
-
-(def default-ttl 60)
-
(defprotocol Index
(clear [this]
- "Resets the index")
+ "Resets the index")
(delete [this event]
- "Deletes any event with this host & service from index")
+ "Deletes any event with this host & service from index")
(delete-exactly [this event]
- "Deletes event from index")
+ "Deletes event from index")
(expire [this]
- "Return a seq of expired states from this index, removing each.")
+ "Return a seq of expired states from this index, removing each.")
(search [this query-ast]
- "Returns a seq of events from the index matching this query AST")
+ "Returns a seq of events from the index matching this query AST")
(update [this event]
- "Updates index with event"))
+ "Updates index with event"))
+
+; The index accepts states and maintains a table of the most recent state for
+; each unique [host, service]. It can be searched for states matching a query.
+
+(def default-ttl 60)
(defn nbhm-index
"Create a new nonblockinghashmap backed index"
10 src/riemann/pool.clj
View
@@ -7,14 +7,14 @@
(defprotocol Pool
(grow [pool]
- "Adds an element to the pool.")
+ "Adds an element to the pool.")
(claim [pool] [pool timeout]
- "Take a thingy from the pool. Timeout in seconds; if unspecified, 0.
- Returns nil if no thingy available.")
+ "Take a thingy from the pool. Timeout in seconds; if unspecified, 0.
+ Returns nil if no thingy available.")
(release [pool thingy]
- "Returns a thingy to the pool.")
+ "Returns a thingy to the pool.")
(invalidate [pool thingy]
- "Tell the pool a thingy is no longer valid."))
+ "Tell the pool a thingy is no longer valid."))
(defrecord FixedQueuePool [queue open close regenerate-interval]
Pool
310 src/riemann/server.clj
View
@@ -1,310 +0,0 @@
-(ns riemann.server
- "Accepts messages from external sources. Associated with a core. Sends
- incoming events to the core's streams, queries the core's index for states."
- (:import (java.net InetSocketAddress)
- (java.util.concurrent Executors)
- (org.jboss.netty.util CharsetUtil)
- (org.jboss.netty.bootstrap ConnectionlessBootstrap
- ServerBootstrap)
- (org.jboss.netty.buffer ChannelBufferInputStream
- ChannelBuffers)
- (org.jboss.netty.channel ChannelHandler
- ChannelHandlerContext
- ChannelPipeline
- ChannelPipelineFactory
- ChannelStateEvent
- Channels
- ExceptionEvent
- FixedReceiveBufferSizePredictorFactory
- MessageEvent
- SimpleChannelHandler
- SimpleChannelUpstreamHandler)
- (org.jboss.netty.channel.group ChannelGroup
- DefaultChannelGroup)
- (org.jboss.netty.channel.socket DatagramChannelFactory)
- (org.jboss.netty.channel.socket.nio NioDatagramChannelFactory
- NioServerSocketChannelFactory)
- (org.jboss.netty.handler.codec.string StringDecoder StringEncoder)
- (org.jboss.netty.handler.codec.frame LengthFieldBasedFrameDecoder
- LengthFieldPrepender
- DelimiterBasedFrameDecoder
- Delimiters)
- (org.jboss.netty.handler.codec.oneone OneToOneDecoder)
- (org.jboss.netty.handler.execution
- ExecutionHandler
- OrderedMemoryAwareThreadPoolExecutor
- MemoryAwareThreadPoolExecutor))
-
- (:require [riemann.query :as query]
- [riemann.index :as index]
- gloss.io)
- (:use riemann.core
- riemann.common
- riemann.pubsub
- clojure.tools.logging
- clojure.stacktrace
- lamina.core
- aleph.http
- [slingshot.slingshot :only [try+]]
- [clj-http.util :only [url-decode]]
- [clojure.string :only [split]]))
-
-(defn handle
- "Handles a msg with the given core."
- [core msg]
- (try+
- ; Send each event/state to each stream
- (doseq [event (concat (:events msg) (:states msg))
- stream (deref (:streams core))]
- (stream event))
-
- (if (:query msg)
- ; Handle query
- (let [ast (query/ast (:string (:query msg)))]
- (if-let [i (deref (:index core))]
- {:ok true :events (index/search i ast)}
- {:ok false :error "no index"}))
-
- ; Generic acknowledge
- {:ok true})
-
- ; Some kind of error happened
- (catch [:type :riemann.query/parse-error] {:keys [message]}
- {:ok false :error (str "parse error: " message)})
- (catch Exception ^Exception e
- {:ok false :error (.getMessage e)})))
-
-(defn int32-frame-decoder
- []
- ; Offset 0, 4 byte header, skip those 4 bytes.
- (LengthFieldBasedFrameDecoder. Integer/MAX_VALUE, 0, 4, 0, 4))
-
-(defn int32-frame-encoder
- []
- (LengthFieldPrepender. 4))
-
-(defn protobuf-frame-decoder []
- (proxy [OneToOneDecoder] []
- (decode [context channel message]
- (let [instream (ChannelBufferInputStream. message)]
- (decode-inputstream instream)))))
-
-(defn tcp-handler
- "Returns a TCP handler for the given core"
- [core ^ChannelGroup channel-group]
- (proxy [SimpleChannelHandler] []
- (channelOpen [context ^ChannelStateEvent state-event]
- (.add channel-group (.getChannel state-event)))
-
- (messageReceived [^ChannelHandlerContext context
- ^MessageEvent message-event]
- (let [channel (.getChannel message-event)
- msg (.getMessage message-event)]
- (try
- (let [response (handle core msg)
- encoded (encode response)]
- (.write channel (ChannelBuffers/wrappedBuffer encoded)))
- (catch java.nio.channels.ClosedChannelException e
- (warn "channel closed"))
- (catch com.google.protobuf.InvalidProtocolBufferException e
- (warn "invalid message, closing")
- (.close channel)))))
-
- (exceptionCaught [context ^ExceptionEvent exception-event]
- (warn (.getCause exception-event) "TCP handler caught")
- (.close (.getChannel exception-event)))))
-
-(defn udp-handler
- "Returns a UDP handler for the given core."
- [core ^ChannelGroup channel-group]
- (proxy [SimpleChannelUpstreamHandler] []
- (channelOpen [context ^ChannelStateEvent state-event]
- (.add channel-group (.getChannel state-event)))
-
- (messageReceived [context ^MessageEvent message-event]
- (handle core (.getMessage message-event)))
- (exceptionCaught [context ^ExceptionEvent exception-event]
- (warn (.getCause exception-event) "UDP handler caught"))))
-
-(defn channel-pipeline-factory
- "Return a factory for ChannelPipelines given a wire protocol-specific
- pipeline factory and a network protocol-specific handler."
- [pipeline-factory handler]
- (proxy [ChannelPipelineFactory] []
- (getPipeline []
- (doto ^ChannelPipeline (pipeline-factory)
- (.addLast "executor" (ExecutionHandler.
- (OrderedMemoryAwareThreadPoolExecutor.
- 16 1048576 1048576))) ; Maaagic values!
- (.addLast "handler" handler)))))
-
-(defn tcp-server
- "Create a new TCP server for a core. Starts immediately. Options:
- :host The host to listen on (default 127.0.0.1).
- :port The port to listen on. (default 5555)"
- ([core] (tcp-server core {}))
- ([core opts]
- (let [pipeline-factory #(doto (Channels/pipeline)
- (.addLast "int32-frame-decoder"
- (int32-frame-decoder))
- (.addLast "int32-frame-encoder"
- (int32-frame-encoder))
- (.addLast "protobuf-decoder"
- (protobuf-frame-decoder)))
- opts (merge {:host "127.0.0.1"
- :port 5555
- :pipeline-factory pipeline-factory}
- opts)
- bootstrap (ServerBootstrap.
- (NioServerSocketChannelFactory.
- (Executors/newCachedThreadPool)
- (Executors/newCachedThreadPool)))
- all-channels (DefaultChannelGroup. (str "tcp-server " opts))
- cpf (channel-pipeline-factory
- (:pipeline-factory opts) (tcp-handler core all-channels))]
-
- ; Configure bootstrap
- (doto bootstrap
- (.setPipelineFactory cpf)
- (.setOption "readWriteFair" true)
- (.setOption "tcpNoDelay" true)
- (.setOption "reuseAddress" true)
- (.setOption "child.tcpNoDelay" true)
- (.setOption "child.reuseAddress" true)
- (.setOption "child.keepAlive" true))
-
- ; Start bootstrap
- (let [server-channel (.bind bootstrap
- (InetSocketAddress. ^String (:host opts)
- ^Integer (:port opts)))]
- (.add all-channels server-channel))
- (info "TCP server" (select-keys opts [:host :port]) "online")
-
- ; fn to close server
- (fn []
- (-> all-channels .close .awaitUninterruptibly)
- (.releaseExternalResources bootstrap)
- (info "TCP server" (select-keys opts [:host :port]) "shut down")))))
-
-(defn udp-server
- "Starts a new UDP server for a core. Starts immediately.
-
- IMPORTANT: The UDP server has a maximum datagram size--by default, 16384
- bytes. If your client does not agree on the maximum datagram size (and send
- big messages over TCP instead), it can send large messages which will be
- dropped with protobuf parse errors in the log.
-
- Options:
- :host The address to listen on (default 127.0.0.1).
- :port The port to listen on (default 5555).
- :max-size The maximum datagram size (default 16384 bytes)."
- ([core] (udp-server core {}))
- ([core opts]
- (let [pipeline-factory #(doto (Channels/pipeline)
- (.addLast "protobuf-decoder"
- (protobuf-frame-decoder)))
- opts (merge {:host "127.0.0.1"
- :port 5555
- :max-size 16384
- :pipeline-factory pipeline-factory}
- opts)
- bootstrap (ConnectionlessBootstrap.
- (NioDatagramChannelFactory.
- (Executors/newCachedThreadPool)))
- all-channels (DefaultChannelGroup. (str "udp-server " opts))
- cpf (channel-pipeline-factory
- (:pipeline-factory opts) (udp-handler core all-channels))]
-
- ; Configure bootstrap
- (doto bootstrap
- (.setPipelineFactory cpf)
- (.setOption "broadcast" "false")
- (.setOption "receiveBufferSizePredictorFactory"
- (FixedReceiveBufferSizePredictorFactory. (:max-size opts))))
-
- ; Start bootstrap
- (let [server-channel (.bind bootstrap
- (InetSocketAddress. ^String (:host opts)
- ^Integer (:port opts)))]
- (.add all-channels server-channel))
- (info "UDP server" (select-keys opts [:host :port :max-size]) "online")
-
- ; fn to close server
- (fn []
- (-> all-channels .close .awaitUninterruptibly)
- (.releaseExternalResources bootstrap)
- (info "UDP server" (select-keys opts [:host :port :max-size])
- "shut down")))))
-
-(defn http-query-map
- "Converts a URL query string into a map."
- [string]
- (apply hash-map
- (map url-decode
- (mapcat (fn [kv] (split kv #"=" 2))
- (split string #"&")))))
-
-;;; Websockets
-(defn ws-pubsub-handler [core ch hs]
- (let [topic (url-decode (last (split (:uri hs) #"/" 3)))
- params (http-query-map (:query-string hs))
- query (params "query")
- pred (query/fun (query/ast query))
- sub (subscribe (:pubsub core) topic
- (fn [event]
- (when (pred event)
- (enqueue ch (event-to-json event)))))]
- (info "New websocket subscription to" topic ":" query)
- (receive-all ch (fn [msg]
- (when-not msg
- ; Shut down channel
- (info "Closing websocket "
- (:remote-addr hs) topic query)
- (close ch)
- (unsubscribe (:pubsub core) sub))))))
-
-(defn ws-index-handler
- "Queries the index for events and streams them to the client. If subscribe is
- true, also initiates a pubsub subscription to the index topic with that
- query."
- [core ch hs]
- (let [params (http-query-map (:query-string hs))
- query (params "query")
- ast (query/ast query)]
- (when-let [i (deref (:index core))]
- (doseq [event (index/search i ast)]
- (enqueue ch (event-to-json event))))
- (if (= (params "subscribe") "true")
- (ws-pubsub-handler core ch (assoc hs :uri "/pubsub/index"))
- (close ch))))
-
-(defn ws-handler [core]
- (fn [ch handshake]
- (info "Websocket connection from" (:remote-addr handshake)
- (:uri handshake)
- (:query-string handshake))
- (condp re-matches (:uri handshake)
- #"^/index/?$" (ws-index-handler core ch handshake)
- #"^/pubsub/[^/]+/?$" (ws-pubsub-handler core ch handshake)
- :else (do
- (info "Unknown URI " (:uri handshake) ", closing")
- (close ch)))))
-
-(defn ws-server
- "Starts a new websocket server for a core. Starts immediately.
-
- Options:
- :host The address to listen on (default 127.0.0.1)
- :post The port to listen on (default 5556)"
- ([core] (udp-server core {}))
- ([core opts]
- (let [opts (merge {:host "127.0.0.1"
- :port 5556}
- opts)
- s (start-http-server (ws-handler core) {:host (:host opts)
- :port (:port opts)
- :websocket true})]
- (info "Websockets server" opts "online")
- (fn []
- (s)
- (info "Websockets server" opts "shut down")))))
28 src/riemann/time.clj
View
@@ -4,9 +4,21 @@
threadpool for task execution, controlled by (start!) and (stop!)."
(:import [java.util.concurrent ConcurrentSkipListSet]
[java.util.concurrent.locks LockSupport])
- (:use clojure.math.numeric-tower
- [clojure.stacktrace :only [print-stack-trace]]
- clojure.tools.logging))
+ (:use [clojure.math.numeric-tower :only [ceil]]
+ [clojure.stacktrace :only [print-stack-trace]]
+ [clojure.tools.logging :only [warn]]))
+
+(defprotocol Task
+ (succ [task]
+ "The successive task to this one.")
+ (run [task]
+ "Executes this task.")
+ (cancel [task]
+ "Cancel this task."))
+
+(defprotocol Deferrable
+ (defer [this new-time]
+ "Schedule a task for a new time."))
(defn unix-time-real
"The current unix epoch time in seconds, taken from
@@ -24,16 +36,6 @@
(def unix-time unix-time-real)
(def linear-time linear-time-real)
-(defprotocol Task
- (succ [task] "The successive task to this one.")
-
- (run [task] "Executes this task.")
-
- (cancel [task] "Cancel this task."))
-
-(defprotocol Deferrable
- (defer [this new-time] "Schedule a task for a new time."))
-
(defrecord Once [id f t cancelled]
Task
(succ [this] nil)
70 src/riemann/transport.clj
View
@@ -0,0 +1,70 @@
+(ns riemann.transport
+ "Functions used in several transports. Some netty parts transpire
+ here since netty is the preferred method of providing transports"
+ (:use [slingshot.slingshot :only [try+]]
+ [riemann.common :only [decode-inputstream]]
+ [riemann.core :only [core]])
+ (:require [riemann.query :as query]
+ [riemann.protocol :as p])
+ (:import
+ [org.jboss.netty.channel ChannelPipelineFactory ChannelPipeline]
+ [org.jboss.netty.buffer ChannelBufferInputStream]
+ [org.jboss.netty.handler.codec.oneone OneToOneDecoder]
+ [org.jboss.netty.handler.execution ExecutionHandler
+ OrderedMemoryAwareThreadPoolExecutor]))
+
+(defprotocol Transport
+ "A riemann transport is a way of emitting and receiving events
+ over the wire."
+ (setup [this opts]
+ "Setup step for transports. In order to handle server life-cycle
+ correctly, can be called several times.")
+ (capabilities [this]
+ "Return a collection of keywords representing what the transport
+ can handle, possible values are: :queries and :events")
+ (start [this]
+ "Start listening for events and ")
+ (stop [this]
+ "Gracefully stop the server"))
+
+(defn channel-pipeline-factory
+ "Return a factory for ChannelPipelines given a wire protocol-specific
+ pipeline factory and a network protocol-specific handler."
+ [pipeline-factory handler]
+ (proxy [ChannelPipelineFactory] []
+ (getPipeline []
+ (doto ^ChannelPipeline (pipeline-factory)
+ (.addLast "executor" (ExecutionHandler.
+ (OrderedMemoryAwareThreadPoolExecutor.
+ 16 1048576 1048576))) ; Maaagic values!
+ (.addLast "handler" handler)))))
+
+(defn protobuf-frame-decoder []
+ (proxy [OneToOneDecoder] []
+ (decode [context channel message]
+ (let [instream (ChannelBufferInputStream. message)]
+ (decode-inputstream instream)))))
+
+(defn handle
+ "Handles a msg with the given core."
+ [core msg]
+ (try+
+ ;; Send each event/state to each stream
+ (doseq [event (concat (:events msg) (:states msg))
+ stream (deref (:streams core))]
+ (stream event))
+
+ (if (:query msg)
+ ;; Handle query
+ (let [ast (query/ast (:string (:query msg)))]
+ (if-let [i (deref (:index core))]
+ {:ok true :events (p/search i ast)}
+ {:ok false :error "no index"}))
+
+ {:ok true})
+
+ ;; Some kind of error happened
+ (catch [:type :riemann.query/parse-error] {:keys [message]}
+ {:ok false :error (str "parse error: " message)})
+ (catch Exception ^Exception e
+ {:ok false :error (.getMessage e)})))
65 src/riemann/transport/graphite.clj
View
@@ -0,0 +1,65 @@
+(ns riemann.transport.graphite
+ (:import [org.jboss.netty.util CharsetUtil]
+ [org.jboss.netty.channel Channels]
+ [org.jboss.netty.handler.codec.oneone OneToOneDecoder]
+ [org.jboss.netty.handler.codec.string StringDecoder StringEncoder]
+ [org.jboss.netty.handler.codec.frame
+ DelimiterBasedFrameDecoder
+ Delimiters])
+ (:use [riemann.transport.tcp :only [tcp-server]]
+ [clojure.string :only [split]]))
+
+(defn decode-graphite-line
+ "Decode a line coming from graphite.
+ Graphite uses a simple scheme where each metric is given as a CRLF delimited
+ line, space split with three items:
+
+ * The metric name
+ * The metric value (optionally NaN)
+ * The timestamp
+
+ By default, decode-graphite-line will yield a simple metric with just
+ a service metric and timestamp, a parser-fn can be given to it, which
+ will yield a map to merge onto the result. This can be used when
+ graphite metrics have known patterns that you wish to extract more
+ information (host, refined service name, tags) from"
+ [line parser-fn]
+ (when-let [[service metric timestamp] (split line #" ")]
+ (when (not= metric "nan") ;; discard nan values
+ {:ok true
+ :states []
+ :events [(let [res {:service service
+ :metric (Float. metric)
+ :time (Long. timestamp)}]
+ (if parser-fn (merge res (parser-fn res)) res))]})))
+
+(defn graphite-frame-decoder
+ "A closure which yields a graphite frame-decoder. Taking an argument
+ which will be given to decode-graphite-line (hence the closure)"
+ [parser-fn]
+ (fn []
+ (proxy [OneToOneDecoder] []
+ (decode [context channel message]
+ (decode-graphite-line message parser-fn)))))
+
+(defn graphite-server
+ "Start a graphite-server, some bits could be factored with tcp-server.
+ Only the default option map and the bootstrap change."
+ ([core] (graphite-server core {}))
+ ([core opts]
+ (let [pipeline-factory #(doto (Channels/pipeline)
+ (.addLast "framer"
+ (DelimiterBasedFrameDecoder.
+ 1024 ;; Will the magic ever stop ?
+ (Delimiters/lineDelimiter)))
+ (.addLast "string-decoder"
+ (StringDecoder. CharsetUtil/UTF_8))
+ (.addLast "string-encoder"
+ (StringEncoder. CharsetUtil/UTF_8))
+ (.addLast "graphite-decoder"
+ ((graphite-frame-decoder
+ (:parser-fn opts)))))]
+ (tcp-server core (merge {:host "127.0.0.1"
+ :port 2003
+ :pipeline-factory pipeline-factory}
+ opts)))))
110 src/riemann/transport/tcp.clj
View
@@ -0,0 +1,110 @@
+(ns riemann.transport.tcp
+ "Accepts messages from external sources. Associated with a core. Sends
+ incoming events to the core's streams, queries the core's index for states."
+ (:import [java.net InetSocketAddress]
+ [java.util.concurrent Executors]
+ [org.jboss.netty.bootstrap ServerBootstrap]
+ [org.jboss.netty.buffer ChannelBuffers]
+ [org.jboss.netty.channel ChannelHandler
+ ChannelHandlerContext
+ ChannelPipeline
+ ChannelPipelineFactory
+ ChannelStateEvent
+ Channels
+ ExceptionEvent
+ MessageEvent
+ SimpleChannelHandler]
+ [org.jboss.netty.channel.group ChannelGroup DefaultChannelGroup]
+ [org.jboss.netty.channel.socket.nio NioServerSocketChannelFactory]
+ [org.jboss.netty.handler.codec.frame LengthFieldBasedFrameDecoder
+ LengthFieldPrepender]
+ [org.jboss.netty.handler.execution
+ OrderedMemoryAwareThreadPoolExecutor])
+ (:use [riemann.transport :only [handle protobuf-frame-decoder
+ channel-pipeline-factory]]
+ [clojure.tools.logging :only [info warn]]
+ [riemann.transport :only [handle]]
+ [riemann.common :only [encode]]))
+
+(defn int32-frame-decoder
+ []
+ ; Offset 0, 4 byte header, skip those 4 bytes.
+ (LengthFieldBasedFrameDecoder. Integer/MAX_VALUE, 0, 4, 0, 4))
+
+(defn int32-frame-encoder
+ []
+ (LengthFieldPrepender. 4))
+
+(defn tcp-handler
+ "Returns a TCP handler for the given core"
+ [core ^ChannelGroup channel-group]
+ (proxy [SimpleChannelHandler] []
+ (channelOpen [context ^ChannelStateEvent state-event]
+ (.add channel-group (.getChannel state-event)))
+
+ (messageReceived [^ChannelHandlerContext context
+ ^MessageEvent message-event]
+ (let [channel (.getChannel message-event)
+ msg (.getMessage message-event)]
+ (try
+ (let [response (handle core msg)
+ encoded (encode response)]
+ (.write channel (ChannelBuffers/wrappedBuffer encoded)))
+ (catch java.nio.channels.ClosedChannelException e
+ (warn "channel closed"))
+ (catch com.google.protobuf.InvalidProtocolBufferException e
+ (warn "invalid message, closing")
+ (.close channel)))))
+
+ (exceptionCaught [context ^ExceptionEvent exception-event]
+ (warn (.getCause exception-event) "TCP handler caught")
+ (.close (.getChannel exception-event)))))
+
+(defn tcp-server
+ "Create a new TCP server for a core. Starts immediately. Options:
+ :host The host to listen on (default 127.0.0.1).
+ :port The port to listen on. (default 5555)"
+ ([core]
+ (tcp-server core {}))
+ ([core opts]
+ (let [pipeline-factory #(doto (Channels/pipeline)
+ (.addLast "int32-frame-decoder"
+ (int32-frame-decoder))
+ (.addLast "int32-frame-encoder"
+ (int32-frame-encoder))
+ (.addLast "protobuf-decoder"
+ (protobuf-frame-decoder)))
+ opts (merge {:host "127.0.0.1"
+ :port 5555
+ :pipeline-factory pipeline-factory}
+ opts)
+ bootstrap (ServerBootstrap.
+ (NioServerSocketChannelFactory.
+ (Executors/newCachedThreadPool)
+ (Executors/newCachedThreadPool)))
+ all-channels (DefaultChannelGroup. (str "tcp-server " opts))
+ cpf (channel-pipeline-factory
+ (:pipeline-factory opts) (tcp-handler core all-channels))]
+
+ ;; Configure bootstrap
+ (doto bootstrap
+ (.setPipelineFactory cpf)
+ (.setOption "readWriteFair" true)
+ (.setOption "tcpNoDelay" true)
+ (.setOption "reuseAddress" true)
+ (.setOption "child.tcpNoDelay" true)
+ (.setOption "child.reuseAddress" true)
+ (.setOption "child.keepAlive" true))
+
+ ;; Start bootstrap
+ (let [server-channel (.bind bootstrap
+ (InetSocketAddress. ^String (:host opts)
+ ^Integer (:port opts)))]
+ (.add all-channels server-channel))
+ (info "TCP server" (select-keys opts [:host :port]) "online")
+
+ ;; fn to close server
+ (fn []
+ (-> all-channels .close .awaitUninterruptibly)
+ (.releaseExternalResources bootstrap)
+ (info "TCP server" (select-keys opts [:host :port]) "shut down")))))
80 src/riemann/transport/udp.clj
View
@@ -0,0 +1,80 @@
+(ns riemann.transport.udp
+ "Accepts messages from external sources. Associated with a core. Sends
+ incoming events to the core's streams, queries the core's index for states."
+ (:import [java.net InetSocketAddress]
+ [java.util.concurrent Executors]
+ [org.jboss.netty.bootstrap ConnectionlessBootstrap]
+ [org.jboss.netty.channel ChannelStateEvent
+ Channels
+ ExceptionEvent
+ FixedReceiveBufferSizePredictorFactory
+ MessageEvent
+ SimpleChannelUpstreamHandler]
+ [org.jboss.netty.channel.group ChannelGroup DefaultChannelGroup]
+ [org.jboss.netty.channel.socket.nio NioDatagramChannelFactory])
+ (:use [clojure.tools.logging :only [warn info]]
+ [clojure.string :only [split]]
+ [riemann.transport :only [handle protobuf-frame-decoder
+ channel-pipeline-factory]]))
+
+(defn udp-handler
+ "Returns a UDP handler for the given core."
+ [core ^ChannelGroup channel-group]
+ (proxy [SimpleChannelUpstreamHandler] []
+ (channelOpen [context ^ChannelStateEvent state-event]
+ (.add channel-group (.getChannel state-event)))
+
+ (messageReceived [context ^MessageEvent message-event]
+ (handle core (.getMessage message-event)))
+ (exceptionCaught [context ^ExceptionEvent exception-event]
+ (warn (.getCause exception-event) "UDP handler caught"))))
+
+(defn udp-server
+ "Starts a new UDP server for a core. Starts immediately.
+
+ IMPORTANT: The UDP server has a maximum datagram size--by default, 16384
+ bytes. If your client does not agree on the maximum datagram size (and send
+ big messages over TCP instead), it can send large messages which will be
+ dropped with protobuf parse errors in the log.
+
+ Options:
+ :host The address to listen on (default 127.0.0.1).
+ :port The port to listen on (default 5555).
+ :max-size The maximum datagram size (default 16384 bytes)."
+ ([core] (udp-server core {}))
+ ([core opts]
+ (let [pipeline-factory #(doto (Channels/pipeline)
+ (.addLast "protobuf-decoder"
+ (protobuf-frame-decoder)))
+ opts (merge {:host "127.0.0.1"
+ :port 5555
+ :max-size 16384
+ :pipeline-factory pipeline-factory}
+ opts)
+ bootstrap (ConnectionlessBootstrap.
+ (NioDatagramChannelFactory.
+ (Executors/newCachedThreadPool)))
+ all-channels (DefaultChannelGroup. (str "udp-server " opts))
+ cpf (channel-pipeline-factory
+ (:pipeline-factory opts) (udp-handler core all-channels))]
+
+ ; Configure bootstrap
+ (doto bootstrap
+ (.setPipelineFactory cpf)
+ (.setOption "broadcast" "false")
+ (.setOption "receiveBufferSizePredictorFactory"
+ (FixedReceiveBufferSizePredictorFactory. (:max-size opts))))
+
+ ; Start bootstrap
+ (let [server-channel (.bind bootstrap
+ (InetSocketAddress. ^String (:host opts)
+ ^Integer (:port opts)))]
+ (.add all-channels server-channel))
+ (info "UDP server" (select-keys opts [:host :port :max-size]) "online")
+
+ ; fn to close server
+ (fn []
+ (-> all-channels .close .awaitUninterruptibly)
+ (.releaseExternalResources bootstrap)
+ (info "UDP server" (select-keys opts [:host :port :max-size])
+ "shut down")))))
86 src/riemann/transport/websockets.clj
View
@@ -0,0 +1,86 @@
+(ns riemann.transport.websockets
+ "Accepts messages from external sources. Associated with a core. Sends
+ incoming events to the core's streams, queries the core's index for states."
+ (:require [riemann.query :as query]
+ [riemann.index :as index]
+ [riemann.pubsub :as p])
+ (:use [riemann.common :only [event-to-json]]
+ [riemann.core :only [core]]
+ [aleph.http :only [start-http-server]]
+ [lamina.core :only [receive-all close enqueue]]
+ [clojure.tools.logging :only [info warn]]
+ [clj-http.util :only [url-decode]]
+ [clojure.string :only [split]]))
+
+(defn http-query-map
+ "Converts a URL query string into a map."
+ [string]
+ (apply hash-map
+ (map url-decode
+ (mapcat (fn [kv] (split kv #"=" 2))
+ (split string #"&")))))
+
+;;; Websockets
+(defn ws-pubsub-handler [core ch hs]
+ (let [topic (url-decode (last (split (:uri hs) #"/" 3)))
+ params (http-query-map (:query-string hs))
+ query (params "query")
+ pred (query/fun (query/ast query))
+ sub (p/subscribe (:pubsub core) topic
+ (fn [event]
+ (when (pred event)
+ (enqueue ch (event-to-json event)))))]
+ (info "New websocket subscription to" topic ":" query)
+ (receive-all ch (fn [msg]
+ (when-not msg
+ ; Shut down channel
+ (info "Closing websocket "
+ (:remote-addr hs) topic query)
+ (close ch)
+ (p/unsubscribe (:pubsub core) sub))))))
+
+(defn ws-index-handler
+ "Queries the index for events and streams them to the client. If subscribe is
+ true, also initiates a pubsub subscription to the index topic with that
+ query."
+ [core ch hs]
+ (let [params (http-query-map (:query-string hs))
+ query (params "query")
+ ast (query/ast query)]
+ (when-let [i (deref (:index core))]
+ (doseq [event (index/search i ast)]
+ (enqueue ch (event-to-json event))))
+ (if (= (params "subscribe") "true")
+ (ws-pubsub-handler core ch (assoc hs :uri "/pubsub/index"))
+ (close ch))))
+
+(defn ws-handler [core]
+ (fn [ch handshake]
+ (info "Websocket connection from" (:remote-addr handshake)
+ (:uri handshake)
+ (:query-string handshake))
+ (condp re-matches (:uri handshake)
+ #"^/index/?$" (ws-index-handler core ch handshake)
+ #"^/pubsub/[^/]+/?$" (ws-pubsub-handler core ch handshake)
+ :else (do
+ (info "Unknown URI " (:uri handshake) ", closing")
+ (close ch)))))
+
+(defn ws-server
+ "Starts a new websocket server for a core. Starts immediately.
+
+ Options:
+ :host The address to listen on (default 127.0.0.1)
+ :post The port to listen on (default 5556)"
+ ([core] (ws-server core {}))
+ ([core opts]
+ (let [opts (merge {:host "127.0.0.1"
+ :port 5556}
+ opts)
+ s (start-http-server (ws-handler core) {:host (:host opts)
+ :port (:port opts)
+ :websocket true})]
+ (info "Websockets server" opts "online")
+ (fn []
+ (s)
+ (info "Websockets server" opts "shut down")))))
Please sign in to comment.
Something went wrong with that request. Please try again.