-
Notifications
You must be signed in to change notification settings - Fork 4.1k
/
util.clj
267 lines (245 loc) · 10.1 KB
/
util.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
;; Licensed to the Apache Software Foundation (ASF) under one
;; or more contributor license agreements. See the NOTICE file
;; distributed with this work for additional information
;; regarding copyright ownership. The ASF licenses this file
;; to you under the Apache License, Version 2.0 (the
;; "License"); you may not use this file except in compliance
;; with the License. You may obtain a copy of the License at
;;
;; http://www.apache.org/licenses/LICENSE-2.0
;;
;; Unless required by applicable law or agreed to in writing, software
;; distributed under the License is distributed on an "AS IS" BASIS,
;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.util
(:import [java.net InetAddress])
(:import [java.util Map Map$Entry List ArrayList Collection Iterator HashMap])
(:import [java.io FileReader FileNotFoundException])
(:import [java.nio.file Paths])
(:import [org.apache.storm Config])
(:import [org.apache.storm.generated ErrorInfo])
(:import [org.apache.storm.utils Time ClojureTimerTask Utils
MutableObject])
(:import [org.apache.storm.security.auth NimbusPrincipal])
(:import [javax.security.auth Subject])
(:import [java.util UUID Random ArrayList List Collections])
(:import [java.util.zip ZipFile])
(:import [java.util.concurrent.locks ReentrantReadWriteLock])
(:import [java.util.concurrent Semaphore])
(:import [java.nio.file Files Paths])
(:import [java.nio.file.attribute FileAttribute])
(:import [java.io File FileOutputStream RandomAccessFile StringWriter
PrintWriter BufferedReader InputStreamReader IOException])
(:import [java.lang.management ManagementFactory])
(:import [org.apache.commons.exec DefaultExecutor CommandLine])
(:import [org.apache.commons.io FileUtils])
(:import [org.apache.commons.exec ExecuteException])
(:import [org.json.simple JSONValue])
(:import [org.yaml.snakeyaml Yaml]
[org.yaml.snakeyaml.constructor SafeConstructor])
(:require [clojure [string :as str]])
(:import [clojure.lang RT])
(:require [clojure [set :as set]])
(:require [clojure.java.io :as io])
(:use [clojure walk])
(:require [ring.util.codec :as codec])
(:use [org.apache.storm log]))
(defmacro defalias
"Defines an alias for a var: a new var with the same root binding (if
any) and similar metadata. The metadata of the alias is its initial
metadata (as provided by def) merged into the metadata of the original."
([name orig]
`(do
(alter-meta!
(if (.hasRoot (var ~orig))
(def ~name (.getRawRoot (var ~orig)))
(def ~name))
;; When copying metadata, disregard {:macro false}.
;; Workaround for http://www.assembla.com/spaces/clojure/tickets/273
#(conj (dissoc % :macro)
(apply dissoc (meta (var ~orig)) (remove #{:macro} (keys %)))))
(var ~name)))
([name orig doc]
(list `defalias (with-meta name (assoc (meta name) :doc doc)) orig)))
;; name-with-attributes by Konrad Hinsen:
(defn name-with-attributes
"To be used in macro definitions.
Handles optional docstrings and attribute maps for a name to be defined
in a list of macro arguments. If the first macro argument is a string,
it is added as a docstring to name and removed from the macro argument
list. If afterwards the first macro argument is a map, its entries are
added to the name's metadata map and the map is removed from the
macro argument list. The return value is a vector containing the name
with its extended metadata map and the list of unprocessed macro
arguments."
[name macro-args]
(let [[docstring macro-args] (if (string? (first macro-args))
[(first macro-args) (next macro-args)]
[nil macro-args])
[attr macro-args] (if (map? (first macro-args))
[(first macro-args) (next macro-args)]
[{} macro-args])
attr (if docstring
(assoc attr :doc docstring)
attr)
attr (if (meta name)
(conj (meta name) attr)
attr)]
[(with-meta name attr) macro-args]))
(defmacro defnk
"Define a function accepting keyword arguments. Symbols up to the first
keyword in the parameter list are taken as positional arguments. Then
an alternating sequence of keywords and defaults values is expected. The
values of the keyword arguments are available in the function body by
virtue of the symbol corresponding to the keyword (cf. :keys destructuring).
defnk accepts an optional docstring as well as an optional metadata map."
[fn-name & fn-tail]
(let [[fn-name [args & body]] (name-with-attributes fn-name fn-tail)
[pos kw-vals] (split-with symbol? args)
syms (map #(-> % name symbol) (take-nth 2 kw-vals))
values (take-nth 2 (rest kw-vals))
sym-vals (apply hash-map (interleave syms values))
de-map {:keys (vec syms) :or sym-vals}]
`(defn ~fn-name
[~@pos & options#]
(let [~de-map (apply hash-map options#)]
~@body))))
(defmacro thrown-cause?
[klass & body]
`(try
~@body
false
(catch Throwable t#
(Utils/exceptionCauseIsInstanceOf ~klass t#))))
(defmacro forcat
[[args aseq] & body]
`(mapcat (fn [~args]
~@body)
~aseq))
(defmacro try-cause
[& body]
(let [checker (fn [form]
(or (not (sequential? form))
(not= 'catch (first form))))
[code guards] (split-with checker body)
error-local (gensym "t")
guards (forcat [[_ klass local & guard-body] guards]
`((Utils/exceptionCauseIsInstanceOf ~klass ~error-local)
(let [~local ~error-local]
~@guard-body
)))]
`(try ~@code
(catch Throwable ~error-local
(cond ~@guards
true (throw ~error-local)
)))))
(defn clojurify-structure
[s]
(prewalk (fn [x]
(cond (instance? Map x) (into {} x)
(instance? List x) (vec x)
;; (Boolean. false) does not evaluate to false in an if.
;; This fixes that.
(instance? Boolean x) (boolean x)
true x))
s))
; move this func form convert.clj due to cyclic load dependency
(defn clojurify-error [^ErrorInfo error]
(if error
{
:error (.get_error error)
:time-secs (.get_error_time_secs error)
:host (.get_host error)
:port (.get_port error)
}
))
;TODO: We're keeping this function around until all the code using it is properly tranlated to java
;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
(defn map-val
[afn amap]
(into {}
(for [[k v] amap]
[k (afn v)])))
;TODO: We're keeping this function around until all the code using it is properly tranlated to java
;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
(defn filter-val
[afn amap]
(into {} (filter (fn [[k v]] (afn v)) amap)))
;TODO: We're keeping this function around until all the code using it is properly tranlated to java
;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
(defn filter-key
[afn amap]
(into {} (filter (fn [[k v]] (afn k)) amap)))
;TODO: We're keeping this function around until all the code using it is properly tranlated to java
;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
(defn map-key
[afn amap]
(into {} (for [[k v] amap] [(afn k) v])))
;TODO: Once all the other clojure functions (100+ locations) are translated to java, this function becomes moot.
(def not-nil? (complement nil?))
(defmacro dofor [& body]
`(doall (for ~@body)))
;; The following two will go away when worker, task, executor go away.
(defn assoc-apply-self [curr key afn]
(assoc curr key (afn curr)))
; These seven following will go away later. To be replaced by idiomatic java.
(defmacro recursive-map
[& forms]
(->> (partition 2 forms)
(map (fn [[key form]] `(assoc-apply-self ~key (fn [~'<>] ~form))))
(concat `(-> {}))))
(defmacro fast-list-iter
[pairs & body]
(let [pairs (partition 2 pairs)
lists (map second pairs)
elems (map first pairs)
iters (map (fn [_] (gensym)) lists)
bindings (->> (map (fn [i l] (let [lg (gensym)] [lg l i `(if ~lg (.iterator ~lg))])) iters lists)
(apply concat))
tests (map (fn [i] `(and ~i (.hasNext ^Iterator ~i))) iters)
assignments (->> (map (fn [e i] [e `(.next ^Iterator ~i)]) elems iters)
(apply concat))]
`(let [~@bindings]
(while (and ~@tests)
(let [~@assignments]
~@body)))))
(defmacro fast-list-for
[[e alist] & body]
`(let [ret# (ArrayList.)]
(fast-list-iter [~e ~alist]
(.add ret# (do ~@body)))
ret#))
(defmacro fast-map-iter
[[bind amap] & body]
`(let [iter# (if ~amap (.. ^Map ~amap entrySet iterator))]
(while (and iter# (.hasNext ^Iterator iter#))
(let [entry# (.next ^Iterator iter#)
~bind [(.getKey ^Map$Entry entry#) (.getValue ^Map$Entry entry#)]]
~@body))))
(defn fast-group-by
[afn alist]
(let [ret (HashMap.)]
(fast-list-iter
[e alist]
(let [key (afn e)
^List curr (let [curr (.get ret key)]
(if curr
curr
(let [default (ArrayList.)]
(.put ret key default)
default)))]
(.add curr e)))
ret))
(defmacro -<>
([x] x)
([x form] (if (seq? form)
(with-meta
(let [[begin [_ & end]] (split-with #(not= % '<>) form)]
(concat begin [x] end))
(meta form))
(list form x)))
([x form & more] `(-<> (-<> ~x ~form) ~@more)))
(defn hashmap-to-persistent [^HashMap m]
(zipmap (.keySet m) (.values m)))