-
Notifications
You must be signed in to change notification settings - Fork 1
/
client.cljc
254 lines (239 loc) · 10.5 KB
/
client.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
(ns deercreeklabs.tube.client
(:refer-clojure :exclude [send])
(:require
[clojure.core.async :as ca]
[deercreeklabs.async-utils :as au]
[deercreeklabs.baracus :as ba]
[deercreeklabs.log-utils :as lu :refer [debugs]]
[deercreeklabs.tube.connection :as connection]
[deercreeklabs.tube.utils :as u]
#?(:clj [gniazdo.core :as ws])
#?(:cljs [goog.object])
[schema.core :as s]
[taoensso.timbre :as timbre
#?(:clj :refer :cljs :refer-macros) [debugf errorf infof]])
#?(:clj
(:import
(java.net ConnectException URI))))
;; ClojureScript only: turn on the compiler's *warn-on-infer* flag for this
;; file (the JS interop below carries ^js/... type hints accordingly).
#?(:cljs
(set! *warn-on-infer* true))
;; Clojure (JVM) only: switch this namespace to primitive-math's operators.
;; This is why numeric arguments further down are coerced explicitly with
;; `int` / `long` before being used in arithmetic or comparisons.
#?(:clj
(primitive-math/use-primitive-operators))
;; Default number of seconds between keep-alive pings; used by <connect when
;; the :keep-alive-secs option is not supplied.
(def default-keepalive-secs 25)
(defprotocol ITubeClient
  "Operations available on a tube client."
  (send
    [this data]
    "Send the given binary data over this tube.")
  (close
    [this]
    [this code reason]
    "Close this tube, optionally passing a close code and reason."))
;; Concrete ITubeClient backed by a tube connection.
;;   conn        - the connection object that data is sent over
;;   *shutdown?  - atom set to true on close; the keep-alive loop polls it
(deftype TubeClient [conn *shutdown?]
  ITubeClient
  (send [this data]
    (connection/send conn data))

  (close [this]
    ;; Flag shutdown first so the keep-alive loop stops pinging.
    (reset! *shutdown? true)
    (connection/close conn))

  ;; ITubeClient declares this arity; without an implementation here,
  ;; calling (close client code reason) fails at runtime.
  (close [this code reason]
    (reset! *shutdown? true)
    ;; `false`: the close is initiated locally, so the websocket has not
    ;; been closed yet (same flag the other locally-initiated closes pass).
    (connection/close conn code reason false)))
#?(:clj
   (defn <make-ws-client-clj
     "JVM websocket factory built on gniazdo.

     Opens a websocket to `url` inside a go block. On connect, `true` is put
     on `connected-ch`; if connecting throws, the failure is optionally
     logged (when `log-conn-failure?` is truthy), `false` is put on
     `connected-ch`, and the go block yields nil.

     Incoming binary frames are handed to the fn held in the `*handle-rcv`
     atom; socket closure is reported to the fn held in `*close-client`.
     On success the go block yields the map built by `u/sym-map` from
     `sender`, `closer` and `fragment-size`."
     [url connected-ch on-error *handle-rcv *close-client log-conn-failure?]
     (ca/go
       (try
         (let [fragment-size 31999
               handle-binary (fn [bs offset length]
                               (let [chunk (ba/slice-byte-array
                                            bs offset
                                            (+ (int offset) (int length)))]
                                 (@*handle-rcv chunk)))
               handle-close (fn [code reason]
                              (@*close-client code reason true))
               handle-connect (fn [session]
                                (ca/put! connected-ch true))
               socket (ws/connect url
                                  :on-connect handle-connect
                                  :on-binary handle-binary
                                  :on-error on-error
                                  :on-close handle-close)
               closer (fn []
                        (ws/close socket))
               sender (fn [data]
                        (try
                          ;; send-msg mutates the bytes it is given, so it is
                          ;; handed a copy rather than the caller's array.
                          (ws/send-msg socket (ba/slice-byte-array data))
                          (catch Exception e
                            (on-error
                             (lu/get-exception-msg-and-stacktrace e)))))]
           (u/sym-map sender closer fragment-size))
         (catch Exception e
           (when log-conn-failure?
             (debugf "Websocket failed to connect. Error: %s" e))
           (ca/put! connected-ch false)
           nil)))))
