-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
server.clj
341 lines (314 loc) · 12.4 KB
/
server.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
; Copyright (c) Rich Hickey. All rights reserved.
; The use and distribution terms for this software are covered by the
; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
; which can be found in the file epl-v10.html at the root of this distribution.
; By using this software in any fashion, you are agreeing to be bound by
; the terms of this license.
; You must not remove this notice, or any other, from this software.
(ns ^{:doc "Socket server support"
:author "Alex Miller"}
clojure.core.server
(:require [clojure.string :as str]
[clojure.edn :as edn]
[clojure.main :as m])
(:import
[clojure.lang LineNumberingPushbackReader]
[java.net InetAddress Socket ServerSocket SocketException]
[java.io Reader Writer PrintWriter BufferedWriter BufferedReader InputStreamReader OutputStreamWriter]
[java.util Properties]
[java.util.concurrent.locks ReentrantLock]))
(set! *warn-on-reflection* true)
(def ^:dynamic *session* nil)
;; lock protects servers
(defonce ^:private lock (ReentrantLock.))
(defonce ^:private servers {})
(defmacro ^:private with-lock
[lock-expr & body]
`(let [lockee# ~(with-meta lock-expr {:tag 'java.util.concurrent.locks.ReentrantLock})]
(.lock lockee#)
(try
~@body
(finally
(.unlock lockee#)))))
(defmacro ^:private thread
[^String name daemon & body]
`(doto (Thread. (fn [] ~@body) ~name)
(.setDaemon ~daemon)
(.start)))
(defn- required
"Throw if opts does not contain prop."
[opts prop]
(when (nil? (get opts prop))
(throw (ex-info (str "Missing required socket server property " prop) opts))))
(defn- validate-opts
"Validate server config options"
[{:keys [name port accept] :as opts}]
(doseq [prop [:name :port :accept]] (required opts prop))
(when (or (not (integer? port)) (not (<= 0 port 65535)))
(throw (ex-info (str "Invalid socket server port: " port) opts))))
(defn- accept-connection
"Start accept function, to be invoked on a client thread, given:
conn - client socket
name - server name
client-id - client identifier
in - in stream
out - out stream
err - err stream
accept - accept fn symbol to invoke
args - to pass to accept-fn"
[^Socket conn name client-id in out err accept args]
(try
(binding [*in* in
*out* out
*err* err
*session* {:server name :client client-id}]
(with-lock lock
(alter-var-root #'servers assoc-in [name :sessions client-id] {}))
(require (symbol (namespace accept)))
(let [accept-fn (resolve accept)]
(apply accept-fn args)))
(catch SocketException _disconnect)
(finally
(with-lock lock
(alter-var-root #'servers update-in [name :sessions] dissoc client-id))
(.close conn))))
(defn start-server
"Start a socket server given the specified opts:
:address Host or address, string, defaults to loopback address
:port Port, integer, required
:name Name, required
:accept Namespaced symbol of the accept function to invoke, required
:args Vector of args to pass to accept function
:bind-err Bind *err* to socket out stream?, defaults to true
:server-daemon Is server thread a daemon?, defaults to true
:client-daemon Are client threads daemons?, defaults to true
Returns server socket."
[opts]
(validate-opts opts)
(let [{:keys [address port name accept args bind-err server-daemon client-daemon]
:or {bind-err true
server-daemon true
client-daemon true}} opts
address (InetAddress/getByName address) ;; nil returns loopback
socket (ServerSocket. port 0 address)]
(with-lock lock
(alter-var-root #'servers assoc name {:name name, :socket socket, :sessions {}}))
(thread
(str "Clojure Server " name) server-daemon
(try
(loop [client-counter 1]
(when (not (.isClosed socket))
(try
(let [conn (.accept socket)
in (LineNumberingPushbackReader. (InputStreamReader. (.getInputStream conn)))
out (BufferedWriter. (OutputStreamWriter. (.getOutputStream conn)))
client-id (str client-counter)]
(thread
(str "Clojure Connection " name " " client-id) client-daemon
(accept-connection conn name client-id in out (if bind-err out *err*) accept args)))
(catch SocketException _disconnect))
(recur (inc client-counter))))
(finally
(with-lock lock
(alter-var-root #'servers dissoc name)))))
socket))
(defn stop-server
"Stop server with name or use the server-name from *session* if none supplied.
Returns true if server stopped successfully, nil if not found, or throws if
there is an error closing the socket."
([]
(stop-server (:server *session*)))
([name]
(with-lock lock
(let [server-socket ^ServerSocket (get-in servers [name :socket])]
(when server-socket
(alter-var-root #'servers dissoc name)
(.close server-socket)
true)))))
(defn stop-servers
"Stop all servers ignores all errors, and returns nil."
[]
(with-lock lock
(doseq [name (keys servers)]
(future (stop-server name)))))
(defn- parse-props
"Parse clojure.server.* from properties to produce a map of server configs."
[^Properties props]
(reduce
(fn [acc ^String k]
(let [[k1 k2 k3] (str/split k #"\.")]
(if (and (= k1 "clojure") (= k2 "server"))
(let [v (get props k)]
(conj acc (merge {:name k3} (edn/read-string v))))
acc)))
[]
(.stringPropertyNames props)))
(defn start-servers
"Start all servers specified in the system properties."
[system-props]
(doseq [server (parse-props system-props)]
(start-server server)))
(defn repl-init
"Initialize repl in user namespace and make standard repl requires."
[]
(in-ns 'user)
(apply require clojure.main/repl-requires))
(defn repl-read
"Enhanced :read hook for repl supporting :repl/quit."
[request-prompt request-exit]
(or ({:line-start request-prompt :stream-end request-exit}
(m/skip-whitespace *in*))
(let [input (read {:read-cond :allow} *in*)]
(m/skip-if-eol *in*)
(case input
:repl/quit request-exit
input))))
(defn repl
"REPL with predefined hooks for attachable socket server."
[]
(m/repl
:init repl-init
:read repl-read))
(defn- ex->data
[ex phase]
(assoc (Throwable->map ex) :phase phase))
(defn prepl
"a REPL with structured output (for programs)
reads forms to eval from in-reader (a LineNumberingPushbackReader)
Closing the input or passing the form :repl/quit will cause it to return
Calls out-fn with data, one of:
{:tag :ret
:val val ;;eval result, or Throwable->map data if exception thrown
:ns ns-name-string
:ms long ;;eval time in milliseconds
:form string ;;iff successfully read
:exception true ;;iff exception thrown
}
{:tag :out
:val string} ;chars from during-eval *out*
{:tag :err
:val string} ;chars from during-eval *err*
{:tag :tap
:val val} ;values from tap>
You might get more than one :out or :err per eval, but exactly one :ret
tap output can happen at any time (i.e. between evals)
If during eval an attempt is made to read *in* it will read from in-reader unless :stdin is supplied
Alpha, subject to change."
{:added "1.10"}
[in-reader out-fn & {:keys [stdin]}]
(let [EOF (Object.)
tapfn #(out-fn {:tag :tap :val %1})]
(m/with-bindings
(in-ns 'user)
(binding [*in* (or stdin in-reader)
*out* (PrintWriter-on #(out-fn {:tag :out :val %1}) nil)
*err* (PrintWriter-on #(out-fn {:tag :err :val %1}) nil)]
(try
(add-tap tapfn)
(loop []
(when (try
(let [[form s] (read+string {:eof EOF :read-cond :allow} in-reader)]
(try
(when-not (identical? form EOF)
(let [start (System/nanoTime)
ret (eval form)
ms (quot (- (System/nanoTime) start) 1000000)]
(when-not (= :repl/quit ret)
(set! *3 *2)
(set! *2 *1)
(set! *1 ret)
(out-fn {:tag :ret
:val (if (instance? Throwable ret)
(Throwable->map ret)
ret)
:ns (str (.name *ns*))
:ms ms
:form s})
true)))
(catch Throwable ex
(set! *e ex)
(out-fn {:tag :ret :val (ex->data ex (or (-> ex ex-data :clojure.error/phase) :execution))
:ns (str (.name *ns*)) :form s
:exception true})
true)))
(catch Throwable ex
(set! *e ex)
(out-fn {:tag :ret :val (ex->data ex :read-source)
:ns (str (.name *ns*))
:exception true})
true))
(recur)))
(finally
(remove-tap tapfn)))))))
(defn- resolve-fn [valf]
(if (symbol? valf)
(or (resolve valf)
(when-let [nsname (namespace valf)]
(require (symbol nsname))
(resolve valf))
(throw (Exception. (str "can't resolve: " valf))))
valf))
(defn io-prepl
"prepl bound to *in* and *out*, suitable for use with e.g. server/repl (socket-repl).
:ret and :tap vals will be processed by valf, a fn of one argument
or a symbol naming same (default pr-str)
Alpha, subject to change."
{:added "1.10"}
[& {:keys [valf] :or {valf pr-str}}]
(let [valf (resolve-fn valf)
out *out*
lock (Object.)]
(prepl *in*
(fn [m]
(binding [*out* out, *flush-on-newline* true, *print-readably* true]
(locking lock
(prn (if (#{:ret :tap} (:tag m))
(try
(assoc m :val (valf (:val m)))
(catch Throwable ex
(assoc m :val (valf (ex->data ex :print-eval-result))
:exception true)))
m))))))))
(defn remote-prepl
"Implements a prepl on in-reader and out-fn by forwarding to a
remote [io-]prepl over a socket. Messages will be read by readf, a
fn of a LineNumberingPushbackReader and EOF value or a symbol naming
same (default #(read %1 false %2)),
:ret and :tap vals will be processed by valf, a fn of one argument
or a symbol naming same (default read-string). If that function
throws, :val will be unprocessed.
Alpha, subject to change."
{:added "1.10"}
[^String host port ^Reader
in-reader out-fn & {:keys [valf readf] :or {valf read-string, readf #(read %1 false %2)}}]
(let [valf (resolve-fn valf)
readf (resolve-fn readf)
^long port (if (string? port) (Integer/valueOf ^String port) port)
socket (Socket. host port)
rd (-> socket .getInputStream InputStreamReader. BufferedReader. LineNumberingPushbackReader.)
wr (-> socket .getOutputStream OutputStreamWriter.)
EOF (Object.)]
(thread "clojure.core.server/remote-prepl" true
(try (loop []
(let [{:keys [tag val] :as m} (readf rd EOF)]
(when-not (identical? m EOF)
(out-fn
(if (#{:ret :tap} tag)
(try
(assoc m :val (valf val))
(catch Throwable ex
(assoc m :val (ex->data ex :read-eval-result)
:exception true)))
m))
(recur))))
(finally
(.close wr))))
(let [buf (char-array 1024)]
(try (loop []
(let [n (.read in-reader buf)]
(when-not (= n -1)
(.write wr buf 0 n)
(.flush wr)
(recur))))
(finally
(.close rd))))))