/
http.clj
159 lines (138 loc) · 5.53 KB
/
http.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
(ns exoscale.compute.api.http
"HTTP support for the Exoscale Compute API"
(:require [clojure.string :as str]
[aleph.http :as http]
[byte-streams :as bs]
[manifold.deferred :as d]
[manifold.time :as t]
[exoscale.compute.api.payload :as payload]))
(def default-page-size
"Number of records to fetch by default"
500)
(def default-poll-interval
"Default timeout for async requests"
100)
(def default-max-polls
"Default number of times to poll before timing out"
10)
(def default-endpoint
"Default HTTP endpoint for the Exoscale API"
"https://api.exoscale.com/compute")
(def default-http-opts
"Default HTTP options passed to the underlying HTTP library (aleph)."
{:connection-timeout 10000
:request-timeout 10000
:pool-timeout 1000
:read-timeout 10000})
(defn with-decoded-error-body
"Catches potential deferred error and rethrow with decoded
ex-data.body if there is one, otherwise just rethrow"
[d]
(d/catch d clojure.lang.ExceptionInfo
(fn [e]
(let [d (ex-data e)]
(throw (if-let [body (:body d)]
(ex-info (ex-message e)
(assoc d :body (bs/to-string body))
(ex-cause e))
e))))))
(def list-command?
(memoize
(fn [opcode]
(-> opcode name (str/starts-with? "list")))))
(defn raw-request!!
"Send an HTTP request with manifold"
[{:keys [endpoint http-opts] :as config} payload]
(let [opts (merge default-http-opts http-opts {:as :json})
method (some-> config :request-method name str/lower-case keyword)
reqfn (if (= :get method) http/get http/post)
paramk (if (= :get method) :query-params :form-params)]
(-> (reqfn (or endpoint default-endpoint) (assoc opts paramk payload))
with-decoded-error-body)))
(defn extract-response
"From a response get the result data"
[resp opcode]
(get (:body resp) (-> opcode name str/lower-case (str "response") keyword)))
(defn find-payload
[resp opcode]
(let [list? (list-command? opcode)
ks (set (keys resp))
payload (-> resp (dissoc :count) vals first)
elem-count (:count resp)]
(cond
(and list? (empty? payload)) (with-meta [] {:count elem-count})
list? (with-meta payload {:count elem-count})
(some? (:jobid resp)) resp
(map? payload) payload
(sequential? payload) payload
:else resp)))
(defn text-request!!
[config opcode params]
(raw-request!! config (payload/build-payload config opcode params)))
(defn json-request!!
[config opcode params]
(d/chain
(raw-request!! config (payload/build-payload config opcode params))
#(extract-response % opcode)
#(find-payload % opcode)))
(defn list-request!!
"Perform a paging request. Elements are fetched by chunks of 500.
If both `page` and `pagesize` are provided then we return a single page only,
Otherwise fetch all results page by page.
This functionality is dependent on the api call supporting pagination,
`listZones` and `listSecurityGroups` are examples of this.
Custom Exoscale commands that are not present in Cloudstack such as
`listApiKeys` return the full list and do not support pagination."
[config opcode {:keys [page pagesize] :or {page 1} :as params}]
(let [single-page-only? (and (some? pagesize) (some? page))
pagesize (or pagesize default-page-size)]
(d/loop [page page
acc []]
(d/chain (json-request!! config opcode (assoc params
:page page
:pagesize pagesize))
(fn [resp]
(let [acc (concat acc resp)
meta-count (:count (meta resp))
all-results-present? (= meta-count (count acc))]
(if (or (nil? meta-count)
single-page-only?
all-results-present?
(not (seq resp)))
(with-meta (vec acc) (meta resp))
(d/recur (inc page) acc))))))))
(defn wait-or-return-job!!
[config remaining opcode]
(let [interval (or (:poll-interval config) default-poll-interval)]
(fn [{:keys [jobstatus] :as job}]
(if (zero? jobstatus)
(d/chain (t/in interval (constantly nil))
(fn [_] (d/recur (dec remaining))))
(find-payload (:jobresult job) opcode)))))
(defn job-loop!!
[config opcode]
(fn [{:keys [jobid] :as resp}]
(if (some? jobid)
(d/loop [remaining (or (:max-polls config) default-max-polls)]
(if (pos? remaining)
;; The previous response can be used as input
;; to queryAsyncJobResult directly
(d/chain (json-request!! config "queryAsyncJobResult" {:jobid jobid})
(wait-or-return-job!! config remaining opcode))
resp))
resp)))
(defn job-request!!
[config opcode params]
(d/chain (json-request!! config opcode params)
(job-loop!! config opcode)))
(defn request!!
"Send a request to the API and figure out the best course of action
from there.
Specifically:
- For a list request, fetch and page results
- For an async job request, wait for the job result
- "
[config opcode params]
(if (list-command? opcode)
(list-request!! config opcode params)
(job-request!! config opcode params)))