#?(:cljs
   (defn <make-ws-client-node
     "Node.js websocket factory built on the `websocket` npm package.

     Starts connecting to `url` and immediately yields (from the go block)
     the map built by `u/sym-map` from `sender`, `closer` and
     `fragment-size`. The connect result is reported asynchronously:
     `true` is put on `connected-ch` once the connection is established,
     `false` when the client emits \"connectFailed\" (optionally logged when
     `log-conn-failure?` is truthy).

     Incoming messages are converted to an Int8Array and handed to the fn
     held in the `*handle-rcv` atom; connection closure is reported to the
     fn held in `*close-client`; connection errors go to `on-error`."
     [url connected-ch on-error *handle-rcv *close-client log-conn-failure?]
     (au/go
       (let [fragment-size 32000
             WSC (goog.object.get (js/require "websocket") "client")
             ^js/WebSocketClient client (WSC.)
             ;; Holds the live connection once "connect" fires; nil until then.
             *conn (atom nil)
             msg-handler (fn [msg-obj]
                           (let [data (-> (goog.object.get msg-obj "binaryData")
                                          (js/Int8Array.))]
                             (@*handle-rcv data)))
             ;; Close the connection if one exists, otherwise abort the
             ;; in-flight connection attempt.
             closer #(if @*conn
                       (.close ^js/WebSocketConnection @*conn)
                       (.abort client))
             sender (fn [data]
                      ;; Buffer.from replaces the deprecated `new Buffer(...)`
                      ;; constructor; it copies the bytes in the same way.
                      (.sendBytes ^js/WebSocketConnection @*conn
                                  (js/Buffer.from data)))
             conn-handler (fn [^js/WebSocketConnection conn]
                            (.on conn "close" (fn [code reason]
                                                (@*close-client
                                                 code reason true)))
                            (.on conn "error" on-error)
                            (.on conn "message" msg-handler)
                            (reset! *conn conn)
                            (ca/put! connected-ch true))]
         (.on client "connectFailed"
              (fn [err]
                (when log-conn-failure?
                  (debugf "Websocket failed to connect. Error: %s"
                          err))
                (ca/put! connected-ch false)))
         (.on client "connect" conn-handler)
         (.connect ^js/WebSocketClient client url)
         (u/sym-map sender closer fragment-size)))))
#?(:cljs
   (defn <make-ws-client-browser
     "Browser websocket factory built on the native `WebSocket` object.

     Creates a socket to `url` in \"arraybuffer\" mode and immediately yields
     (from the go block) the map built by `u/sym-map` from `sender`,
     `closer` and `fragment-size`. The connect result is reported
     asynchronously: `true` is put on `connected-ch` from `onopen`; an
     error before the socket has opened is treated as a connect failure
     (optionally logged when `log-conn-failure?` is truthy) and puts
     `false` on `connected-ch`. Errors after opening go to `on-error`.

     Incoming messages are wrapped in an Int8Array and handed to the fn
     held in the `*handle-rcv` atom; socket closure is reported to the fn
     held in `*close-client`."
     [url connected-ch on-error *handle-rcv *close-client log-conn-failure?]
     (au/go
       (let [fragment-size 32000
             client (js/WebSocket. url)
             ;; Distinguishes connect-time errors from errors on a live socket.
             *connected? (atom false)
             msg-handler (fn [msg-obj]
                           (let [data (js/Int8Array. (.-data msg-obj))]
                             (@*handle-rcv data)))
             closer #(.close client)
             sender (fn [data]
                      ;; Send the typed-array view itself rather than its
                      ;; backing `.buffer`: if `data` is a view over part of
                      ;; a larger ArrayBuffer, sending the buffer would
                      ;; transmit bytes outside the view.
                      (.send client data))]
         (set! (.-binaryType client) "arraybuffer")
         (set! (.-onopen client) (fn [event]
                                   (reset! *connected? true)
                                   (ca/put! connected-ch true)))
         (set! (.-onclose client) (fn [event]
                                    (@*close-client (.-code event)
                                     (.-reason event) true)))
         (set! (.-onerror client)
               (fn [err]
                 (if @*connected?
                   (on-error err)
                   (do
                     (when log-conn-failure?
                       (debugf "Websocket failed to connect. Error: %s"
                               err))
                     (ca/put! connected-ch false)))))
         (set! (.-onmessage client) msg-handler)
         (u/sym-map sender closer fragment-size)))))
