-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.cljc
496 lines (465 loc) · 20.2 KB
/
client.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
(ns deercreeklabs.capsule.client
(:require
[clojure.core.async :as ca]
[deercreeklabs.async-utils :as au]
[deercreeklabs.baracus :as ba]
[deercreeklabs.capsule.utils :as u]
[deercreeklabs.lancaster :as l]
[deercreeklabs.log-utils :as lu :refer [debugs]]
[deercreeklabs.tube.client :as tc]
[deercreeklabs.tube.connection :as connection]
[schema.core :as s]
[taoensso.timbre :as timbre :refer [debugf errorf infof]]))
;; Connection retry back-off settings, used by `<connect*`.

;; Factor applied to the current wait on every retry, before the cap and the
;; random jitter are applied.
(def conn-wait-ms-multiplier 1.5)
;; Wait used for the first connection attempt. `start-connect-loop*` also uses
;; it as the timeout when polling the reconnect channel.
(def initial-conn-wait-ms 1000)
;; Cap applied to the grown wait *before* it is multiplied by the random jitter
;; factor (0.5 to 1.5), so an individual wait can exceed this value.
(def max-conn-wait-ms 30000)
;; Defaults merged underneath the `options` map passed to `make-client`.
;; All `-ms` values are in milliseconds.
;;
;; The default :on-connect / :on-disconnect callbacks are called with the
;; CapsuleClient record, so they read its `:silence-log?` field and stay quiet
;; when the client was created with `:silence-log? true`, like every other
;; info-level log statement in this namespace. Both return nil.
(def default-client-options
  {:default-rpc-timeout-ms 15000
   :get-credentials-timeout-ms 15000
   :get-url-timeout-ms 15000
   :rcv-queue-size 1000
   :send-queue-size 1000
   :silence-log? false
   :on-connect (fn [capsule-client]
                 (when-not (:silence-log? capsule-client)
                   (infof "Client connected."))
                 nil)
   :on-disconnect (fn [capsule-client]
                    (when-not (:silence-log? capsule-client)
                      (infof "Client disconnected."))
                    nil)})
