/
async.clj
305 lines (262 loc) · 12 KB
/
async.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
;; Copyright 2014-2017 Red Hat, Inc, and individual contributors.
;;
;; Licensed under the Apache License, Version 2.0 (the "License");
;; you may not use this file except in compliance with the License.
;; You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns immutant.web.async
"Provides a common interface for WebSockets and HTTP streaming."
(:require [immutant.internal.options :as o]
[immutant.internal.util :as u])
(:import [org.projectodd.wunderboss.web.async Channel Channel$OnComplete HttpChannel]
[org.projectodd.wunderboss.web.async.websocket WebsocketChannel]
[java.io File FileInputStream InputStream]
[java.util Arrays Map]
clojure.lang.ISeq))
(defn ^:internal ^:no-doc streaming-body? [body]
(instance? HttpChannel body))
(defn ^:internal ^:no-doc open-stream [^HttpChannel channel response-map
set-status-fn set-headers-fn]
(doto channel
(.attach :response-map response-map)
(.attach :set-status-fn set-status-fn)
(.attach :set-headers-fn set-headers-fn)
(.notifyOpen nil)))
(let [dispatch-fn (fn [request _] (:handler-type request :servlet))]
(defmulti ^:internal ^:no-doc initialize-stream dispatch-fn)
(defmulti ^:internal ^:no-doc initialize-websocket dispatch-fn))
(defprotocol ^:private MessageDispatch
(dispatch-message [from ch options-map]))
(defn ^:private notify
([ch callback]
(notify ch callback nil))
([^Channel ch callback e]
(if callback
;; catch the case where the callback itself throws,
;; and notify the channel callback instead of letting it
;; bubble up, since that may trigger the same callback
;; being called again
(try
(if e
(callback e)
(callback))
(catch Throwable e'
(.notifyError ch e')))
(when e (.notifyError ch e)))
::notified))
(defmacro ^:private catch-and-notify [ch on-error & body]
`(try
~@body
(catch Throwable e#
(notify ~ch ~on-error e#))))
(def ^:dynamic ^:private *dispatched?* nil)
(defn ^:private maybe-dispatch* [ch f]
(if (or *dispatched?*
;; we can't do async sends under WF 8.x due to WFLY-3715
(and (instance? HttpChannel ch)
(not (.asyncSendSupported ^HttpChannel ch))))
(f)
(binding [*dispatched?* true]
(future (f)))))
(defmacro ^:private maybe-dispatch [ch & body]
`(maybe-dispatch* ~ch (fn [] ~@body)))
(defn ^:private finalize-channel-response
[^Channel ch status headers]
(when (and (instance? HttpChannel ch)
(not (.sendStarted ^HttpChannel ch)))
(let [orig-response (.get ch :response-map)]
((.get ch :set-status-fn) (or status (:status orig-response)))
((.get ch :set-headers-fn) (or headers (:headers orig-response))))))
(defn ^:private wboss-send [^Channel ch message options]
(let [{:keys [close? on-success on-error status headers]} options]
(finalize-channel-response ch status headers)
(.send ch message
(boolean close?)
(when (or on-success on-error)
(reify Channel$OnComplete
(handle [_ error]
(if (and error on-error)
(on-error error)
(when on-success (on-success)))))))))
(defn originating-request
"Returns the request map for the request that initiated the channel."
[^Channel ch]
(.get ch :originating-request))
(defn open?
"Is the channel open?"
[^Channel ch]
(.isOpen ch))
(defn close
"Gracefully close the channel.
This will trigger the :on-close callback if one is registered with
[[as-channel]]."
[^Channel ch]
(finalize-channel-response ch nil nil)
(.close ch))
(extend-protocol MessageDispatch
Object
(dispatch-message [message _ _]
(throw (IllegalStateException. (str "Can't send message of type " (class message)))))
nil
(dispatch-message [_ ch options]
(wboss-send ch nil options))
Map
(dispatch-message [message ch options]
(when (not (instance? HttpChannel ch))
(throw (IllegalArgumentException. "Can't send map: channel is not an HTTP stream channel")))
(when (.sendStarted ^HttpChannel ch)
(throw (IllegalArgumentException. "Can't send map: this is not the first send to the channel")))
(dispatch-message (:body message) ch
(merge options (select-keys message [:status :headers]))))
String
(dispatch-message [message ch options]
(wboss-send ch message options))
ISeq
(dispatch-message [message ch {:keys [on-success on-error close?] :as options}]
(maybe-dispatch ch
(let [result (catch-and-notify ch on-error
(loop [item (first message)
items (rest message)]
(let [latch (promise)]
(dispatch-message item ch
(assoc options
:on-success #(deliver latch nil)
:on-error (partial deliver latch)
:close? (and close? (not (seq items)))))
(if-let [err @latch]
(notify ch on-error err)
(when (seq items)
(recur (first items) (rest items)))))))]
(when-not (= ::notified result)
(notify ch on-success)))))
File
(dispatch-message [message ch options]
(dispatch-message (FileInputStream. message) ch options))
InputStream
(dispatch-message [message ch {:keys [on-success on-error close?] :as options}]
(maybe-dispatch ch
(let [buf-size (* 1024 16) ;; 16k is the undertow default if > 128M RAM is available
buffer (byte-array buf-size)
result (catch-and-notify ch on-error
(with-open [message message]
(loop []
(let [read-bytes (.read message buffer)]
(if (pos? read-bytes)
(let [latch (promise)]
(dispatch-message
(if (< read-bytes buf-size)
(Arrays/copyOfRange buffer 0 read-bytes)
buffer)
ch
(assoc options
:on-success #(deliver latch nil)
:on-error (partial deliver latch)
:close? false))
(if-let [err @latch]
(notify ch on-error err)
(recur))))))))]
(when-not (= ::notified result)
(notify ch on-success)))
(when close?
(close ch)))))
;; this has to be in a separate extend-protocol because we need to
;; extend Object first, and type looked up via Class/forName has to be
;; first in extend-protocol (see CLJ-1381)
(extend-protocol MessageDispatch
(Class/forName "[B")
(dispatch-message [message ch options]
(wboss-send ch message options)))
(defn send!
"Send a message to the channel, asynchronously.
`message` can either be a `String`, `File`, `InputStream`, `ISeq`,
`byte[]`, or map. If it is a `String`, it will be encoded to the
character set of the response for HTTP streams, and as UTF-8 for
WebSockets. `File`s and `InputStream`s will be sent as up to 16k
chunks (each chunk being a `byte[]` message for WebSockets). Each
item in an `ISeq` will pass through `send!`, and can be any of the
valid message types.
If `message` is a map, its :body entry must be one of the other
valid message types, and its :status and :headers entries will be
used to override the status or headers returned from the handler
that called `as-channel` for HTTP streams. A map is *only* a valid
message on the first send to an HTTP stream channel - an exception
is thrown if it is passed on a subsequent send or passed to a
WebSocket channel.
The following options are supported [default]:
* :close? - if `true`, the channel will be closed when the send completes.
Setting this to `true` on the first send to an HTTP stream channel
will cause it to behave like a standard HTTP response, and *not* chunk
the response. [false]
* :on-success - `(fn [] ...)` - called when the send attempt has completed
successfully. If this callback throws an exception, it will be
reported to the [[as-channel]] :on-error callback [nil]
* :on-error - `(fn [throwable] ...)` - Called when an error occurs on the send.
If the error requires the channel to be closed, the [[as-channel]] :on-close
callback will also be invoked. If this callback throws an exception, it will be
reported to the [[as-channel]] :on-error callback [`#(throw %)`]
Returns nil if the channel is closed when the send is initiated, true
otherwise. If the channel is already closed, :on-success won't be
invoked."
[^Channel ch message & options]
(dispatch-message message ch
(-> options
u/kwargs-or-map->raw-map
(o/validate-options send!))))
(o/set-valid-options! send! #{:close? :on-success :on-error})
(defn as-channel
"Converts the current ring `request` in to an asynchronous channel.
The type of channel created depends on the request - if the request
is a Websocket upgrade request (the :websocket? key is true), a
Websocket channel will be created. Otherwise, an HTTP stream channel
is created. You interact with both channel types using the other
functions in this namespace, and through callbacks in `options`.
The callbacks common to both channel types are:
* :on-open - `(fn [ch] ...)` - called when the channel is
available for sending. Will only be invoked once.
* :on-error - `(fn [ch throwable] ...)` - Called for any error
that occurs in relation to the channel. If the error
requires the channel to be closed, :on-close will also be invoked.
To handle [[send!]] errors separately, provide it a completion
callback.
* :on-close - `(fn [ch {:keys [code reason]}] ...)` -
called for *any* close, including a call to [[close]], but will
only be invoked once. `ch` will already be closed by the time
this is invoked.
`code` and `reason` will be the numeric closure code and text reason,
respectively, if the channel is a WebSocket
(see <http://tools.ietf.org/html/rfc6455#section-7.4>). Both will be nil
for HTTP streams.
If the channel is a Websocket, the following callback is also used:
* :on-message - `(fn [ch message] ...)` - Called for each message
from the client. `message` will be a `String` or `byte[]`
You can also specify a `:timeout` option, that will cause a
Websocket to be closed if *idle* more than the timeout, or an HTTP
stream to be closed if *open* more than the timeout. This means that
once opened, an HTTP stream will be closed after :timeout elapses,
regardless of activity. It defaults to 0 (no timeout), and is in
milliseconds.
When the ring handler is called during a WebSocket upgrade request,
any changes to the session in the response map are applied, and any
headers from the response map are included in the upgrade response.
Returns a ring response map, at least the :body of which *must* be
returned in the response map from the calling ring handler."
[request & options]
(let [options (-> options
u/kwargs-or-map->map
(o/validate-options as-channel))
ch (if (:websocket? request)
(initialize-websocket request options)
(initialize-stream request options))]
(when-let [timeout (:timeout options)]
(.setTimeout ^Channel ch timeout))
{:status 200
:body ch}))
(o/set-valid-options! as-channel
#{:on-open :on-close :on-message :on-error :timeout})