-
Notifications
You must be signed in to change notification settings - Fork 1
/
server.clj
162 lines (150 loc) · 6.25 KB
/
server.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
(ns deercreeklabs.tube.server
(:gen-class)
(:require
[clojure.core.async :as ca]
[clojure.java.io :as io]
[deercreeklabs.async-utils :as au]
[deercreeklabs.baracus :as ba]
[deercreeklabs.log-utils :as lu :refer [debugs]]
[deercreeklabs.tube.connection :as connection]
[deercreeklabs.tube.utils :as u]
[org.httpkit.server :as http]
[taoensso.timbre :as timbre :refer [debugf errorf infof]])
(:import
(java.nio HeapByteBuffer)
(java.security Security)))
(primitive-math/use-primitive-operators)
(defprotocol ITubeServer
(start [this] "Start serving")
(stop [this] "Stop serving")
(get-conn-count [this] "Return the number of current connections"))
(deftype TubeServer [*conn-count starter *stopper]
ITubeServer
(start [this]
(if @*stopper
(infof "Server is already started.")
(reset! *stopper (starter))))
(stop [this]
(if-let [stopper @*stopper]
(do
(stopper)
(reset! *stopper nil))
(infof "Server is not running.")))
(get-conn-count [this]
@*conn-count))
(defn make-ws-handler
[on-connect on-disconnect compression-type *conn-count *conn-id]
(fn handle-ws [req channel]
(try
(let [{:keys [uri remote-addr]} req
fragment-size 65000 ;; TODO: Figure out this size
conn-id (swap! *conn-id #(inc (int %)))
_ (swap! *conn-count #(inc (int %)))
sender (fn [data]
(http/send! channel data))
closer #(http/close channel)
conn (connection/make-connection
conn-id uri remote-addr on-connect sender closer fragment-size
compression-type false)
on-close (fn [reason]
(swap! *conn-count #(dec (int %)))
(connection/on-disconnect* conn 1000 reason)
(on-disconnect conn 1000 reason))
on-rcv (fn [data]
(connection/handle-data conn data))]
(http/on-receive channel on-rcv)
(http/on-close channel on-close))
(catch Exception e
(errorf "Unexpected exception in handle-ws")
(lu/log-exception e)))))
(defn make-http-handler [handle-http http-timeout-ms]
(fn [req channel]
(au/go
(try
(let [ret (handle-http (update req :body #(if % (slurp %) "")))
rsp (if-not (au/channel? ret)
ret
(let [timeout-ch (ca/timeout (or http-timeout-ms 1000))
[ch-ret ch] (au/alts? [ret timeout-ch])]
(if (= timeout-ch ch)
{:status 504 :body ""}
ch-ret)))]
(http/send! channel (cond
(map? rsp) rsp
(string? rsp) {:status 200 :body rsp}
:else {:status 500 :body "Bad response"})))
(catch Exception e
(let [msg "Unexpected exception in HTTP handler."]
(errorf msg)
(lu/log-exception e)
(http/send! channel {:status 500 :body msg})))))))
(defn handle-http-test [req]
(let [{:keys [body]} req]
(if (pos? (count body))
(clojure.string/upper-case (slurp body))
"")))
;; TODO: Add schema to clarify opts
(defn make-tube-server
([port on-connect on-disconnect compression-type]
(make-tube-server port on-connect on-disconnect compression-type {}))
([port on-connect on-disconnect compression-type opts]
(let [{:keys [handle-http
http-timeout-ms
dns-cache-secs]
:or {dns-cache-secs 60}} opts
_ (Security/setProperty "networkaddress.cache.ttl"
(str dns-cache-secs))
*conn-count (atom 0)
*stopper (atom nil)
*conn-id (atom 0)
ws-handler (make-ws-handler on-connect on-disconnect compression-type
*conn-count *conn-id)
http-handler (if handle-http
(make-http-handler handle-http http-timeout-ms)
(make-http-handler handle-http-test 1000))
handler (fn [req]
(http/with-channel req channel
(if (http/websocket? channel)
(ws-handler req channel)
(http-handler req channel))))
starter (fn []
(reset! *stopper (http/run-server handler (u/sym-map port)))
(infof "Started server on port %s." port))]
(->TubeServer *conn-count starter *stopper))))
(defn run-test-server
([] (run-test-server 8080))
([port]
(u/configure-logging)
(let [handle-http (fn [req]
{:status 200
:headers {"content-type" "text/plain"}
:body "Yo"})
*server (atom nil)
on-connect (fn [conn]
(let [conn-id (connection/get-conn-id conn)
uri (connection/get-uri conn)
remote-addr (connection/get-remote-addr conn)
conn-count (get-conn-count @*server)
on-rcv (fn [conn data]
(connection/send
conn (ba/reverse-byte-array data)))]
(infof "Opened conn %s on %s from %s. Conn count: %s"
conn-id uri remote-addr conn-count)
(connection/set-on-rcv conn on-rcv)))
on-disconnect (fn [conn code reason]
(let [conn-id (connection/get-conn-id conn)
uri (connection/get-uri conn)
remote-addr (connection/get-remote-addr conn)
conn-count (get-conn-count @*server)]
(infof (str "Closed conn %s on %s from %s. "
"Conn count: %s")
conn-id uri remote-addr conn-count)))
compression-type :smart
opts (u/sym-map handle-http)
server (make-tube-server port on-connect on-disconnect
compression-type opts)]
(reset! *server server)
(start server))))
(defn -main
[& args]
(run-test-server))