-
-
Notifications
You must be signed in to change notification settings - Fork 137
/
websockets.cljs
100 lines (95 loc) · 5.53 KB
/
websockets.cljs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
(ns fulcro.websockets
(:require-macros [cljs.core.async.macros :refer (go go-loop)])
(:require [cognitect.transit :as ct]
[cljs.core.async :as async]
[taoensso.sente :as sente :refer (cb-success?)]
[fulcro.client.network :refer [FulcroNetwork]]
[fulcro.logging :as log]
[fulcro.websockets.transit-packer :as tp]))
(defn- make-event-handler
"Probably need to make it possible for extension from outside."
[push-handler]
(fn [{:keys [id ?data] :as event}]
(case id
:api/server-push (when push-handler (push-handler ?data))
nil)))
(defrecord Websockets [queue ready? channel-socket push-handler websockets-uri host state-callback global-error-callback transit-handlers req-params stop app auto-retry?]
FulcroNetwork
(send [this edn ok err] (async/go (async/>! queue {:this this :edn edn :ok ok :err err})))
(start [this]
(let [{:keys [ch-recv state] :as cs} (sente/make-channel-socket! websockets-uri ; path on server
{:packer (tp/make-packer transit-handlers)
:host host
:type :ws ; e/o #{:auto :ajax :ws}
:backoff-ms-fn (fn [attempt] (min (* attempt 1000) 4000))
:params req-params
:wrap-recv-evs? false})
message-received (make-event-handler push-handler)]
(add-watch state ::ready (fn [a k o n]
(if auto-retry?
(do (reset! ready? (:open? n))) ; prevent send attempts until open again.
(when (:open? n) ; not auto-retry: so single-shot. offline down should result in app level network error
(reset! ready? true)))))
(cond
(fn? state-callback) (add-watch state ::state-callback (fn [a k o n] (state-callback o n)))
(instance? Atom state-callback) (add-watch state ::state-callback (fn [a k o n] (@state-callback o n))))
(reset! channel-socket cs)
(sente/start-chsk-router! ch-recv message-received)
(async/go-loop []
(if @ready?
(let [{:keys [this edn ok err]} (async/<! queue)
{:keys [send-fn]} @channel-socket]
(send-fn [:fulcro.client/API edn] 30000
(fn process-response [resp]
(if (cb-success? resp)
(let [{:keys [status body]} resp]
(if (= 200 status)
(ok body)
(do
(err body)
(when global-error-callback
(global-error-callback resp)))))
(if auto-retry?
(do
; retry...sente already does connection back-off, so probably don't need back-off here
(js/setTimeout #(fulcro.client.network/send this edn ok err) 1000))
(let [body {:fulcro.server/error :network-disconnect}]
(err body)
(when global-error-callback
(global-error-callback {:status 408 :body body}))))))))
(do
(log/info "Send attempted before channel ready...waiting")
(async/<! (async/timeout 1000))))
(recur))
this)))
(defn make-websocket-networking
"Creates a websocket-based networking component for use as a Fulcro remote.
Params:
- `websockets-uri` - The uri to handle websocket traffic on. (ex. \"/chsk\", which is the default value)
- `push-handler` - A function (fn [{:keys [topic msg]}] ...) that can handle a push message.
The topic is the server push verb, and the message will be the EDN sent.
- `host` - Host option to send to sente
- `req-params` - Params for sente socket creation
- `transit-handlers` - A map with optional :read and :write keys that given added sente packer.
- `state-callback` (Optional) - Callback that runs when the websocket state of the websocket changes.
The function takes an old state parameter and a new state parameter (arity 2 function).
`state-callback` can be either a function, or an atom containing a function.
- `global-error-callback` - A function (fn [resp] ...) that is called when returned status code from the server is not 200.
- `auto-retry?` - A boolean (default false). If set to true any network disconnects will lead to infinite retries until
the network returns. All remote mutations should be idempotent.
"
([] (make-websocket-networking {}))
([{:keys [websockets-uri global-error-callback push-handler host req-params state-callback transit-handlers auto-retry?]}]
(map->Websockets {:channel-socket (atom nil)
:queue (async/chan)
:ready? (atom false)
:auto-retry? auto-retry?
:websockets-uri (or websockets-uri "/chsk")
:push-handler push-handler
:host host
:state-callback state-callback
:global-error-callback global-error-callback
:transit-handlers transit-handlers
:app (atom nil)
:stop (atom nil)
:req-params req-params})))