/
server.clj
311 lines (279 loc) · 12.1 KB
/
server.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
(ns riemann.server
"Accepts messages from external sources. Associated with a core. Sends
incoming events to the core's streams, queries the core's index for states."
(:import (java.net InetSocketAddress)
(java.util.concurrent Executors)
(org.jboss.netty.bootstrap ConnectionlessBootstrap
ServerBootstrap)
(org.jboss.netty.buffer ChannelBufferInputStream
ChannelBuffers)
(org.jboss.netty.channel ChannelHandler
ChannelHandlerContext
ChannelPipeline
ChannelPipelineFactory
ChannelStateEvent
Channels
ExceptionEvent
FixedReceiveBufferSizePredictorFactory
MessageEvent
SimpleChannelHandler
SimpleChannelUpstreamHandler)
(org.jboss.netty.channel.group ChannelGroup
DefaultChannelGroup)
(org.jboss.netty.channel.socket DatagramChannelFactory)
(org.jboss.netty.channel.socket.nio NioDatagramChannelFactory
NioServerSocketChannelFactory)
(org.jboss.netty.handler.codec.frame LengthFieldBasedFrameDecoder
LengthFieldPrepender)
(org.jboss.netty.handler.codec.oneone OneToOneDecoder)
(org.jboss.netty.handler.execution
ExecutionHandler
OrderedMemoryAwareThreadPoolExecutor
MemoryAwareThreadPoolExecutor))
(:require [riemann.query :as query])
(:require [riemann.index :as index])
(:use riemann.core)
(:use riemann.common)
(:use riemann.pubsub)
(:use clojure.tools.logging)
(:use [protobuf.core])
(:use [slingshot.slingshot :only [try+]])
(:use clojure.stacktrace)
(:use lamina.core)
(:use aleph.http)
(:use [clj-http.util :only [url-decode]])
(:use [clojure.string :only [split]])
(:require gloss.io))
(defn handle
"Handles a msg with the given core."
[core msg]
(try+
; Send each event/state to each stream
(doseq [event (concat (:events msg) (:states msg))
stream (deref (:streams core))]
(stream event))
(if (:query msg)
; Handle query
(let [ast (query/ast (:string (:query msg)))]
(if-let [i (deref (:index core))]
{:ok true :events (index/search i ast)}
{:ok false :error "no index"}))
; Generic acknowledge
{:ok true})
; Some kind of error happened
(catch [:type :riemann.query/parse-error] {:keys [message]}
{:ok false :error (str "parse error: " message)})
(catch Exception ^Exception e
{:ok false :error (.getMessage e)})))
(defn int32-frame-decoder
[]
; Offset 0, 4 byte header, skip those 4 bytes.
(LengthFieldBasedFrameDecoder. Integer/MAX_VALUE, 0, 4, 0, 4))
(defn int32-frame-encoder
[]
(LengthFieldPrepender. 4))
(defn protobuf-frame-decoder []
(proxy [OneToOneDecoder] []
(decode [context channel message]
(let [instream (ChannelBufferInputStream. message)]
(decode-inputstream instream)))))
(defn tcp-handler
"Returns a TCP handler for the given core"
[core ^ChannelGroup channel-group]
(proxy [SimpleChannelHandler] []
(channelOpen [context ^ChannelStateEvent state-event]
(.add channel-group (.getChannel state-event)))
(messageReceived [^ChannelHandlerContext context
^MessageEvent message-event]
(let [channel (.getChannel message-event)
msg (.getMessage message-event)]
(try
(let [response (handle core msg)
encoded (encode response)]
(.write channel (ChannelBuffers/wrappedBuffer encoded)))
(catch java.nio.channels.ClosedChannelException e
(warn "channel closed"))
(catch com.google.protobuf.InvalidProtocolBufferException e
(warn "invalid message, closing")
(.close channel)))))
(exceptionCaught [context ^ExceptionEvent exception-event]
(warn (.getCause exception-event) "TCP handler caught")
(.close (.getChannel exception-event)))))
(defn udp-handler
"Returns a UDP handler for the given core."
[core ^ChannelGroup channel-group]
(proxy [SimpleChannelUpstreamHandler] []
(channelOpen [context ^ChannelStateEvent state-event]
(.add channel-group (.getChannel state-event)))
(messageReceived [context ^MessageEvent message-event]
(handle core (.getMessage message-event)))
(exceptionCaught [context ^ExceptionEvent exception-event]
(warn (.getCause exception-event) "UDP handler caught"))))
(defn tcp-cpf
"TCP Channel Pipeline Factory"
[core channel-group message-decoder]
(proxy [ChannelPipelineFactory] []
(getPipeline []
(let [decoder (int32-frame-decoder)
encoder (int32-frame-encoder)
executor (ExecutionHandler.
(OrderedMemoryAwareThreadPoolExecutor.
16 1048576 1048576)) ; Maaagic values!
handler (tcp-handler core channel-group)]
(doto (Channels/pipeline)
(.addLast "int32-frame-decoder" decoder)
(.addLast "int32-frame-encoder" encoder)
(.addLast "message-decoder" (message-decoder))
(.addLast "executor" executor)
(.addLast "handler" handler))))))
(defn udp-cpf
"Channel Pipeline Factory"
[core channel-group message-decoder]
(proxy [ChannelPipelineFactory] []
(getPipeline []
(let [executor (ExecutionHandler.
(MemoryAwareThreadPoolExecutor.
16 1048576 1048576)) ;; Moar magic!
handler (udp-handler core channel-group)]
(doto (Channels/pipeline)
(.addLast "message-decoder" (message-decoder))
(.addLast "executor" executor)
(.addLast "handler" handler))))))
(defn tcp-server
"Create a new TCP server for a core. Starts immediately. Options:
:host The host to listen on (default localhost).
:port The port to listen on. (default 5555)"
([core] (tcp-server core {}))
([core opts]
(let [opts (merge {:host "localhost"
:port 5555
:message-decoder protobuf-frame-decoder}
opts)
bootstrap (ServerBootstrap.
(NioServerSocketChannelFactory.
(Executors/newCachedThreadPool)
(Executors/newCachedThreadPool)))
all-channels (DefaultChannelGroup. (str "tcp-server " opts))
cpf (tcp-cpf core all-channels (:message-decoder opts))]
; Configure bootstrap
(doto bootstrap
(.setPipelineFactory cpf)
(.setOption "readWriteFair" true)
(.setOption "tcpNoDelay" true)
(.setOption "reuseAddress" true)
(.setOption "child.tcpNoDelay" true)
(.setOption "child.reuseAddress" true)
(.setOption "child.keepAlive" true))
; Start bootstrap
(let [server-channel (.bind bootstrap
(InetSocketAddress. ^String (:host opts)
^Integer (:port opts)))]
(.add all-channels server-channel))
(info "TCP server" opts " online")
; fn to close server
(fn []
(-> all-channels .close .awaitUninterruptibly)
(.releaseExternalResources bootstrap)))))
(defn udp-server
"Starts a new UDP server for a core. Starts immediately.
IMPORTANT: The UDP server has a maximum datagram size--by default, 16384
bytes. If your client does not agree on the maximum datagram size (and send
big messages over TCP instead), it can send large messages which will be
dropped with protobuf parse errors in the log.
Options:
:host The address to listen on (default localhost).
:port The port to listen on (default 5555).
:max-size The maximum datagram size (default 16384 bytes)."
([core] (udp-server core {}))
([core opts]
(let [opts (merge {:host "localhost"
:port 5555
:max-size 16384
:message-decoder protobuf-frame-decoder}
opts)
bootstrap (ConnectionlessBootstrap.
(NioDatagramChannelFactory.
(Executors/newCachedThreadPool)))
all-channels (DefaultChannelGroup. (str "udp-server " opts))
cpf (udp-cpf core all-channels (:message-decoder opts))]
; Configure bootstrap
(doto bootstrap
(.setPipelineFactory cpf)
(.setOption "broadcast" "false")
(.setOption "receiveBufferSizePredictorFactory"
(FixedReceiveBufferSizePredictorFactory. (:max-size opts))))
; Start bootstrap
(let [server-channel (.bind bootstrap
(InetSocketAddress. ^String (:host opts)
^Integer (:port opts)))]
(.add all-channels server-channel))
(info "UDP server" opts "online")
; fn to close server
(fn []
(-> all-channels .close .awaitUninterruptibly)
(.releaseExternalResources bootstrap)))))
(defn http-query-map
"Converts a URL query string into a map."
[string]
(apply hash-map
(map url-decode
(mapcat (fn [kv] (split kv #"=" 2))
(split string #"&")))))
;;; Websockets
(defn ws-pubsub-handler [core ch hs]
(let [topic (url-decode (last (split (:uri hs) #"/" 3)))
params (http-query-map (:query-string hs))
query (params "query")
pred (query/fun (query/ast query))
sub (subscribe (:pubsub core) topic
(fn [event]
(when (pred event)
(enqueue ch (event-to-json event)))))]
(info "New websocket subscription to" topic ":" query)
(receive-all ch (fn [msg]
(when-not msg
; Shut down channel
(info "Closing websocket "
(:remote-addr hs) topic query)
(close ch)
(unsubscribe (:pubsub core) sub))))))
(defn ws-index-handler
"Queries the index for events and streams them to the client. If subscribe is
true, also initiates a pubsub subscription to the index topic with that
query."
[core ch hs]
(let [params (http-query-map (:query-string hs))
query (params "query")
ast (query/ast query)]
(when-let [i (deref (:index core))]
(doseq [event (index/search i ast)]
(enqueue ch (event-to-json event))))
(if (= (params "subscribe") "true")
(ws-pubsub-handler core ch (assoc hs :uri "/pubsub/index"))
(close ch))))
(defn ws-handler [core]
(fn [ch handshake]
(info "Websocket connection from" (:remote-addr handshake)
(:uri handshake)
(:query-string handshake))
(condp re-matches (:uri handshake)
#"^/index/?$" (ws-index-handler core ch handshake)
#"^/pubsub/[^/]+/?$" (ws-pubsub-handler core ch handshake)
:else (do
(info "Unknown URI " (:uri handshake) ", closing")
(close ch)))))
(defn ws-server
"Starts a new websocket server for a core. Starts immediately.
Options:
:host The address to listen on (default localhost)
:post The port to listen on (default 5556)"
([core] (udp-server core {}))
([core opts]
(let [opts (merge {:host "localhost"
:port 5556}
opts)
s (start-http-server (ws-handler core) {:host (:host opts)
:port (:port opts)
:websocket true})]
(info "Websockets server" opts "online")
s)))