/
server.clj
275 lines (253 loc) · 10.3 KB
/
server.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
(ns aleph.http.websocket.server
(:require
[aleph.http.core :as http]
[aleph.http.websocket.common :as ws.common]
[aleph.netty :as netty]
[clojure.string :as str]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
[manifold.stream :as s])
(:import
;; Do not remove
(aleph.http.core
NettyRequest)
(io.netty.channel
Channel
ChannelHandler
ChannelPipeline)
(io.netty.handler.codec.http
DefaultHttpHeaders
HttpContentCompressor
HttpRequest)
(io.netty.handler.codec.http.websocketx
WebSocketServerHandshakerFactory
WebSocketServerHandshaker
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame
BinaryWebSocketFrame
CloseWebSocketFrame
WebSocketFrameAggregator)
(io.netty.handler.codec.http.websocketx.extensions.compression
WebSocketServerCompressionHandler)
(io.netty.handler.stream
ChunkedWriteHandler)
(io.netty.handler.timeout
IdleState
IdleStateEvent)
(java.io
IOException)
(java.util.concurrent
ConcurrentLinkedQueue)
(java.util.concurrent.atomic
AtomicBoolean)))
;; Builds the server-side stream and Netty inbound handler for a websocket
;; connection.
;;
;; Arguments:
;;   raw-stream? - when truthy, text/binary frame payloads are put onto the
;;                 stream as the frame's ByteBuf content (the consumer has to
;;                 release it); otherwise text frames are delivered as strings
;;                 and binary frames go through `netty/release-buf->array`
;;   ch          - the Netty channel of the connection
;;   handshaker  - used to run the closing handshake and to report the
;;                 selected subprotocol in the sink description
;;   heartbeats  - passed as-is to `ws.common/handle-heartbeat` on ALL_IDLE
;;                 events; the 3-arity version passes `nil`
;;
;; Returns a 2-element vector `[stream handler]`: `stream` is the splice of the
;; outgoing sink and the incoming source (with `{:aleph/channel ch}` as its
;; metadata), `handler` is the channel inbound handler feeding that stream.
(defn websocket-server-handler
([raw-stream? ch handshaker]
(websocket-server-handler raw-stream? ch handshaker nil))
([raw-stream?
^Channel ch
^WebSocketServerHandshaker handshaker
heartbeats]
(let [d (d/deferred)
^ConcurrentLinkedQueue pending-pings (ConcurrentLinkedQueue.)
;; flipped to `true` (via compare-and-set) by whichever code path starts
;; the closing handshake first, so `.close handshaker` is invoked at most
;; once from this function
closing? (AtomicBoolean. false)
coerce-fn (ws.common/websocket-message-coerce-fn
ch
pending-pings
;; close callback: returns `true` only when this call actually
;; initiated the closing handshake, `false` if closing was
;; already in progress
(fn [^CloseWebSocketFrame frame]
(if-not (.compareAndSet closing? false true)
false
(do
(.close handshaker ch frame)
true))))
description (fn [] {:websocket-selected-subprotocol (.selectedSubprotocol handshaker)})
out (netty/sink ch false coerce-fn description)
;; NOTE(review): presumably every message is weighted as 1 against a
;; buffer limit of 16 -- confirm against `netty/buffered-source`
in (netty/buffered-source ch (constantly 1) 16)]
;; once the outgoing side is closed, pending pings are resolved with `false`
(s/on-closed out #(ws.common/resolve-pings! pending-pings false))
(s/on-drained
in
;; there's a chance that the connection was closed by the client,
;; in that case *out* would be closed earlier and the underlying
;; netty channel is already terminated
#(when (and (.isOpen ch)
(.compareAndSet closing? false true))
(.close handshaker ch (CloseWebSocketFrame.))))
(let [s (doto
(s/splice out in)
(reset-meta! {:aleph/channel ch}))]
[s
(netty/channel-inbound-handler
:exception-caught
([_ ctx ex]
;; IOExceptions are not logged; everything else is logged as a warning.
;; In both cases the sink and the channel context are closed.
(when-not (instance? IOException ex)
(log/warn ex "error in websocket handler"))
(s/close! out)
(netty/close ctx))
:channel-inactive
([_ ctx]
;; close both directions, then let the rest of the pipeline see the event
(s/close! out)
(s/close! in)
(.fireChannelInactive ctx))
:user-event-triggered
([_ ctx evt]
;; only ALL_IDLE idle-state events are handled here (as heartbeats),
;; any other user event is forwarded untouched
(if (and (instance? IdleStateEvent evt)
(= IdleState/ALL_IDLE (.state ^IdleStateEvent evt)))
(ws.common/handle-heartbeat ctx s heartbeats)
(.fireUserEventTriggered ctx evt)))
:channel-read
([_ ctx msg]
;; dispatch on the concrete websocket frame type
(let [ch (.channel ctx)]
(cond
(instance? TextWebSocketFrame msg)
(if raw-stream?
(let [body (.content ^TextWebSocketFrame msg)]
;; pass ByteBuf body directly to next level (it's
;; their responsibility to release)
(netty/put! ch in body))
(let [text (.text ^TextWebSocketFrame msg)]
;; working with text now, so we do not need
;; ByteBuf inside TextWebSocketFrame
;; note, that all *WebSocketFrame classes are
;; subclasses of DefaultByteBufHolder, meaning
;; there's no difference between releasing
;; frame & frame's content
(netty/release msg)
(netty/put! ch in text)))
(instance? BinaryWebSocketFrame msg)
(let [body (.content ^BinaryWebSocketFrame msg)]
(netty/put! ch in
(if raw-stream?
body
;; copied data into byte array, deallocating ByteBuf
(netty/release-buf->array body))))
(instance? PingWebSocketFrame msg)
;; answer a ping with a pong carrying the same payload
(let [body (.content ^PingWebSocketFrame msg)]
;; reusing the same buffer
;; will be deallocated by Netty
(netty/write-and-flush ch (PongWebSocketFrame. body)))
(instance? PongWebSocketFrame msg)
;; a pong resolves the pending pings with `true`
(do
(netty/release msg)
(ws.common/resolve-pings! pending-pings true))
(instance? CloseWebSocketFrame msg)
;; start the closing handshake unless some other path already did
(if-not (.compareAndSet closing? false true)
;; closing already, nothing else could be done
(netty/release msg)
;; reusing the same buffer
;; will be deallocated by Netty
(.close handshaker ch ^CloseWebSocketFrame msg))
:else
;; no need to release buffer when passing to a next handler
(.fireChannelRead ctx msg)))))]))))
(defn send-websocket-request-expected!
  "Answers on `ch` with a plain-text 400 response whose body is
  \"expected websocket request\".

  `keep-alive?` is deliberately passed as `false`: `send-message` will close
  the connection after writes are done, which is exactly what we expect to
  happen here."
  [ch ssl?]
  (let [keep-alive? false
        rejection   (http/ring-response->netty-response
                      {:status 400
                       :headers {"content-type" "text/plain"}})
        body        "expected websocket request"]
    (http/send-message ch keep-alive? ssl? rejection body)))