(defprotocol ICapsuleClient
  "Public API of a capsule client. The descriptions below reflect the
   CapsuleClient implementation in this namespace."
  (<send-msg
    [this msg-name-kw arg]
    [this msg-name-kw arg timeout-ms]
    "Channel-returning variant of `send-msg`. Returns a channel that receives
     whatever non-nil value the success or failure callback is called with (a
     failure delivers the exception as a value); the channel is closed instead
     when the callback is called with nil. `timeout-ms` defaults to the
     client's default RPC timeout.")
  (send-msg
    [this msg-name-kw arg]
    [this msg-name-kw arg timeout-ms]
    [this msg-name-kw arg success-cb failure-cb]
    [this msg-name-kw arg success-cb failure-cb timeout-ms]
    "Sends `arg` as the RPC or one-way msg named `msg-name-kw`. The callbacks
     are optional (nil in the shorter arities) and `timeout-ms` defaults to the
     client's default RPC timeout. One-way msgs have no response; on that path
     `success-cb` is called with nil rather than with a result. Throws if the
     client has been shut down or if this client's role is not a sender of
     `msg-name-kw`.")
  (set-handler [this msg-name-kw handler]
    "Registers `handler` for the incoming msg named `msg-name-kw`.")
  (shutdown [this]
    "Marks the client as shut down and closes the current connection, if
     there is one."))
(defprotocol ICapsuleClientInternals
  "Implementation details of the capsule client; not part of the public API.
   Methods whose names start with `<` return a core.async channel."
  (send-rpc* [this msg-name-kw arg success-cb failure-cb timeout-ms]
    "Builds an RPC request with a fresh rpc-id and offers it to the send
     queue, tracking the callbacks under that rpc-id.")
  (send-msg* [this msg-name-kw msg timeout-ms]
    "Wraps `msg` as a one-way msg and offers it to the send queue.")
  (<do-login* [this tube-client rcv-chan credentials]
    "Sends a login request carrying `credentials` and reads the reply from
     `rcv-chan`; yields whether the login succeeded.")
  (<do-auth-w-creds* [this tube-client rcv-chan credentials]
    "Logs in with `credentials`, bounded by a timeout, and updates the cached
     credentials according to the outcome. Yields true or false.")
  (<get-credentials* [this]
    "Calls the user-supplied `get-credentials` fn, waiting on its result if it
     is a channel. Yields the credentials, or false on timeout or error.")
  (<get-credentials-and-do-auth* [this tube-client rcv-chan]
    "Fetches fresh credentials, validates their shape and authenticates with
     them.")
  (<do-schema-negotiation* [this tube-client rcv-chan url]
    "Performs the schema handshake with the server at `url`.")
  (<do-auth* [this tube-client rcv-chan]
    "Authenticates, trying cached credentials first when there are any.")
  (<get-url* [this]
    "Calls the user-supplied `get-url` fn, waiting on its result if it is a
     channel. Yields the url, or false on timeout or error.")
  (<connect* [this <make-ws-client]
    "Obtains a url, connects, negotiates schemas and authenticates, retrying
     with back-off. Yields true once connected.")
  (start-connect-loop* [this <make-ws-client]
    "Starts the go loop that makes the initial connection and reconnects
     after disconnects.")
  (start-gc-loop* [this]
    "Starts the loop that maintains the table of outstanding RPCs.")
  (start-send-loop* [this]
    "Starts the go loop that drains the send queue onto the connection.")
  (start-rcv-loop* [this]
    "Starts the go loop that dispatches received data to handlers."))
;; CapsuleClient is the concrete client implementation. Configuration is held
;; in plain fields; mutable state lives in the atoms whose names start with
;; `*`. Instances are created by `make-client`, which also starts the connect,
;; gc, rcv and send loops.
(defrecord CapsuleClient
    [get-url get-url-timeout-ms get-credentials get-credentials-timeout-ms
     *rcv-chan send-chan reconnect-chan rpc-name->req-name msg-name->rec-name
     msgs-union-schema client-fp client-pcf default-rpc-timeout-ms
     rcv-queue-size send-queue-size silence-log? on-connect on-disconnect
     role peer-role peer-name-maps
     *url->server-fp *server-pcf *rpc-id *tube-client *credentials
     *shutdown? *rpc-id->rpc-info *msg-rec-name->handler]

  ICapsuleClient

  ;; Channel-returning send. The same callback is used for success and
  ;; failure: a non-nil value (result or exception) is put on the returned
  ;; channel, a nil value closes it.
  (<send-msg [this msg-name-kw arg]
    (<send-msg this msg-name-kw arg default-rpc-timeout-ms))

  (<send-msg [this msg-name-kw arg timeout-ms]
    (let [ch (ca/chan)
          cb (fn [arg]
               (if (nil? arg)
                 (ca/close! ch)
                 (ca/put! ch arg)))]
      (send-msg this msg-name-kw arg cb cb timeout-ms)
      ch))

  ;; Callback-style send. The shorter arities fill in nil callbacks and/or
  ;; `default-rpc-timeout-ms`.
  (send-msg [this msg-name-kw arg]
    (send-msg this msg-name-kw arg default-rpc-timeout-ms))

  (send-msg [this msg-name-kw arg timeout-ms]
    (send-msg this msg-name-kw arg nil nil timeout-ms))

  (send-msg [this msg-name-kw arg success-cb failure-cb]
    (send-msg this msg-name-kw arg success-cb failure-cb
              default-rpc-timeout-ms))

  ;; Dispatches on whether `msg-name-kw` is an RPC or a one-way msg that this
  ;; role may send. Throws when the client is shut down or when the role is
  ;; not a sender of `msg-name-kw`.
  (send-msg [this msg-name-kw arg success-cb failure-cb timeout-ms]
    (when @*shutdown?
      (throw (ex-info "Client is shut down" {})))
    (cond
      (rpc-name->req-name msg-name-kw)
      (send-rpc* this msg-name-kw arg success-cb failure-cb timeout-ms)

      (msg-name->rec-name msg-name-kw)
      ;; A one-way msg only counts as sent if the send queue accepted it;
      ;; a dropped msg is reported through failure-cb, like a dropped RPC.
      (if (send-msg* this msg-name-kw arg timeout-ms)
        (when success-cb
          (success-cb nil))
        (when failure-cb
          (failure-cb (ex-info "Msg cannot be sent. Send queue is full."
                               (u/sym-map msg-name-kw arg)))))

      :else
      (throw
       (ex-info (str "Role `" role "` is not a sender for msg `"
                     msg-name-kw "`.")
                (u/sym-map role msg-name-kw arg)))))

  ;; Registers `handler` for incoming msgs named `msg-name-kw`.
  (set-handler [this msg-name-kw handler]
    (u/set-handler msg-name-kw handler peer-name-maps *msg-rec-name->handler
                   peer-role))

  ;; Sets the shutdown flag (which every loop polls) and closes the current
  ;; tube client, if any.
  (shutdown [this]
    (reset! *shutdown? true)
    (when-let [tube-client @*tube-client]
      (tc/close tube-client)
      (reset! *tube-client nil)))

  ICapsuleClientInternals

  ;; Queues an RPC request. Always returns nil; a full send queue is reported
  ;; through `failure-cb`.
  (send-rpc* [this rpc-name-kw arg success-cb failure-cb timeout-ms]
    (let [msg-rec-name (rpc-name->req-name rpc-name-kw)
          rpc-id (u/get-rpc-id* *rpc-id)
          failure-time-ms (+ (u/get-current-time-ms) timeout-ms)
          rpc-info (u/sym-map rpc-name-kw arg rpc-id success-cb
                              failure-cb timeout-ms failure-time-ms)
          msg (u/sym-map rpc-id timeout-ms arg)
          msg-info (u/sym-map msg-rec-name msg failure-time-ms failure-cb)]
      ;; Register the rpc-info *before* the request becomes visible to the
      ;; send loop, so a response can never arrive for an unregistered rpc-id.
      (swap! *rpc-id->rpc-info assoc rpc-id rpc-info)
      (when-not (ca/offer! send-chan msg-info)
        (swap! *rpc-id->rpc-info dissoc rpc-id)
        (when failure-cb
          (failure-cb (ex-info "RPC cannot be sent. Send queue is full."
                               {:rpc-info msg-info})))))
    nil)

  ;; Queues a one-way msg. Returns true if the send queue accepted it and
  ;; false if it was dropped because the queue is full.
  (send-msg* [this msg-name-kw msg timeout-ms]
    (let [msg-rec-name (msg-name->rec-name msg-name-kw)
          msg {:arg msg}
          failure-time-ms (+ (u/get-current-time-ms) timeout-ms)
          msg-info (u/sym-map msg-rec-name msg failure-time-ms)]
      (boolean (ca/offer! send-chan msg-info))))

  ;; Sends a login request and reads one msg from `rcv-chan`. Yields the
  ;; response's `was-successful` value, or false if the reply is not a login
  ;; response.
  (<do-login* [this tube-client rcv-chan credentials]
    (au/go
      (tc/send tube-client
               (l/serialize msgs-union-schema
                            (l/wrap u/login-req-schema credentials)))
      (let [data (au/<? rcv-chan)
            [msg-name msg] (l/deserialize msgs-union-schema
                                          @*server-pcf data)]
        (if-not (= ::u/login-rsp msg-name)
          (do
            (errorf "Got wrong login rsp msg: %s" msg-name)
            false)
          (let [{:keys [was-successful]} msg]
            (when-not silence-log?
              (if was-successful
                (infof "Login succeeded.")
                (infof "Login failed.")))
            was-successful)))))

  ;; Races the login against a `default-rpc-timeout-ms` timeout. Caches the
  ;; credentials on success and clears the cache on a rejected login.
  (<do-auth-w-creds* [this tube-client rcv-chan credentials]
    (au/go
      (let [timeout-ch (ca/timeout default-rpc-timeout-ms)
            login-ch (<do-login* this tube-client rcv-chan credentials)
            [success? ch] (au/alts? [timeout-ch login-ch])]
        (if (= timeout-ch ch)
          (do
            (errorf "Authentication timed out.")
            false)
          (if-not success?
            (do
              (reset! *credentials nil)
              false)
            (do
              (reset! *credentials credentials)
              true))))))

  ;; Calls the user-supplied `get-credentials`. A channel result is awaited
  ;; for at most `get-credentials-timeout-ms`. Yields false on timeout or
  ;; when an exception is thrown.
  (<get-credentials* [this]
    (ca/go
      (try
        (let [creds-ret (get-credentials)]
          (if-not (au/channel? creds-ret)
            creds-ret
            (let [timeout-ch (ca/timeout get-credentials-timeout-ms)
                  [credentials ch] (au/alts? [creds-ret timeout-ch])]
              (if (= timeout-ch ch)
                false
                credentials))))
        (catch #?(:clj Exception :cljs js/Error) e
          (errorf "Error in <get-credentials: %s"
                  (lu/get-exception-msg-and-stacktrace e))
          false))))

  ;; Fetches fresh credentials, validates their shape and logs in with them.
  ;; The validation errors deliberately describe the credentials only by type
  ;; or by key names: these exceptions end up in the connect loop's error
  ;; log, and the raw value may contain the secret.
  (<get-credentials-and-do-auth* [this tube-client rcv-chan]
    (au/go
      (if-let [credentials (au/<? (<get-credentials* this))]
        (do
          (when (not (map? credentials))
            (throw (ex-info "get-credentials did not return a map."
                            {:credentials-type (type credentials)})))
          (when (not (:subject-id credentials))
            (throw (ex-info (str "get-credentials returned a map without a "
                                 "valid :subject-id. Got keys: "
                                 (vec (keys credentials)))
                            {:credentials-keys (vec (keys credentials))})))
          (when (not (:credential credentials))
            (throw (ex-info (str "get-credentials returned a map without a "
                                 "valid :credential. Got keys: "
                                 (vec (keys credentials)))
                            {:credentials-keys (vec (keys credentials))})))
          (if (au/<? (<do-auth-w-creds* this tube-client rcv-chan
                                        credentials))
            true
            (do
              (errorf "Authentication failed.")
              false)))
        (do
          (errorf "<get-credentials* failed.")
          false))))

  ;; Uses cached credentials first when present, falling back to fresh ones
  ;; if the cached login does not succeed.
  (<do-auth* [this tube-client rcv-chan]
    (au/go
      (if-not @*credentials
        (au/<? (<get-credentials-and-do-auth* this tube-client rcv-chan))
        (or (au/<? (<do-auth-w-creds* this tube-client rcv-chan @*credentials))
            (au/<? (<get-credentials-and-do-auth* this tube-client
                                                  rcv-chan))))))

  ;; Calls the user-supplied `get-url`. A channel result is awaited for at
  ;; most `get-url-timeout-ms`. Yields false on timeout or when an exception
  ;; is thrown.
  (<get-url* [this]
    (ca/go
      (try
        (let [url-ret (get-url)]
          (if-not (au/channel? url-ret)
            url-ret
            (let [timeout-ch (ca/timeout get-url-timeout-ms)
                  [url ch] (au/alts? [url-ret timeout-ch])]
              (if (= timeout-ch ch)
                false
                url))))
        (catch #?(:clj Exception :cljs js/Error) e
          (errorf "Error in <get-url: %s"
                  (lu/get-exception-msg-and-stacktrace e))
          false))))

  ;; Connection attempt loop. Each pass obtains a url, opens a tube client,
  ;; negotiates schemas and authenticates. A missing or invalid url, a failed
  ;; connection or a failed schema negotiation is retried after `wait-ms`,
  ;; which grows by `conn-wait-ms-multiplier`, is capped at
  ;; `max-conn-wait-ms` and then jittered by a random factor of 0.5 to 1.5.
  ;; A failed authentication is not retried: the client shuts down.
  ;; Yields true once connected, false after a shutdown or auth failure, and
  ;; nil if the client was shut down between attempts.
  (<connect* [this <make-ws-client]
    (au/go
      (loop [wait-ms initial-conn-wait-ms]
        (when-not @*shutdown?
          (let [rand-mult (+ 0.5 (rand))
                new-wait-ms (-> (* wait-ms conn-wait-ms-multiplier)
                                (min max-conn-wait-ms)
                                (* rand-mult)
                                (Math/floor)
                                (int))]
            (let [url (ca/<! (<get-url* this))]
              (if-not url
                (do
                  (ca/<! (ca/timeout wait-ms))
                  (recur new-wait-ms))
                (if-not (string? url)
                  (do
                    (errorf "<get-url* did not return a string, returned: %s"
                            url)
                    (ca/<! (ca/timeout wait-ms))
                    (recur new-wait-ms))
                  (let [_ (when-not silence-log?
                            (infof (str "Got url: %s. Attempting "
                                        "websocket connection.") url))
                        rcv-chan (ca/chan rcv-queue-size)
                        opts {:on-disconnect
                              (fn [conn code reason]
                                (on-disconnect this)
                                (when-not silence-log?
                                  (infof
                                   (str "Connection to %s disconnected: "
                                        "%s (%s)")
                                   (connection/get-uri conn) reason code))
                                ;; Only a fully established connection
                                ;; (one stored in *tube-client) triggers a
                                ;; reconnect.
                                (when-let [tube-client @*tube-client]
                                  (tc/close tube-client)
                                  (reset! *tube-client nil)
                                  (when-not @*shutdown?
                                    (ca/put! reconnect-chan true))))
                              :on-rcv (fn on-rcv [conn data]
                                        (ca/put! rcv-chan data))}
                        opts (cond-> opts
                               <make-ws-client (assoc :<make-ws-client
                                                      <make-ws-client))
                        tube-client (au/<? (tc/<make-tube-client
                                            url wait-ms opts))]
                    (if-not tube-client
                      (when-not @*shutdown?
                        (ca/<! (ca/timeout wait-ms))
                        (recur new-wait-ms))
                      (if @*shutdown?
                        (do
                          (tc/close tube-client)
                          false)
                        ;; A failed handshake leaves the connection
                        ;; unusable: drop it and retry instead of trying to
                        ;; authenticate over it.
                        (if-not (au/<? (<do-schema-negotiation*
                                        this tube-client rcv-chan url))
                          (do
                            (tc/close tube-client)
                            (when-not @*shutdown?
                              (ca/<! (ca/timeout wait-ms))
                              (recur new-wait-ms)))
                          (if-not (au/<? (<do-auth* this tube-client rcv-chan))
                            (do
                              (tc/close tube-client)
                              (shutdown this)
                              false)
                            (do
                              (reset! *rcv-chan rcv-chan)
                              (reset! *tube-client tube-client)
                              true)))))))))))))

  ;; Makes the initial connection, then waits for reconnect requests until
  ;; shutdown. The reconnect channel is polled with a timeout so that the
  ;; shutdown flag is re-checked regularly. A failed reconnect or any
  ;; unexpected exception shuts the client down.
  (start-connect-loop* [this <make-ws-client]
    (ca/go
      (try
        (when (au/<? (<connect* this <make-ws-client))
          (on-connect this))
        (while (not @*shutdown?)
          (let [[reconnect? ch] (ca/alts! [reconnect-chan
                                           (ca/timeout initial-conn-wait-ms)])]
            (when (and (= reconnect-chan ch) reconnect?)
              (let [success? (au/<? (<connect* this <make-ws-client))]
                (if success?
                  (on-connect this)
                  (when-not @*shutdown?
                    (errorf "Client failed to reconnect. Shutting down.")
                    (shutdown this)))))))
        (catch #?(:clj Exception :cljs js/Error) e
          (errorf "Unexpected error in connect loop: %s"
                  (lu/get-exception-msg-and-stacktrace e))
          (shutdown this)))))

  ;; Schema handshake. Sends the client fingerprint together with the server
  ;; fingerprint remembered for `url` (or the client's own when none is
  ;; known); on a `:none` match it retries once more with the client's
  ;; parsing canonical form included. Waiting for each response is bounded by
  ;; `default-rpc-timeout-ms`. Yields true on success and false when the
  ;; handshake times out or throws. If the client is shut down meanwhile, the
  ;; tube client is closed and the result of that call is yielded.
  (<do-schema-negotiation* [this tube-client rcv-chan url]
    (ca/go
      (try
        (loop [retry? false]
          (if @*shutdown?
            (tc/close tube-client)
            (let [known-server-fp (@*url->server-fp url)
                  req (cond-> {:client-fp client-fp
                               :server-fp (or known-server-fp client-fp)}
                        retry? (assoc :client-pcf client-pcf))
                  _ (tc/send tube-client (l/serialize
                                          u/handshake-req-schema req))
                  timeout-ch (ca/timeout default-rpc-timeout-ms)
                  [data ch] (au/alts? [rcv-chan timeout-ch])
                  ;; Without this bound a server that never answers would
                  ;; park the connect loop forever.
                  _ (when (= timeout-ch ch)
                      (throw (ex-info "Timed out waiting for handshake rsp."
                                      (u/sym-map url))))
                  rsp (l/deserialize u/handshake-rsp-schema
                                     (l/get-parsing-canonical-form
                                      u/handshake-rsp-schema)
                                     data)
                  {:keys [match server-fp server-pcf]} rsp]
              (case match
                :both (do
                        (swap! *url->server-fp assoc url known-server-fp)
                        (when-not known-server-fp
                          (reset! *server-pcf client-pcf))
                        true)
                :client (do
                          (swap! *url->server-fp assoc url server-fp)
                          (reset! *server-pcf server-pcf)
                          true)
                :none (do
                        (when-not (nil? server-fp)
                          (swap! *url->server-fp assoc url server-fp)
                          (reset! *server-pcf server-pcf))
                        (recur true))))))
        (catch #?(:clj Exception :cljs js/Error) e
          (errorf "Schema negotiation failed: %s"
                  (lu/get-exception-msg-and-stacktrace e))
          false))))

  ;; Drains the send queue. While there is no connection, a msg that carries
  ;; a `failure-time-ms` is retried every 100 ms until that time has passed,
  ;; at which point its `failure-cb` (if any) is called; a msg without a
  ;; `failure-time-ms` is dropped. Errors are caught per msg so that one msg
  ;; that fails to serialize or send does not stop the loop.
  (start-send-loop* [this]
    (ca/go
      (while (not @*shutdown?)
        (try
          (let [[msg-info ch] (ca/alts! [send-chan (ca/timeout 100)])]
            (when (= send-chan ch)
              (let [{:keys [msg-rec-name msg
                            failure-time-ms failure-cb]} msg-info]
                (loop []
                  (when (not @*shutdown?)
                    (if-let [tube-client @*tube-client]
                      (tc/send tube-client (l/serialize msgs-union-schema
                                                        [msg-rec-name msg]))
                      (when failure-time-ms
                        (if (> (u/get-current-time-ms) failure-time-ms)
                          (when failure-cb
                            (failure-cb
                             (ex-info
                              "Send timed out waiting for connection."
                              (u/sym-map msg-info))))
                          (do
                            (ca/<! (ca/timeout 100))
                            (recur))))))))))
          (catch #?(:clj Exception :cljs js/Error) e
            (errorf "Unexpected error in send loop: %s"
                    (lu/get-exception-msg-and-stacktrace e)))))))

  ;; Delegates to the shared gc loop, giving it the shutdown flag and the
  ;; table of outstanding RPCs.
  (start-gc-loop* [this]
    (u/start-gc-loop *shutdown? *rpc-id->rpc-info))

  ;; Reads data from the current rcv-chan and hands it to `u/handle-rcv`,
  ;; together with a `sender` fn that offers msgs to the send queue. Both
  ;; waits use timeouts so the shutdown flag and a replaced rcv-chan are
  ;; noticed.
  (start-rcv-loop* [this]
    (ca/go
      (while (not @*shutdown?)
        (try
          (if-let [rcv-chan @*rcv-chan]
            (let [[data ch] (au/alts? [rcv-chan (ca/timeout 1000)])]
              (when (= rcv-chan ch)
                (let [conn-id 0 ;; there is only one connection
                      sender (fn [msg-rec-name msg]
                               (when-not (ca/offer! send-chan
                                                    (u/sym-map msg-rec-name
                                                               msg))
                                 (errorf
                                  "RPC rsp cannot be sent. Queue is full.")))]
                  (u/handle-rcv :client conn-id sender (name peer-role)
                                (name peer-role) data
                                msgs-union-schema @*server-pcf
                                *msg-rec-name->handler))))
            (ca/<! (ca/timeout 100))) ;; Wait for rcv-chan to be set
          (catch #?(:clj Exception :cljs js/Error) e
            (errorf "Unexpected error in rcv-loop: %s"
                    (lu/get-exception-msg-and-stacktrace e))
            ;; Rate limit
            (ca/<! (ca/timeout 1000))))))))
;; Creates a capsule client for `role` in `protocol` and starts its connect,
;; gc, rcv and send loops. The client is returned immediately; connecting
;; happens in the background.
;;
;; `get-url` and `get-credentials` are functions supplied by the caller.
;; `options` is merged over `default-client-options`; it may also carry
;; `:handlers` (a map of msg-name keyword to handler fn, registered before the
;; loops start) and `:<make-ws-client`.
;;
;; Throws ex-info when an argument has the wrong type.
(s/defn make-client :- (s/protocol ICapsuleClient)
  ([get-url :- u/GetURLFn
    get-credentials :- u/GetCredentialsFn
    protocol :- u/Protocol
    role :- u/Role]
   (make-client get-url get-credentials protocol role {}))
  ([get-url :- u/GetURLFn
    get-credentials :- u/GetCredentialsFn
    protocol :- u/Protocol
    role :- u/Role
    options :- u/ClientOptions]
   (letfn [(ensure-arg [valid? msg data]
             (when-not valid?
               (throw (ex-info msg data))))]
     (ensure-arg (ifn? get-url)
                 "`get-url` parameter must be a function."
                 (u/sym-map get-url))
     (ensure-arg (ifn? get-credentials)
                 "`get-credentials` parameter must be a function."
                 (u/sym-map get-credentials))
     (u/check-protocol protocol)
     (ensure-arg (keyword? role)
                 "`role` parameter must be a keyword."
                 (u/sym-map role))
     (ensure-arg (map? options)
                 "`options` parameter must be a map."
                 (u/sym-map options)))
   (let [{:keys [default-rpc-timeout-ms
                 get-credentials-timeout-ms
                 get-url-timeout-ms
                 rcv-queue-size
                 send-queue-size
                 silence-log?
                 on-connect
                 on-disconnect
                 handlers
                 <make-ws-client]} (merge default-client-options options)
         ;; Channels and connection state
         *rcv-chan (atom nil)
         send-chan (ca/chan send-queue-size)
         reconnect-chan (ca/chan)
         ;; Name lookups derived from the protocol, for both ends
         peer-role (u/get-peer-role protocol role)
         own-name-maps (u/make-name-maps protocol role)
         peer-name-maps (u/make-name-maps protocol peer-role)
         rpc-name->req-name (:rpc-name->req-name own-name-maps)
         msg-name->rec-name (:msg-name->rec-name own-name-maps)
         ;; Schema identity used during the handshake
         msgs-union-schema (u/make-msgs-union-schema protocol)
         client-fp (l/get-fingerprint64 msgs-union-schema)
         client-pcf (l/get-parsing-canonical-form msgs-union-schema)
         ;; Mutable state
         *url->server-fp (atom {})
         *server-pcf (atom {})
         *rpc-id (atom 0)
         *tube-client (atom nil)
         *credentials (atom nil)
         *shutdown? (atom false)
         *rpc-id->rpc-info (atom {})
         *msg-rec-name->handler (atom (u/make-msg-rec-name->handler
                                       own-name-maps peer-name-maps
                                       *rpc-id->rpc-info silence-log?))
         capsule-client (->CapsuleClient
                         get-url get-url-timeout-ms get-credentials
                         get-credentials-timeout-ms *rcv-chan send-chan
                         reconnect-chan rpc-name->req-name msg-name->rec-name
                         msgs-union-schema client-fp client-pcf
                         default-rpc-timeout-ms rcv-queue-size send-queue-size
                         silence-log? on-connect on-disconnect role peer-role
                         peer-name-maps *url->server-fp *server-pcf *rpc-id
                         *tube-client *credentials *shutdown?
                         *rpc-id->rpc-info *msg-rec-name->handler)]
     ;; Register caller-supplied handlers before any loop can deliver a msg.
     (run! (fn [[msg-name-kw handler]]
             (set-handler capsule-client msg-name-kw handler))
           handlers)
     (start-connect-loop* capsule-client <make-ws-client)
     (start-gc-loop* capsule-client)
     (start-rcv-loop* capsule-client)
     (start-send-loop* capsule-client)
     capsule-client)))