forked from http-kit/http-kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.clj
321 lines (288 loc) · 14.5 KB
/
client.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
(ns org.httpkit.client
(:refer-clojure :exclude [get proxy])
(:require [clojure.string :as str]
[org.httpkit.encode :refer [base64-encode]])
(:use [clojure.walk :only [prewalk]])
(:import [org.httpkit.client HttpClient HttpClient$AddressFinder HttpClient$SSLEngineURIConfigurer
IResponseHandler RespListener IFilter RequestConfig]
[org.httpkit.logger ContextLogger EventLogger EventNames]
[org.httpkit HttpMethod PrefixThreadFactory HttpUtils]
[java.util.concurrent ThreadPoolExecutor LinkedBlockingQueue TimeUnit]
[java.net URI URLEncoder]
[org.httpkit.client ClientSslEngineFactory MultipartEntity]
[javax.net.ssl SSLContext SSLEngine]))
;;;; Utils
(defn- utf8-bytes [s] (.getBytes (str s) "utf8"))
(defn url-encode [s] (URLEncoder/encode (str s) "utf8"))
(defn- basic-auth-value [basic-auth]
(let [basic-auth (if (string? basic-auth)
basic-auth
(str (first basic-auth) ":" (second basic-auth)))]
(str "Basic " (base64-encode (utf8-bytes basic-auth)))))
(defn- prepare-request-headers
[{:keys [headers form-params basic-auth oauth-token user-agent] :as req}]
(cond-> headers
form-params (assoc "Content-Type" "application/x-www-form-urlencoded")
basic-auth (assoc "Authorization" (basic-auth-value basic-auth))
oauth-token (assoc "Authorization" (str "Bearer " oauth-token))
user-agent (assoc "User-Agent" user-agent)))
(defn- prepare-response-headers [headers]
(reduce (fn [m [k v]] (assoc m (keyword k) v)) {} headers))
;;; {:a {:b 1 :c [1 2 3]}} => {"a[b]" 1, "a[c]" [1 2 3]}
(defn- nested-param [params] ; code copyed from clj-http
(prewalk (fn [d]
(if (and (vector? d) (map? (second d)))
(let [[fk m] d]
(reduce (fn [m [sk v]]
(assoc m (str (name fk) \[ (name sk) \]) v))
{} m))
d))
params))
(defn query-string
"Returns URL-encoded query string for given params map."
[m]
(let [m (nested-param m)
param (fn [k v] (str (url-encode (name k)) "=" (url-encode v)))
join (fn [strs] (str/join "&" strs))]
(join (for [[k v] m] (if (sequential? v)
(join (map (partial param k) (or (seq v) [""])))
(param k v))))))
(comment (query-string {:k1 "v1" :k2 "v2" :k3 nil :k4 ["v4a" "v4b"] :k5 []}))
(defn- coerce-req
[{:keys [url method body sslengine insecure? query-params form-params multipart] :as req}]
(let [r (assoc req
:url (if query-params
(if (neg? (.indexOf ^String url (int \?)))
(str url "?" (query-string query-params))
(str url "&" (query-string query-params)))
url)
:sslengine (or sslengine (when insecure? (ClientSslEngineFactory/trustAnybody)))
:method (HttpMethod/fromKeyword (or method :get))
:headers (prepare-request-headers req)
;; :body ring body: null, String, seq, InputStream, File, ByteBuffer
:body (if form-params (query-string form-params) body))]
(if multipart
(let [entities (into (map (fn [{:keys [name content filename content-type]}]
(MultipartEntity. name content filename content-type)) multipart)
(map (fn [[k v]] (MultipartEntity. k v nil nil)) form-params))
boundary (MultipartEntity/genBoundary entities)]
(-> r
(assoc-in [:headers "Content-Type"]
(str "multipart/form-data; boundary=" boundary))
(assoc :body (MultipartEntity/encode boundary entities))))
r)))
;; thread pool for executing callbacks, since they may take a long time to execute.
;; protect the IO loop thread: no starvation
(def default-pool (let [max (.availableProcessors (Runtime/getRuntime))
queue (LinkedBlockingQueue.)
factory (PrefixThreadFactory. "client-worker-")]
(ThreadPoolExecutor. max max 60 TimeUnit/SECONDS queue factory)))
;;;; Public API
(defn max-body-filter "reject if response's body exceeds size in bytes"
[size] (org.httpkit.client.IFilter$MaxBodyFilter. (int size)))
;;; "Get the default client. Normally, you only need one client per application. You can config parameter per request basic"
(defonce default-client (delay (HttpClient.)))
(defn make-ssl-engine
"Returns an SSLEngine using default or given SSLContext."
(^SSLEngine [ ] (make-ssl-engine (SSLContext/getDefault)))
(^SSLEngine [^SSLContext ctx] (.createSSLEngine ctx)))
(defonce
^{:dynamic true
:doc "Specifies the default HttpClient used by the `request` function.
Value may be a delay.
A common use case is to replace the default (non-SNI-capable) client with
an SNI-capable one, e.g.:
(:require [org.httpkit.sni-client :as sni-client]) ; Needs Java >= 8
;; Change default client for your whole application:
(alter-var-root #'org.httpkit.client/*default-client* (fn [_] sni-client/default-client))
;; or temporarily change default client for a particular thread context:
(binding [org.httpkit.client/*default-client* sni-client/default-client]
<...>)
See also `make-client`."}
*default-client* default-client)
(defn make-client
"Returns an HttpClient with specified options:
:max-connections ; Max connection count, default is unlimited (-1)
:address-finder ; (fn [java.net.URI]) -> java.net.InetSocketAddress
:ssl-configurer ; (fn [javax.net.ssl.SSLEngine java.net.URI])
:error-logger ; (fn [text ex])
:event-logger ; (fn [event-name])
:event-names ; {<http-kit-event-name> <loggable-event-name>}
:bind-address ; when present will pass local address to SocketChannel.bind()"
[{:keys [max-connections
address-finder
ssl-configurer
error-logger
event-logger
event-names
bind-address]}]
(HttpClient.
(or max-connections -1)
(if address-finder
(reify HttpClient$AddressFinder
(findAddress [this uri] (address-finder uri)))
HttpClient$AddressFinder/DEFAULT)
(if ssl-configurer
(reify HttpClient$SSLEngineURIConfigurer
(configure [this ssl-engine uri] (ssl-configurer ssl-engine uri)))
HttpClient$SSLEngineURIConfigurer/NOP)
(if error-logger
(reify ContextLogger
(log [this message error] (error-logger message error)))
ContextLogger/ERROR_PRINTER)
(if event-logger
(reify EventLogger
(log [this event] (event-logger event)))
EventLogger/NOP)
(cond
(nil? event-names) EventNames/DEFAULT
(map? event-names) (EventNames. event-names)
(instance? EventNames
event-names) event-names
:otherwise (throw (IllegalArgumentException.
(format "Invalid event-names: (%s) %s"
(class event-names) (pr-str event-names)))))
bind-address))
(def ^:dynamic ^:private *in-callback* false)
(defn ^:private deadlock-guard [response]
(let [e #(Exception. "http-kit client deadlock-guard: refusing to deref a request callback from inside a callback. This feature can be disabled with the request's `:deadlock-guard?` option.")]
(reify
clojure.lang.IPending
(isRealized [_] (realized? response))
clojure.lang.IDeref
(deref [_] (if *in-callback* (throw (e)) (deref response)))
clojure.lang.IBlockingDeref
(deref [_ ms value] (if *in-callback* (throw (e)) (deref response ms value))))))
(defn request
"Issues an async HTTP request and returns a promise object to which the value
of `(callback {:opts _ :status _ :headers _ :body _})` or
`(callback {:opts _ :error _})` will be delivered.
The latter will be delivered on client errors only, not on http errors which will be
contained in the :status of the first.
When unspecified, `callback` is the identity
;; Asynchronous GET request (returns a promise)
(request {:url \"http://www.cnn.com\"})
;; Asynchronous GET request with callback
(request {:url \"http://www.cnn.com\" :method :get}
(fn [{:keys [opts status body headers error] :as resp}]
(if error
(println \"Error on\" opts)
(println \"Success on\" opts))))
;; Synchronous requests
@(request ...) or (deref (request ...) timeout-ms timeout-val)
;; Issue 2 concurrent requests, then wait for results
(let [resp1 (request ...)
resp2 (request ...)]
(println \"resp1's status: \" (:status @resp1))
(println \"resp2's status: \" (:status @resp2)))
Output coercion:
;; Return the body as a byte stream
(request {:url \"http://site.com/favicon.ico\" :as :stream})
;; Coerce as a byte-array
(request {:url \"http://site.com/favicon.ico\" :as :byte-array})
;; return the body as a string body
(request {:url \"http://site.com/string.txt\" :as :text})
;; Try to automatically coerce the output based on the content-type header, currently supports :text :stream, (with automatic charset detection)
(request {:url \"http://site.com/string.txt\" :as :auto})
;; return the body as is with no unzipping or coercion whatsoever. returns as org.httpkit.DynamicBytes
(request {:url \"http://site.com/favicon.ico\" :as :none})
Request options:
:url :method :headers :timeout :connect-timeout :idle-timeout :query-params
:as :form-params :client :body :basic-auth :user-agent :filter :worker-pool"
[{:keys [client timeout connect-timeout idle-timeout filter worker-pool keepalive as follow-redirects
max-redirects response trace-redirects allow-unsafe-redirect-methods proxy-host proxy-port
proxy-url tunnel? deadlock-guard?]
:as opts
:or {connect-timeout 60000
idle-timeout 60000
follow-redirects true
max-redirects 10
filter IFilter/ACCEPT_ALL
worker-pool default-pool
response (promise)
keepalive 120000
as :auto
tunnel? false
deadlock-guard? true
proxy-host nil
proxy-port -1
proxy-url nil}}
& [callback]]
(let [client (or client (force *default-client*))
{:keys [url method headers body sslengine]} (coerce-req opts)
deliver-resp #(deliver response ;; deliver the result
(try
(binding [*in-callback* true]
((or callback identity) %1))
(catch Throwable e
;; dump stacktrace to stderr
(HttpUtils/printError (str method " " url "'s callback") e)
;; return the error
{:opts opts :error e})))
handler (reify IResponseHandler
(onSuccess [this status headers body]
(if (and follow-redirects
(#{301 302 303 307 308} status)) ; should follow redirect
(if (>= max-redirects (count trace-redirects))
(if-let [^String location-header (.get headers "location")]
(let [redirect-location (str (.resolve (URI. url) location-header))
change-to-get (and (not allow-unsafe-redirect-methods)
(#{301 302 303} status))]
(request (assoc opts ; follow 301 and 302 redirect
:url redirect-location
:response response
:query-params (if change-to-get nil (:query-params opts))
:form-params (if change-to-get nil (:form-params opts))
:method (if change-to-get
:get ;; change to :GET
(:method opts)) ;; do not change
:trace-redirects (conj trace-redirects url))
callback))
(deliver-resp {:opts (dissoc opts :response)
:error (Exception. (str "No location header is present on redirect response"))}))
(deliver-resp {:opts (dissoc opts :response)
:error (Exception. (str "too many redirects: "
(count trace-redirects)))}))
(deliver-resp {:opts (dissoc opts :response)
:body body
:headers (prepare-response-headers headers)
:status status})))
(onThrowable [this t]
(deliver-resp {:opts opts :error t})))
listener (RespListener. handler filter worker-pool
;; 0 will return as DynamicBytes - i.e. you will need to handle unzip yourself
;; otherwise, there are 4 coercions supported for now
(case as :none 0 :auto 1 :text 2 :stream 3 :byte-array 4))
effective-proxy-url (if proxy-host (str proxy-host ":" proxy-port) proxy-url)
connect-timeout (or timeout connect-timeout)
idle-timeout (or timeout idle-timeout)
cfg (RequestConfig. method headers body connect-timeout idle-timeout
keepalive effective-proxy-url tunnel?)]
(.exec ^HttpClient client url cfg sslengine listener)
(if deadlock-guard?
(deadlock-guard response)
response)))
(defmacro ^:private defreq [method]
`(defn ~method
~(str "Issues an async HTTP " (str/upper-case method) " request. "
"See `request` for details.")
~'{:arglists '([url & [opts callback]] [url & [callback]])}
~'[url & [s1 s2]]
(if (or (instance? clojure.lang.MultiFn ~'s1) (fn? ~'s1) (keyword? ~'s1))
(request {:url ~'url :method ~(keyword method)} ~'s1)
(request (merge ~'s1 {:url ~'url :method ~(keyword method)}) ~'s2))))
(defreq get)
(defreq delete)
(defreq head)
(defreq post)
(defreq put)
(defreq options)
(defreq patch)
(defreq propfind)
(defreq proppatch)
(defreq lock)
(defreq unlock)
(defreq report)
(defreq acl)
(defreq copy)
(defreq move)