(defn websocket-upgrade-request?
  "Returns `true` if given request is an attempt to upgrade to websockets.

  The request qualifies when its `:connection` header, split on commas,
  trimmed and lower-cased, contains the token \"upgrade\", and its `:upgrade`
  header equals \"websocket\" ignoring case. Returns `false` otherwise,
  including when either header is missing."
  [^NettyRequest req]
  (let [hdrs              (:headers req)
        connection-value  (get hdrs :connection)
        upgrade-value     (get hdrs :upgrade)
        ;; nil when the header is absent; `contains?` on nil yields false
        connection-tokens (when (some? connection-value)
                            (->> (str/split (str/lower-case connection-value) #",")
                                 (map str/trim)
                                 set))]
    ;; the upgrade header is only inspected once the connection header matched
    (and (contains? connection-tokens "upgrade")
         (some? upgrade-value)
         (= "websocket" (str/lower-case upgrade-value)))))
;; Switches the connection that delivered `req` over to the websocket protocol.
;;
;; Options (with the defaults applied by destructuring):
;;   :raw-stream?        false   - forwarded to `websocket-server-handler`
;;   :headers            nil     - extra headers copied into the handshake response
;;   :max-frame-payload  65536   - passed to the handshaker factory
;;   :max-frame-size     1048576 - passed to the `WebSocketFrameAggregator`
;;   :allow-extensions?  false
;;   :compression?       false   - installs `WebSocketServerCompressionHandler`
;;   :pipeline-transform nil     - fn called with the channel pipeline once the
;;                                 handshake has succeeded
;;   :heartbeats         nil     - forwarded to the heartbeats handler and to
;;                                 `websocket-server-handler`
;;
;; Returns a deferred: on a successful handshake it yields the websocket
;; stream; otherwise it carries the error (an `IllegalStateException` with the
;; message "unsupported version" when no handshaker is available for the
;; request).
;;
;; Throws `IllegalArgumentException` synchronously when `:compression?` is
;; explicitly `true` while `:allow-extensions?` is explicitly `false`.
(defn initialize-websocket-handler
[^NettyRequest req
{:keys [raw-stream?
headers
max-frame-payload
max-frame-size
allow-extensions?
compression?
pipeline-transform
heartbeats]
:or {raw-stream? false
max-frame-payload 65536
max-frame-size 1048576
allow-extensions? false
compression? false}
:as options}]
;; the check reads the raw `options` map rather than the destructured
;; defaults, so only an explicitly provided `false` is rejected; when
;; `:allow-extensions?` is omitted, `:compression? true` alone enables
;; extensions through the `or` passed to the factory below
(when (and (true? (:compression? options))
(false? (:allow-extensions? options)))
(throw (IllegalArgumentException.
"Per-message deflate requires extensions to be allowed")))
;; flag the request as a websocket one
(-> req ^AtomicBoolean (.websocket?) (.set true))
(let [^Channel ch (.ch req)
ssl? (identical? :https (:scheme req))
url (str
(if ssl? "wss://" "ws://")
(get-in req [:headers "host"])
(:uri req))
;; from here on `req` is the full Netty request built from the ring
;; request; this is the object released in the `finally` below
req (http/ring-request->full-netty-request req)
;; the second constructor argument (subprotocols) is `nil`
factory (WebSocketServerHandshakerFactory.
url
nil
(or allow-extensions? compression?)
max-frame-payload)]
(try
;; no handshaker is returned for a websocket version that is not supported
(if-let [handshaker (.newHandshaker factory req)]
(try
(let [[s ^ChannelHandler handler] (websocket-server-handler raw-stream?
ch
handshaker
heartbeats)
p (.newPromise ch)
h (doto (DefaultHttpHeaders.) (http/map->headers! headers))
^ChannelPipeline pipeline (.pipeline ch)]
;; actually, we're not going to expect anything but websocket, so...
;; the HTTP-only handlers are dropped and the websocket ones are
;; appended: frame aggregator, heartbeats, then the handler itself
(doto pipeline
(.remove "request-handler")
(.remove "continue-handler")
(netty/remove-if-present HttpContentCompressor)
(netty/remove-if-present ChunkedWriteHandler)
(.addLast "websocket-frame-aggregator" (WebSocketFrameAggregator. max-frame-size))
(ws.common/attach-heartbeats-handler heartbeats)
(.addLast "websocket-handler" handler))
(when compression?
;; Hack:
;; WebSocketServerCompressionHandler is stateful and requires
;; HTTP request to be sent through the pipeline
;; See more:
;; * https://github.com/clj-commons/aleph/issues/494
;; * https://github.com/netty/netty/pull/8973
(let [compression-handler (WebSocketServerCompressionHandler.)
ctx (.context pipeline "websocket-frame-aggregator")]
;; the deflater is placed right after the frame aggregator and the
;; request is fired from the aggregator's context
(.addAfter pipeline
"websocket-frame-aggregator"
"websocket-deflater"
compression-handler)
(.fireChannelRead ctx req)))
;; an exception thrown synchronously by `.handshake` is turned into an
;; error deferred so it goes through the same `d/catch'` branch as an
;; asynchronous failure
(-> (try
(netty/wrap-future (.handshake handshaker ch ^HttpRequest req h p))
(catch Throwable e
(d/error-deferred e)))
(d/chain'
(fn [_]
;; handshake succeeded: apply the user-supplied pipeline
;; customization (if any) and yield the websocket stream
(when (some? pipeline-transform)
(pipeline-transform (.pipeline ch)))
s))
(d/catch'
(fn [e]
;; handshake failed: reply with 400 and keep the error
(send-websocket-request-expected! ch ssl?)
(d/error-deferred e)))))
;; anything thrown while setting up the pipeline is reported through the
;; returned deferred instead of being thrown to the caller
(catch Throwable e
(d/error-deferred e)))
(do
(WebSocketServerHandshakerFactory/sendUnsupportedVersionResponse ch)
(d/error-deferred (IllegalStateException. "unsupported version"))))
(finally
;; I find this approach to handle request release somewhat
;; fragile... We have to release the object both in case of
;; handshake initialization and "unsupported version" response
(netty/release req)))))