-
Notifications
You must be signed in to change notification settings - Fork 241
/
client.clj
316 lines (291 loc) · 11.5 KB
/
client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
(ns aleph.http.websocket.client
(:require
[aleph.http.core :as http]
[aleph.http.websocket.common :as ws.common]
[aleph.netty :as netty]
[clj-commons.byte-streams :as bs]
[clojure.tools.logging :as log]
[manifold.deferred :as d]
[manifold.stream :as s])
(:import
(io.netty.channel
Channel
ChannelHandler
ChannelPipeline)
(io.netty.handler.codec.http
HttpClientCodec
DefaultHttpHeaders
HttpResponse
FullHttpResponse
HttpObjectAggregator)
(io.netty.handler.codec.http.websocketx
CloseWebSocketFrame
PingWebSocketFrame
PongWebSocketFrame
TextWebSocketFrame
BinaryWebSocketFrame
WebSocketClientHandshaker
WebSocketClientHandshakerFactory
WebSocketFrame
WebSocketFrameAggregator
WebSocketVersion)
(io.netty.handler.codec.http.websocketx.extensions.compression
WebSocketClientCompressionHandler)
(io.netty.handler.timeout
IdleState
IdleStateEvent)
(java.net
URI
InetSocketAddress)
(java.util.concurrent
ConcurrentLinkedQueue)
(java.util.concurrent.atomic
AtomicBoolean)))
(defn websocket-frame-size
  "Returns the number of readable bytes in the content buffer of `frame`."
  [^WebSocketFrame frame]
  (let [payload (.content frame)]
    (.readableBytes payload)))
(defn websocket-handshaker
  "Creates a client-side handshaker for WebSocket protocol version 13.

   `headers` is a map that is copied into a fresh `DefaultHttpHeaders`
   instance via `http/map->headers!`; the remaining arguments are handed
   to `WebSocketClientHandshakerFactory/newHandshaker` unchanged."
  ^WebSocketClientHandshaker
  [uri sub-protocols extensions? headers max-frame-payload]
  (let [handshake-headers (DefaultHttpHeaders.)]
    ;; populate the Netty headers object before building the handshaker
    (http/map->headers! handshake-headers headers)
    (WebSocketClientHandshakerFactory/newHandshaker
      uri
      WebSocketVersion/V13
      sub-protocols
      extensions?
      handshake-headers
      max-frame-payload)))
