-
Notifications
You must be signed in to change notification settings - Fork 0
/
search.clj
434 lines (397 loc) · 25.6 KB
/
search.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
(ns lambdaroyal.memory.abstraction.search
(:require [clojure.core.async :refer [>! <! alts! timeout chan go]]
[lambdaroyal.memory.core.tx :as tx])
(:import [lambdaroyal.memory.core.tx Index ReferrerIntegrityConstraint]))
;; --------------------------------------------------------------------
;; TYPE ABSTRACTIONS
;; --------------------------------------------------------------------
(defn break-condition-state
  "Creates the shared state that lets a combined search stop early.
  Returns a map with
  :max-result-count         the given limit [max-result-count]
  :result-count             an atom holding the number of results counted so far (starts at 0)
  :stop-feeding-aggregator  an atom holding a boolean flag (starts at false)
  The map can optionally be passed as :break-condition-state to combined-search in order to
  dynamically stop async searches once individual search functions returned enough results."
  [max-result-count]
  (let [counter (atom 0)
        stop-flag (atom false)]
    {:result-count counter
     :max-result-count max-result-count
     :stop-feeding-aggregator stop-flag}))
(defn check-break-condition-reached
  "Returns true iff [break-condition-state] is given and its :result-count atom holds a value
  strictly greater than :max-result-count. Returns false when no state is given."
  [break-condition-state]
  (boolean
    (when-let [{:keys [result-count max-result-count]} break-condition-state]
      (> @result-count max-result-count))))
(defn inc-and-check-break-condition-reached
  "Increments the :result-count atom of [break-condition-state] and returns true iff the new
  count exceeds :max-result-count. When it does, the :stop-feeding-aggregator atom is set to
  true as well. Returns false (touching nothing) when no state is given."
  [break-condition-state]
  (if-not break-condition-state
    false
    (let [{:keys [result-count max-result-count stop-feeding-aggregator]} break-condition-state
          exceeded? (> (swap! result-count inc) max-result-count)]
      (when exceeded?
        (reset! stop-feeding-aggregator true))
      exceeded?)))
(defn is-stop-feeding-aggregator
  "Returns the current value of the :stop-feeding-aggregator atom of [break-condition-state],
  or false when no state is given."
  [break-condition-state]
  (cond
    break-condition-state (-> break-condition-state :stop-feeding-aggregator deref)
    :else false))
(defn abstract-search
  "higher order function - takes a function [λ] that, given a query argument, returns a (lazy)
  sequence of user-scope tuples from lambdaroyal memory. Returns a function that takes the query
  argument [args], applies λ to it inside a go block and returns a channel onto which the result
  of λ is put.
  core.async channels cannot carry nil, so a nil result of λ is put as the empty list () instead.
  Otherwise the put would throw inside the go block, and consumers waiting on the channel (e.g.
  combined-search without :timeout) would block forever."
  [λ]
  (fn [args]
    (let [c (chan)]
      (go (>! c (if-some [result (λ args)] result ())))
      c)))
