-
Notifications
You must be signed in to change notification settings - Fork 11
/
core.clj
66 lines (57 loc) · 1.96 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
(ns framework.sse.core
(:require
[clojure.core.async :as async :refer (<! go-loop)]
[clojure.data.json :as json]
[clojure.tools.logging :as log]
[org.httpkit.server :as server]
[xiana.core :as xiana])
(:import
(java.lang
AutoCloseable)))
(def headers {"Content-Type" "text/event-stream"})
(def EOL "\n")
(defn ->message [data]
(str "data: " (json/write-str data) EOL EOL))
(defrecord closable-events-channel
[channel clients]
AutoCloseable
(close [this]
(.close! (:channel this))
(doseq [c @(:clients this)]
(server/close c))))
(defn init [config]
(let [channel (async/chan 5)
clients (atom #{})]
(go-loop []
(when-let [data (<! channel)]
(log/debug "Sending data via SSE: " data)
(doseq [c @clients]
(server/send! c (->message data) false))
(recur)))
(assoc config :events-channel (->closable-events-channel
channel
clients))))
(defn server-event-channel [state]
(let [clients (get-in state [:deps :events-channel :clients])]
(server/as-channel (:request state)
{:init (fn [ch]
(swap! clients conj ch)
(server/send! ch {:headers headers :body (json/write-str {})} false))
:on-receive (fn [ch message])
:on-ping (fn [ch data])
:on-close (fn [ch status] (swap! clients disj ch))
:on-open (fn [ch])})))
(defn stop-heartbeat-loop
[state]
(when-let [channel (get-in state [:deps :events-channel :channel])]
(async/close! channel)))
(defn put!
[state message]
(let [events-channel (get-in state [:deps :events-channel :channel])]
(async/put! events-channel message)))
(defn sse-action
[state]
(xiana/ok
(assoc state
:response
(server-event-channel state))))