(defn websocket-client-handler
  "Builds the Netty inbound handler that drives a client WebSocket
   connection.

   Returns a two-element vector `[d handler]`:

   * `d` is a deferred that is realized with a duplex stream (the outgoing
     sink spliced with the incoming buffered source, carrying
     `{:aleph/channel ch}` as metadata) once the WebSocket handshake
     completes, or put into an error state if an exception is caught or
     the handshake fails.
   * `handler` is the channel handler to install in the pipeline.

   Arguments:

   * `raw-stream?` - when truthy, text and binary frame contents are put
     onto the incoming stream as-is (the consumer is then responsible for
     releasing them); otherwise text frames are converted to strings and
     binary frames to byte arrays.
   * `uri`, `sub-protocols`, `extensions?`, `headers`,
     `max-frame-payload` - passed to `websocket-handshaker`.
   * `heartbeats` - passed to `ws.common/handle-heartbeat` when an
     ALL_IDLE event is observed; defaults to nil in the 6-arity."
  ([raw-stream?
    uri
    sub-protocols
    extensions?
    headers
    max-frame-payload]
   (websocket-client-handler raw-stream?
                             uri
                             sub-protocols
                             extensions?
                             headers
                             max-frame-payload
                             nil))
  ([raw-stream?
    uri
    sub-protocols
    extensions?
    headers
    max-frame-payload
    heartbeats]
   (let [d (d/deferred)
         ;; holds the incoming buffered source; stays nil until the
         ;; (optional) SSL handshake future is realized in :channel-active
         in (atom nil)
         ;; connection description exposed through the sink (handshake
         ;; headers, selected subprotocol, close code/message)
         desc (atom {})
         ^ConcurrentLinkedQueue pending-pings (ConcurrentLinkedQueue.)
         handshaker (websocket-handshaker uri
                                          sub-protocols
                                          extensions?
                                          headers
                                          max-frame-payload)
         ;; guards against sending more than one close frame
         closing? (AtomicBoolean. false)]

     [d

      (netty/channel-inbound-handler

        :exception-caught
        ([_ ctx ex]
         (when-not (d/error! d ex)
           (log/warn ex "error in websocket client"))
         ;; `in` may still be nil if the exception arrives before the
         ;; incoming source was created (e.g. a failure during the SSL
         ;; handshake). Calling `s/close!` on nil would throw from inside
         ;; this handler and skip closing the channel below.
         (some-> @in s/close!)
         (netty/close ctx))

        :channel-inactive
        ([_ ctx]
         (when (realized? d)
           ;; close only on success
           (d/chain' d s/close!)
           (ws.common/resolve-pings! pending-pings false))
         (.fireChannelInactive ctx))

        :channel-active
        ([_ ctx]
         ;; start the WebSocket handshake only once the SSL handshake
         ;; future (if any) has been realized
         (-> (.channel ctx)
             netty/maybe-ssl-handshake-future
             (d/on-realized (fn [ch]
                              (reset! in (netty/buffered-source ch (constantly 1) 16))
                              (.handshake handshaker ch))
                            netty/ignore-ssl-handshake-errors))
         (.fireChannelActive ctx))

        :user-event-triggered
        ([_ ctx evt]
         (if (and (instance? IdleStateEvent evt)
                  (= IdleState/ALL_IDLE (.state ^IdleStateEvent evt)))
           ;; idle events are consumed here; heartbeats are only handled
           ;; once the connection deferred has been realized
           (when (d/realized? d)
             (ws.common/handle-heartbeat ctx @d heartbeats))
           (.fireUserEventTriggered ctx evt)))

        :channel-read
        ([_ ctx msg]
         (let [ch ^Channel (.channel ctx)]
           (cond

             (not (.isHandshakeComplete handshaker))
             (try
               ;; Here we rely on the HttpObjectAggregator being added
               ;; to the pipeline in advance, so there's no chance we
               ;; could read only a partial request
               (.finishHandshake handshaker ch msg)
               (let [;; returns true if this call initiated the close,
                     ;; false (after releasing the frame) if a close was
                     ;; already in progress
                     close-fn (fn [^CloseWebSocketFrame frame]
                                (if-not (.compareAndSet closing? false true)
                                  (do
                                    (netty/release frame)
                                    false)
                                  (do
                                    (-> (.close handshaker ch frame)
                                        netty/wrap-future
                                        (d/chain' (fn [_] (netty/close ctx))))
                                    true)))
                     coerce-fn (ws.common/websocket-message-coerce-fn
                                 ch
                                 pending-pings
                                 close-fn)
                     headers (http/headers->map (.headers ^HttpResponse msg))
                     subprotocol (.actualSubprotocol handshaker)
                     _ (swap! desc assoc
                              :websocket-handshake-headers headers
                              :websocket-selected-subprotocol subprotocol)
                     out (netty/sink ch false coerce-fn (fn [] @desc))]

                 (s/on-closed out #(ws.common/resolve-pings! pending-pings false))

                 (d/success! d
                             (doto
                               (s/splice out @in)
                               (reset-meta! {:aleph/channel ch})))

                 ;; once the incoming side is drained, send a close frame
                 (s/on-drained @in #(close-fn (CloseWebSocketFrame.))))
               (catch Throwable ex
                 ;; handle handshake exception
                 (d/error! d ex)
                 ;; nil-safe for the same reason as in :exception-caught
                 (some-> @in s/close!)
                 (netty/close ctx))
               (finally
                 (netty/release msg)))

             (instance? FullHttpResponse msg)
             (let [rsp ^FullHttpResponse msg
                   ;; read the body before the message is released
                   content (bs/to-string (.content rsp))]
               (netty/release msg)
               (throw
                 (IllegalStateException.
                   (str "unexpected HTTP response, status: "
                        (.status rsp)
                        ", body: '"
                        content
                        "'"))))

             (instance? TextWebSocketFrame msg)
             (if raw-stream?
               (let [body (.content ^TextWebSocketFrame msg)]
                 ;; pass the ByteBuf body directly to the next level;
                 ;; it is the consumer's responsibility to release it
                 (netty/put! ch @in body))
               (let [text (.text ^TextWebSocketFrame msg)]
                 (netty/release msg)
                 (netty/put! ch @in text)))

             (instance? BinaryWebSocketFrame msg)
             (let [frame (.content ^BinaryWebSocketFrame msg)]
               (netty/put! ch @in
                           (if raw-stream?
                             frame
                             (netty/release-buf->array frame))))

             (instance? PongWebSocketFrame msg)
             (do
               (netty/release msg)
               (ws.common/resolve-pings! pending-pings true))

             (instance? PingWebSocketFrame msg)
             ;; the ping's content buffer is reused as the pong payload
             (let [frame (.content ^PingWebSocketFrame msg)]
               (netty/write-and-flush ch (PongWebSocketFrame. frame)))

             ;; todo(kachayev): check RFC what should we do in case
             ;;                 we've got > 1 closing frame from the
             ;;                 server
             (instance? CloseWebSocketFrame msg)
             (let [frame ^CloseWebSocketFrame msg]
               (when (realized? d)
                 (swap! desc assoc
                        :websocket-close-code (.statusCode frame)
                        :websocket-close-msg (.reasonText frame)))
               (netty/release msg)
               (netty/close ctx))

             :else
             (.fireChannelRead ctx msg)))))])))
(defn websocket-connection
  "Opens a client WebSocket connection to `uri` (a string with scheme
   `ws` or `wss`) and returns a deferred that yields the first element of
   the pair produced by `websocket-client-handler` once the client
   channel has been created.

   Throws `IllegalArgumentException` when `:compression?` is explicitly
   `true` while `:extensions?` is explicitly `false`.

   Defaults applied by destructuring: `raw-stream?` false, `epoll?` false,
   `extensions?` false, `compression?` false, `sub-protocols` nil,
   `max-frame-payload` 65536, `max-frame-size` 1048576, and `identity` for
   both `bootstrap-transform` and `pipeline-transform`."
  [uri
   {:keys [raw-stream?
           insecure?
           ssl-context
           ssl-endpoint-id-alg
           headers
           local-address
           bootstrap-transform
           pipeline-transform
           epoll?
           transport
           sub-protocols
           extensions?
           max-frame-payload
           max-frame-size
           compression?
           heartbeats]
    :or {bootstrap-transform identity
         pipeline-transform identity
         raw-stream? false
         epoll? false
         ssl-endpoint-id-alg netty/default-ssl-endpoint-id-alg
         sub-protocols nil
         extensions? false
         max-frame-payload 65536
         max-frame-size 1048576
         compression? false}
    :as options}]
  ;; The raw options map is inspected (not the defaulted locals) so that
  ;; only an explicit true/false combination is rejected.
  (when (and (false? (:extensions? options))
             (true? (:compression? options)))
    (throw (IllegalArgumentException.
             "Per-message deflate requires extensions to be allowed")))
  (let [ws-uri (URI. uri)
        scheme (.getScheme ws-uri)
        _ (assert (#{"ws" "wss"} scheme) "scheme must be one of 'ws' or 'wss'")
        secure? (= "wss" scheme)
        ;; user-supplied heartbeat options override the defaults; a nil
        ;; value leaves heartbeats disabled
        heartbeat-opts (some->> heartbeats
                                (merge {:send-after-idle 3e4
                                        :payload nil
                                        :timeout nil}))
        [conn-deferred handler] (websocket-client-handler
                                  raw-stream?
                                  ws-uri
                                  sub-protocols
                                  (or extensions? compression?)
                                  headers
                                  max-frame-payload
                                  heartbeat-opts)
        ;; a negative port means the URI did not specify one
        explicit-port (.getPort ws-uri)
        port (cond
               (not (neg? explicit-port)) explicit-port
               secure? 443
               :else 80)
        remote-address (InetSocketAddress. (.getHost ws-uri) (int port))
        client-ssl-ctx (when secure?
                         (cond
                           ssl-context (netty/coerce-ssl-client-context ssl-context)
                           insecure? (netty/insecure-ssl-client-context)
                           :else (netty/ssl-client-context)))
        build-pipeline (fn [^ChannelPipeline p]
                         (when secure?
                           (.addLast p
                                     "ssl-handler"
                                     (netty/ssl-handler (.channel p)
                                                        client-ssl-ctx
                                                        remote-address
                                                        ssl-endpoint-id-alg)))
                         (.addLast p "http-client" (HttpClientCodec.))
                         (.addLast p "aggregator" (HttpObjectAggregator. 16384))
                         (.addLast p
                                   "websocket-frame-aggregator"
                                   (WebSocketFrameAggregator. max-frame-size))
                         (when compression?
                           (.addLast p
                                     "websocket-deflater"
                                     WebSocketClientCompressionHandler/INSTANCE))
                         (ws.common/attach-heartbeats-handler p heartbeat-opts)
                         (.addLast p "handler" ^ChannelHandler handler)
                         ;; user transform runs last, on the fully built pipeline
                         (pipeline-transform p))]
    (-> (netty/create-client-chan
          {:pipeline-builder build-pipeline
           :bootstrap-transform bootstrap-transform
           :remote-address remote-address
           :local-address local-address
           :transport (netty/determine-transport transport epoll?)})
        ;; discard the channel; callers get the connection deferred
        (d/chain' (constantly conn-deferred)))))