(defn combined-search
  "higher order function - takes a sequence of concrete search functions (abstract-search) [fns], an aggregator function [agr], a query parameter [query] (can be a sequence) and optional parameters [opts], and returns a go block. All the search functions [fns] are called with parameter [query], executed in parallel, and feed their result (a sequence of user-scope tuples from lambdaroyal memory) to the aggregator function [agr]. By default the resulting go routine waits for all fn in [fns] to deliver a result. A search function whose channel yields nil counts as having delivered the empty result ().
  The following options are accepted
  :timeout value in ms after which the aggregator stops; no more search results are considered.
  :minority-report number of search functions fn in [fns] that need to deliver a result before the aggregator stops; no more search results are considered.
  :finish-callback a function with no params that gets called when the aggregator go block stops.
  :break-condition-state a map as created by break-condition-state, containing
    :max-result-count a long denoting the max number of results to be considered. MUST BE USED IN CONJUNCTION WITH :result-count. Is relevant iff all individual search functions return a sequence. Keep in mind that the individual search functions are responsible for altering the reference of :result-count.
    :result-count an atom referencing a long denoting the current number of results fetched so far. MUST BE USED IN CONJUNCTION WITH :max-result-count. Keep in mind that the individual search functions are responsible for altering the reference of :result-count.
    :stop-feeding-aggregator an atom referencing a boolean, set (e.g. by inc-and-check-break-condition-reached) in order to stop the loop receiving results from individual search functions. CAN BE USED IN CONJUNCTION WITH :max-result-count and :result-count.
  Search functions are not started at all once the break condition is already reached."
  [fns agr query & opts]
  (let [opts (apply hash-map opts)
        bcs (:break-condition-state opts)
        c (chan)
        t (when (:timeout opts) (timeout (:timeout opts)))
        requested (or (:minority-report opts) (count fns))
        ;; fire off all searches in parallel - don't start them if we got enough results so far
        started (reduce
                  (fn [acc λ]
                    (if (check-break-condition-reached bcs)
                      acc
                      (do
                        (go
                          ;; channels cannot carry nil - forward a nil result as () so that the
                          ;; put does not throw and the aggregator still counts this search
                          (let [result (<! (λ query))]
                            (>! c (if (nil? result) () result))))
                        (inc acc))))
                  0 fns)
        ;; never wait for more searches than were actually started
        limit (min requested started)]
    ;; fire off the aggregator (controller) go block
    (go
      (loop [i 0 stop false]
        ;; stop when all awaited searches returned, the timeout fired or we got enough results
        (if (or (>= i limit)
                stop
                (is-stop-feeding-aggregator bcs))
          (when-let [callback (:finish-callback opts)]
            (callback))
          (let [delivered (if t
                            ;; take a result or bump into the timeout (yields nil)
                            (first (alts! [c t]))
                            ;; else: wait indefinitely for a result
                            (<! c))]
            ;; send the most recent search result to the aggregator
            (when delivered
              (agr delivered))
            (recur (inc i) (nil? delivered))))))))
(defn concat-aggregator
  "Appends the elements of [data] to the collection held by the STM [ref], using commute within
  its own transaction. The ref is expected to hold a vector; nil or any other seqable value is
  accepted and converted. The new value is always a vector with the previous elements followed
  by the elements of [data].
  into is used rather than concat: repeatedly commuting concat builds a chain of nested lazy seqs
  that overflows the stack when it is finally realized after many aggregations."
  [ref data]
  (dosync
    (commute ref (fn [acc xs] (into (vec acc) xs)) data)))
(defn set-aggregator
  "Adds every element of [data] to the collection held by the STM [ref], using commute within its
  own transaction. Intended for collection tuples [key document] whose document carries a :coll key
  naming the collection the tuple belongs to. With the ref holding a set, the result is a set of
  collection tuples."
  [ref data]
  (dosync
    (commute ref into data)))
(defn gen-sorted-set
  "Generates an STM ref on an empty sorted set that can be used in conjunction with set-aggregator.
  Elements are collection tuples [key document]. They are ordered first by the :coll value of the
  document (last element), then by key (first element) within the same collection. Two tuples with
  equal :coll and equal key are treated as the same element."
  []
  (letfn [(by-coll-then-key [a b]
            (let [coll-a (:coll (last a))
                  coll-b (:coll (last b))]
              (if (not= coll-a coll-b)
                (compare coll-a coll-b)
                (compare (first a) (first b)))))]
    (ref (sorted-set-by by-coll-then-key))))
(defn sorted-set-aggregator
  "Aggregates the collection tuples [key document] in [data] into the map held by the STM [ref].
  Each tuple is stored under the composite key [coll key], where coll is the :coll value of the
  document (second element) and key is the tuple key (first element). The ref is assumed to hold a
  map. A sorted map (e.g. (sorted-map)) yields the tuples ordered by collection and key, which
  requires all collection names and all keys to be comparable with each other.
  All tuples of one call are added within a single transaction. Returns nil."
  [ref data]
  (dosync
    (doseq [x data]
      ;; a vector key (rather than a list) is Comparable, so it also works in sorted maps; it is
      ;; = and hash-equal to the corresponding list, so lookups by (list coll key) still match
      (commute ref assoc [(-> x second :coll) (first x)] x))))