(defn start-keep-alive-loop
  "Starts a go block that pings `conn` every `keep-alive-secs` seconds
  until the `*shutdown?` atom becomes true."
  [conn keep-alive-secs *shutdown?]
  (au/go
    (loop []
      (when-not @*shutdown?
        (ca/<! (ca/timeout (* 1000 (int keep-alive-secs))))
        ;; Shutdown may have been requested during the wait, so the flag is
        ;; re-read before pinging.
        (when-not @*shutdown?
          (connection/send-ping conn))
        (recur)))))
;; Completes client setup over an already-created websocket client.
;;   wsc            - map with :sender, :closer and :fragment-size entries
;;   url            - websocket URL; passed to make-connection and used in logs
;;   options        - option map; defaults are listed in the :or clause below
;;   *handle-rcv    - atom reset here to the fn that feeds received data to
;;                    the connection
;;   *close-client  - atom reset here to this client's close handler
;;   connected-ch   - channel on which the websocket layer reports true/false
;; The body runs in an au/go block whose value is a TubeClient once
;; negotiation completes, or nil when the socket fails to connect,
;; negotiation times out, or the client is shut down while waiting.
(defn <connect [wsc url options *handle-rcv *close-client connected-ch]
(au/go
(let [{:keys [sender closer fragment-size]} wsc
{:keys [compression-type keep-alive-secs on-disconnect on-rcv
log-conn-failure? connect-timeout-ms]
:or {compression-type :smart
keep-alive-secs default-keepalive-secs
on-disconnect (constantly nil)
on-rcv (constantly nil)
log-conn-failure? true
connect-timeout-ms 5000}} options
;; Shared flag: set by close-client below; also read by the keep-alive loop.
*shutdown? (atom false)
ready-ch (ca/chan)
;; Handed to make-connection; signals ready-ch when invoked.
;; NOTE(review): presumably called once protocol negotiation completes --
;; confirm in the connection namespace.
on-connect (fn [conn]
(ca/put! ready-ch true))
conn-id 0 ;; There is only one
conn (connection/make-connection
conn-id url url on-connect sender closer nil compression-type
true on-rcv)
;; Close handler: flag shutdown, close the connection, then notify the
;; user-supplied on-disconnect callback.
close-client (fn [code reason ws-already-closed?]
(reset! *shutdown? true)
(connection/close conn code reason ws-already-closed?)
(on-disconnect conn code reason))
;; Publish the handlers through the shared atoms so the websocket callbacks
;; can reach this connection.
_ (reset! *handle-rcv #(connection/handle-data conn %))
_ (reset! *close-client close-client)
;; Wait for the websocket layer to report the connect result, for at most
;; connect-timeout-ms.
[connected? ch] (ca/alts! [connected-ch
(ca/timeout connect-timeout-ms)])]
;; Both a timeout and an explicit false on connected-ch count as failure.
(if-not (and (= connected-ch ch)
connected?)
(do
(when log-conn-failure?
(errorf "Websocket to %s failed to connect." url))
(connection/close conn 1000 "Failure to connect" false)
nil)
(do
;; The first thing sent on the socket is the encoded fragment size.
(sender (ba/encode-int fragment-size))
;; Negotiation gets its own deadline of connect-timeout-ms from now.
(let [expiry-ms (+ (#?(:clj long :cljs identity) ;; ensure primitive
(u/get-current-time-ms))
(#?(:clj long :cljs identity) connect-timeout-ms))]
;; Poll ready-ch in 100 ms slices so that shutdown and the deadline are
;; rechecked on every pass. The loop yields nil if shutdown is seen.
(loop []
(when-not @*shutdown?
(let [[ready? ch] (ca/alts! [ready-ch (ca/timeout 100)])]
(cond
;; Ready: start pinging and hand back the client, unless a shutdown
;; happened in the meantime.
(= ready-ch ch)
(when-not @*shutdown?
(start-keep-alive-loop conn keep-alive-secs *shutdown?)
(->TubeClient conn *shutdown?))
;; Deadline passed without a ready signal: log, close, yield nil.
(> (#?(:clj long :cljs identity) (u/get-current-time-ms))
(#?(:clj long :cljs identity) expiry-ms))
(do
(errorf
(str "Websocket to %s connected, but did not complete "
"negotiation before timeout (%s ms)")
url connect-timeout-ms)
(connection/close conn 1002
"Protocol negotiation timed out" false)
nil)
:else
;; Wait for the protocol negotiation to happen
(recur)))))))))))
(defn <make-ws-client*
  "Default websocket-client factory. Forwards `args` unchanged to the
  platform-specific factory: the gniazdo-based one on the JVM, and on
  ClojureScript the node or browser one depending on
  `(u/get-platform-kw)`."
  [& args]
  (apply #?(:clj <make-ws-client-clj
            :cljs (case (u/get-platform-kw)
                    :node <make-ws-client-node
                    :browser <make-ws-client-browser))
         args))
(s/defn <make-tube-client
  "Creates a tube client connected to `url`.

  Returns a channel that yields a connected client, or nil (closed channel)
  on connection failure or timeout.

  Arguments:
    url                - websocket URL to connect to
    connect-timeout-ms - how long to wait for the connection; an explicit
                         :connect-timeout-ms entry in `options` takes
                         precedence over this positional value
    options            - optional map:
      :compression-type   :none, :smart or :deflate
      :keep-alive-secs    seconds between keep-alive pings
      :on-disconnect      fn called with [conn code reason] on close
      :on-rcv             fn handed to the connection for received data
      :log-conn-failure?  whether connect failures are logged (default true)
      :connect-timeout-ms overrides the positional timeout
      :<make-ws-client    replacement websocket-client factory"
  ([url :- s/Str
    connect-timeout-ms :- s/Int]
   (<make-tube-client url connect-timeout-ms {}))
  ([url :- s/Str
    connect-timeout-ms :- s/Int
    options :- {(s/optional-key :compression-type) (s/enum :none :smart
                                                           :deflate)
                (s/optional-key :keep-alive-secs) s/Int
                (s/optional-key :on-disconnect) (s/=> s/Any)
                (s/optional-key :on-rcv) (s/=> s/Any)
                (s/optional-key :log-conn-failure?) s/Bool
                (s/optional-key :connect-timeout-ms) s/Int
                (s/optional-key :<make-ws-client) (s/=> s/Any)}]
   (au/go
     (let [;; The positional timeout used to be ignored, because <connect
           ;; only reads :connect-timeout-ms from the options map. Fold it
           ;; in here; an explicit options entry still wins.
           opts (merge {:connect-timeout-ms connect-timeout-ms} options)
           ;; Apply the same default (true) that <connect uses, so both
           ;; layers agree on whether connect failures are logged.
           log-conn-failure? (get opts :log-conn-failure? true)
           *handle-rcv (atom nil)
           *close-client (atom nil)
           connected-ch (ca/chan)
           on-error (fn [msg]
                      (try
                        (errorf "Error in websocket: %s" msg)
                        ;; The close handler is only installed once <connect
                        ;; runs, so it may still be nil here.
                        (when-let [close-client @*close-client]
                          (close-client 1011 msg true))
                        (catch #?(:clj Exception :cljs js/Error) e
                          (errorf "Unexpected error in on-error.")
                          (lu/log-exception e))))
           <make-ws-client (or (:<make-ws-client opts)
                               <make-ws-client*)
           wsc (au/<? (<make-ws-client
                       url connected-ch on-error *handle-rcv
                       *close-client log-conn-failure?))]
       (when wsc
         (au/<? (<connect wsc url opts *handle-rcv *close-client
                          connected-ch)))))))