(defn combined-search'
  "Variant of lambdaroyal.memory.abstraction.search/combined-search that takes the aggregator
  function [agr] as first and the sequence of concrete search functions (abstract-search) [fns]
  as second argument, which makes partial application on the aggregator convenient.
  All the search functions [fns] are called with parameter [query] (can be a sequence), executed in
  parallel, and feed their result (a sequence of user-scope tuples from lambdaroyal memory) to the
  aggregator function [agr]. Returns a go block. By default the resulting go routine waits for all
  fn in [fns] to deliver a result.
  The following options [opts] are accepted
  :timeout value in ms after which the aggregator stops; no more search results are considered.
  :minority-report number of search functions fn in [fns] that need to deliver a result before the aggregator stops; no more search results are considered.
  :finish-callback a function with no params that gets called when the aggregator go block stops.
  Any other option understood by combined-search is passed through as well."
  [agr fns query & opts]
  (apply combined-search (list* fns agr query opts)))
(defn combined-search''
  "SET SEARCH - derives from lambdaroyal.memory.abstraction.search/combined-search.
  Higher order function - takes a sequence of concrete search functions (abstract-search) [fns], a query parameter [query] (can be a sequence), a function [finish-callback] and optional parameters [opts], and returns a go block. All the search functions [fns] are called with parameter [query], executed in parallel, and feed their result (a sequence of user-scope tuples from lambdaroyal memory) to set-aggregator, which accumulates them into a ref on a hash set. finish-callback takes one parameter, the dereferenced aggregator value - in layman's terms, the set of all documents found. By default the resulting go routine waits for all fn in [fns] to deliver a result.
  The following options are accepted
  :timeout value in ms after which the aggregator stops; no more search results are considered.
  :minority-report number of search functions fn in [fns] that need to deliver a result before the aggregator stops; no more search results are considered."
  [fns query finish-callback & opts]
  (let [found (ref #{})
        on-finish (fn [] (finish-callback @found))]
    (apply combined-search' (partial set-aggregator found) fns query :finish-callback on-finish opts)))
;; --------------------------------------------------------------------
;; BUILDING A DATA HIERARCHIE
;; CR - https://github.com/gixxi/lambdaroyal-memory/issues/1
;; This builds a hierarchy of documents from a sequence of hierarchy
;; levels. A hierarchy level denotes an attribute of the document -
;; either directly (first-level attribute) or indirectly by applying
;; a function to the document
;; --------------------------------------------------------------------
(defn hierarchie
  "Groups the documents [xs] into a nested hierarchy with one nesting level per discriminator in
  [levels]. Each level is a keyword or a function taking a document and returning its category.
  For every (non-nil) category k, the bucket [[k n] sub-hierarchy] is produced. Here n is the number
  of documents in that category, and sub-hierarchy is the result for the remaining levels.
  Documents whose category is nil (partial hierarchies) are not wrapped into a bucket; their
  sub-hierarchy is used directly. If a level yields exactly one entry, that entry is returned
  instead of a one-element sequence.
  [handler] is applied to the leafs (the documents left after the last level); using identity (or
  nil) results in the documents as leafs. The buckets of a level are computed via pmap."
  [xs handler & levels]
  (if-not levels
    (if handler (handler xs) xs)
    (let [[discriminator & remaining] levels
          descend (fn [docs] (apply hierarchie docs handler remaining))
          buckets (pmap (fn [[category docs]]
                          ;; consider partial hierarchies, where a level is not present
                          (if category
                            [[category (count docs)] (descend docs)]
                            (descend docs)))
                        (group-by discriminator xs))]
      ;; NOTE(review): a single entry is unwrapped on every level, the last one included -
      ;; confirm this is intended for the deepest level
      (if (= 1 (count buckets))
        (first buckets)
        buckets))))
(defn hierarchie-backtracking-int
"Worker behind hierarchie-backtracking. Recursively groups [xs] by the discriminators in [levels] (keywords or functions applied to each document).
For every non-nil (and non-false) category k, the result holds a bucket [k' sub]. Here sub is the result of the recursion on the documents of that category, and k' is the return value of (λbacktracking leaf? [k (count docs)] sub), where leaf? is true iff sub is the (dereferenced) leaf data produced at the last level.
Documents with a nil/false category are not wrapped into a bucket; the recursion result is used directly. A level that yields exactly one entry returns that entry instead of a one-element sequence.
Leaf data ([handler] applied to the documents, or the documents themselves if handler is nil) is wrapped into a promise tagged with the meta data {:leaf true}, so the caller can tell it apart from grouped data. [initial] must be true only for the outermost call: it makes the function dereference such a leaf promise before returning it."
[initial xs handler λbacktracking & levels]
(let [res (if levels
(let [level (first levels)
next (rest levels)
xs' (group-by #(level %) xs)
xs'' (map
(fn [[k v]]
;;recurse into the documents v of category k with initial=false, so a leaf result comes back as the tagged promise
(let [res-from-recursion (apply hierarchie-backtracking-int false v handler λbacktracking next)]
;;consider partial hierarchies, where a level is not present
(if k
(let [;;denotes whether the data came from leaf
leaf (if (and (meta res-from-recursion) (-> res-from-recursion meta :leaf true?) (instance? clojure.lang.IPending res-from-recursion))
true false)
;;unwrap the leaf promise so λbacktracking and the bucket see the plain leaf data
res-from-recursion (if leaf
@res-from-recursion
res-from-recursion)]
;;the bucket key [k (count v)] is replaced by whatever λbacktracking makes of it
[(λbacktracking leaf [k (count v)] res-from-recursion) res-from-recursion])
;;else (nil/false category): use the recursion result as is
;;NOTE(review): if this is a leaf promise and the level yields more than one entry, the promise
;;ends up undereferenced inside the returned seq - confirm whether that can happen in practice
res-from-recursion)))
xs')
;;special handling for partial hierarchies, if there is just one result in the bucket we
;;use this rather than the bucket
xs'' (if (= (count xs'') 1) (first xs'') xs'')]
xs'')
;;else
;;what the hack, we need to know whether we got data from the final recursion
;;furthermore this data might be provided back without using grouping it due to
;;partial hierarchies, types would be nice
(deliver (with-meta (promise) {:leaf true})
(if handler (handler xs)
;;else
xs)))]
;;only the outermost call unwraps a leaf promise (e.g. when no level was given at all)
(if (and initial (meta res) (-> res meta :leaf) (instance? clojure.lang.IPending res))
@res
res)))
(defn hierarchie-backtracking
  "Groups the documents [xs] into a nested hierarchy with one nesting level per discriminator in
  [levels]. Each level is a keyword or a function taking a document and returning its category.
  [handler] is applied to the leafs of the hierarchy; using identity results in the documents as
  leafs.
  λbacktracking is called for every bucket with three arguments: a boolean flag denoting whether the
  sub-hierarchy consists of leafs, the group key k, and the sub-hierarchy resulting from applying the
  remaining levels to the elements of the bucket. k is [level-val count], where level-val is the
  result of applying the level discriminator function and count is the number of elements in that
  category. The function must return an adapted version of k that reflects the information
  necessary to the user; it replaces k in the bucket."
  [xs handler λbacktracking & levels]
  (apply hierarchie-backtracking-int (list* true xs handler λbacktracking levels)))
(defn hierarchie-ext
  "Builds up a hierarchy where a node is given by its key and all the matching documents,
  classified by the values of the next level (if any), or the matching documents as subnodes.
  Each element of the variable arity [levels] is a tuple [category ext]. category is a keyword or
  function providing back the category of a document. ext (optional, may be nil) is a keyword or
  function providing back the characteristics of a document with respect to that category.
  For a non-nil category k the node is [[k n ext-val] sub-hierarchy]. Here n is the number of
  matching documents, and ext-val is ext applied to the first of them (nil without ext). Documents
  whose category is nil are not wrapped into a node; their sub-hierarchy is used directly. The
  nodes of a level are computed via pmap.
  [handler] is a function applied to the leafs of the hierarchy. Using identity (or nil) results in
  the documents as leafs."
  [xs handler & levels]
  (if-let [[[category-fn ext-fn] & remaining] (seq levels)]
    (let [descend (fn [docs] (apply hierarchie-ext docs handler remaining))]
      (pmap (fn [[category docs]]
              ;; consider partial hierarchies, where a level is not present
              (if category
                [[category (count docs) (when ext-fn (ext-fn (first docs)))]
                 (descend docs)]
                (descend docs)))
            (group-by category-fn xs)))
    (if handler (handler xs) xs)))
(defn- ric-matching
  "Returns the first ReferrerIntegrityConstraint defined on collection [source] within the context
  of [tx] that satisfies [pred], or nil. The constraints ref of the collection is assumed to hold
  name/constraint pairs (e.g. a map); the constraint is the last element of each pair."
  [tx source pred]
  (let [constraints (map last (-> tx :context deref (get source) :constraints deref))]
    (some #(when (and (instance? ReferrerIntegrityConstraint %) (pred %)) %) constraints)))

(defn ric
  "Returns the first identified ReferrerIntegrityConstraint from a collection [source] that refers
  to a collection [target], or nil if there is none. If [foreign-key] is given, then the ric must
  additionally use exactly this foreign key. Both collections must exist in the context of [tx]
  (checked by preconditions)."
  ([tx source target]
   {:pre [(contains? (-> tx :context deref) source)
          (contains? (-> tx :context deref) target)]}
   (ric-matching tx source #(= (.foreign-coll %) target)))
  ([tx source target foreign-key]
   {:pre [(contains? (-> tx :context deref) source)
          (contains? (-> tx :context deref) target)]}
   (ric-matching tx source #(and (= (.foreign-coll %) target)
                                 (= (.foreign-key %) foreign-key)))))
(defn by-ric
  "Returns all user scope tuples from collection [source] that refer to collection [target] by
  some ReferrerIntegrityConstraint and whose foreign key is contained in the sequence [keys]. The
  sequence is supposed to be redundancy free (set). The result carries the meta data
  {:coll-name source}, i.e. the collection the returned tuples belong to.
  Throws an IllegalStateException if no matching ReferrerIntegrityConstraint is defined.
  opts (key/value pairs):
  :foreign-key      iff given, we explicitly search for a ric with this foreign key.
  :ratio-full-scan  iff the ratio (count keys / number of tuples in target) is greater than this
                    value, the source collection is fully scanned for matching tuples rather than
                    queried by index lookups. Defaults to 0.2.
  :abs-full-scan    iff the number of keys is greater than this value, the source collection is
                    fully scanned for matching tuples rather than queried by index lookups.
                    Defaults to 512.
  :parallel         iff true (default), the per-key index lookups run via pmap.
  :reverse          iff true, tuples are selected in reverse order, and the results of index lookups
                    for more than one key are sorted descending by primary key. Defaults to false.
  :verbose          iff true, diagnostic information is printed. Defaults to false.
  An empty target collection yields an empty vector."
  ([tx source target keys & opts]
   (let [opts (if opts (apply hash-map opts))
         {foreign-key :foreign-key
          verbose :verbose
          parallel :parallel
          ratio-full-scan :ratio-full-scan
          abs-full-scan :abs-full-scan
          reverse' :reverse
          :or {parallel true ratio-full-scan 0.2 verbose false abs-full-scan 512 reverse' false}} opts
         ric (or (if foreign-key
                   (ric tx source target foreign-key)
                   (ric tx source target))
                 (throw (IllegalStateException. (str "Failed to build data projection - no ReferrerIntegrityConstraint from collection " source " to collection " target " defined."))))
         fk (.foreign-key ric)
         ctx (-> tx :context deref)
         source-coll (get ctx source)
         ;; look up the target like the source (via get), so non-keyword collection names work too
         target-count (-> (get ctx target) :data deref count)
         keys-to-collsize-ratio (if (= target-count 0) 0 (/ (count keys) target-count))
         full-scan (or
                     (> (count keys) abs-full-scan)
                     (> keys-to-collsize-ratio ratio-full-scan))]
     (when verbose
       (println :search-by-ric-keys :ric source fk "->" target :keys (count keys) :target-count target-count :ratio keys-to-collsize-ratio))
     (when (and verbose full-scan)
       (println :full-scan source target keys-to-collsize-ratio))
     (with-meta
       (cond
         (= target-count 0)
         []

         full-scan
         ;; full scan: build a set of the keys and filter all source tuples by their foreign key
         (let [key-set (set keys)
               xs ((if reverse' tx/rselect tx/select) tx source)]
           (filter #(contains? key-set (get (last %) fk)) xs))

         :else
         ;; index lookup: one range query per key on the index over [fk], cut off as soon as the
         ;; foreign key no longer equals the key
         (let [find-fn (fn [k]
                         (take-while
                           #(= k (get (last %) fk))
                           (map tx/user-scope-tuple
                                ((if reverse' tx/rselect-from-coll tx/select-from-coll)
                                 source-coll
                                 [fk]
                                 (if reverse' <= >=)
                                 [k]))))
               ;; one seq with the results for each key
               xs ((if parallel pmap map) find-fn keys)
               result (reduce into [] xs)]
           (if (and reverse' (> (count xs) 1))
             (reverse (sort-by first result))
             result)))
       ;; tag the result with the collection the returned tuples belong to
       {:coll-name source}))))
(defn by-referencees
  "Returns the user scope tuples of collection [target] that are referenced by the tuples [xs] of
  collection [source] through a ReferrerIntegrityConstraint from [source] to [target]. For every
  tuple in xs, its foreign key value is looked up as primary key in [target]. Unresolvable tuples
  are dropped and duplicates removed. The result carries the meta data {:coll-name target}.
  Throws an IllegalStateException if no matching ReferrerIntegrityConstraint is defined.
  opts (key/value pairs):
  :foreign-key  iff given, we explicitly search for a ric with this foreign key.
  :verbose      iff true, the ric used is printed. Defaults to false.
  :parallel     iff true (default), the lookups run via pmap.
  :reverse      iff true, the result is sorted descending by primary key. Defaults to false."
  ([tx source target xs & opts]
   (let [{fk-attr :foreign-key
          verbose? :verbose
          parallel? :parallel
          descending? :reverse
          :or {parallel? true verbose? false descending? false}} (if opts (apply hash-map opts))
         constraint (or (if fk-attr
                          (ric tx source target fk-attr)
                          (ric tx source target))
                        (throw (IllegalStateException. (str "[by-referencees] Failed to build data projection - no ReferrerIntegrityConstraint from collection " source " to collection " target " defined."))))
         _ (when verbose?
             (println (format "[by-referencees] to collection %s -> %s -> %s " source (.foreign-key constraint) target)))
         mapper (if parallel? pmap map)
         resolve-referencee (fn [x]
                              (tx/select-first tx target (get (last x) (.foreign-key constraint))))
         referencees (->> xs
                          (mapper resolve-referencee)
                          (filter some?)
                          distinct)]
     (with-meta
       (if descending?
         (reverse (sort-by first referencees))
         referencees)
       {:coll-name target}))))
(defn >>
  "Pipe, convenience function. Returns a function taking a transaction [tx] and a seq of user scope
  tuples [xs], whose meta data key :coll-name MUST denote the collection the tuples belong to (checked
  by a precondition). The returned function yields the tuples of collection [target] that refer to xs
  (via by-ric on the keys of xs), filtered by [filter-fn] and carrying the meta data
  {:coll-name target}. opts are passed on to by-ric, e.g. :foreign-key to explicitly select the ric
  with a certain foreign key, or :reverse true to yield the tuples of the target collection in
  reverse order."
  [target filter-fn & opts]
  (fn [tx xs]
    {:pre [(-> xs meta :coll-name)]}
    (let [xs-coll (:coll-name (meta xs))
          referrers (apply by-ric tx target xs-coll (map first xs) opts)]
      (with-meta (filter filter-fn referrers) {:coll-name target}))))
(defn <<
  "Reverse Pipe. Returns a function taking a transaction [tx] and a seq of user scope tuples [xs],
  whose meta data key :coll-name MUST denote the collection the tuples belong to (checked by a
  precondition). The returned function yields the tuples of collection [target] referenced by xs
  (via by-referencees), filtered by [filter-fn] and carrying the meta data {:coll-name target}.
  opts are passed on to by-referencees."
  [target filter-fn & opts]
  (fn [tx xs]
    {:pre [(-> xs meta :coll-name)]}
    (let [referenced (apply by-referencees tx (:coll-name (meta xs)) target xs opts)]
      (with-meta (filter filter-fn referenced) {:coll-name target}))))
(defn >>>
  "Pipe, convenience function. Same as >> without filtering: returns a function taking a transaction
  [tx] and user scope tuples [xs] (meta data :coll-name MUST denote their collection). That function
  yields all tuples of collection [target] that refer to xs. If :foreign-key is given within the opts,
  then we explicitly search for a ric with this foreign key."
  [target & opts]
  (apply >> target (constantly true) opts))
(defn <<<
  "Reverse pipe, convenience function. Same as << without filtering: returns a function yielding all
  tuples of collection [target] that are referenced by the given tuples."
  [target & opts]
  (apply << target (constantly true) opts))
(defn filter-all
  "Higher order function that returns a function with no params that returns a sequence of all
  tuples within the collection with name [coll-name]. Both the returned function and the sequence it
  returns carry the meta data {:coll-name coll-name}."
  [tx coll-name]
  (let [coll-meta {:coll-name coll-name}
        select-all (fn [] (with-meta (tx/select tx coll-name) coll-meta))]
    (with-meta select-all coll-meta)))
(defn filter-xs
  "Higher order function that returns a function with no params that returns [xs], the tuples that
  need to belong to the collection with name [coll-name]. Both the returned function and the
  returned xs carry the meta data {:coll-name coll-name}."
  [coll-name xs]
  (let [coll-meta {:coll-name coll-name}]
    (-> (fn [] (with-meta xs coll-meta))
        (with-meta coll-meta))))
(defn filter-key
  "Higher order function that returns a function (arguments are ignored) yielding tuples of the
  collection with name [coll-name]. Both the returned function and the sequence it yields carry the
  meta data {:coll-name coll-name}.
  [tx coll-name key]: all tuples whose key is equal to [key].
  [tx coll-name start-test start-key]: the tuples selected by tx/select with start-test/start-key.
  [tx coll-name start-test start-key stop-test stop-key]: the tuples selected by tx/select with
  start-test/start-key and stop-test/stop-key."
  ([tx coll-name key]
   (let [coll-meta {:coll-name coll-name}]
     (with-meta
       (fn [& _]
         (with-meta
           (take-while
             #(= key (first %))
             (tx/select tx coll-name >= key))
           coll-meta))
       coll-meta)))
  ([tx coll-name start-test start-key]
   (let [coll-meta {:coll-name coll-name}]
     (with-meta
       (fn [& _]
         (with-meta (tx/select tx coll-name start-test start-key) coll-meta))
       coll-meta)))
  ([tx coll-name start-test start-key stop-test stop-key]
   (let [coll-meta {:coll-name coll-name}]
     (with-meta
       (fn [& _]
         ;; the selection itself must carry :coll-name so that proj and the pipes can consume it
         (with-meta (tx/select tx coll-name start-test start-key stop-test stop-key) coll-meta))
       coll-meta))))
(defn filter-index
  "Higher order function that returns a function (arguments are ignored) yielding the tuples of the
  collection with name [coll-name] that are resolved by an index lookup via tx/select. The lookup
  uses the attribute seq [attr], the comparator [start-test] and the attribute value sequence
  [start-key]. The second version looks up a range, additionally taking [stop-test] and [stop-key]
  into account. Both the returned function and the sequence it yields carry the meta data
  {:coll-name coll-name}."
  ([tx coll-name attr start-test start-key]
   (let [coll-meta {:coll-name coll-name}
         lookup (fn [& _]
                  (with-meta (tx/select tx coll-name attr start-test start-key) coll-meta))]
     (with-meta lookup coll-meta)))
  ([tx coll-name attr start-test start-key stop-test stop-key]
   (let [coll-meta {:coll-name coll-name}
         lookup (fn [& _]
                  (with-meta (tx/select tx coll-name attr start-test start-key stop-test stop-key) coll-meta))]
     (with-meta lookup coll-meta))))
(defn proj
  "Data projection. [λ] is a function with no params whose application results in a seq of user
  scope tuples. λ itself carries the meta data :coll-name denoting the collection the tuples belong
  to; this meta data is copied onto the seq. The variable number of path functions [path-fns] are
  then applied in order, each called with [tx] and the outcome of its predecessor (the first one
  with the seq produced by λ). Every path-fn is supposed to produce a seq of user scope tuples
  carrying :coll-name meta data, consumable by its successor. Returns the outcome of the last
  path-fn, or the seq produced by λ when no path-fns are given."
  [tx λ & path-fns]
  (reduce (fn [xs path-fn] (path-fn tx xs))
          ;; the seq produced by λ inherits the meta data attached to λ itself
          (with-meta (λ) (meta λ))
          path-fns))