-
Notifications
You must be signed in to change notification settings - Fork 1
/
aggregates.clj
54 lines (48 loc) · 2.06 KB
/
aggregates.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
(ns exoscale.vinyl.aggregates
(:require [exoscale.vinyl.fn :as fn]
[exoscale.vinyl.store :as store]
[exoscale.vinyl.tuple :as tuple])
(:import
com.apple.foundationdb.record.provider.foundationdb.FDBRecordStore
com.apple.foundationdb.record.metadata.IndexAggregateFunction
com.apple.foundationdb.record.IsolationLevel
com.apple.foundationdb.record.FunctionNames
com.apple.foundationdb.record.metadata.Key$Evaluated
com.apple.foundationdb.record.metadata.Index
java.util.concurrent.CompletableFuture
java.util.List))
(def function-names
  "Keyword shorthands for aggregate function names, mapped to the
  corresponding `FunctionNames` constants."
  {:sum            FunctionNames/SUM
   :count-not-null FunctionNames/COUNT_NOT_NULL})
;; If this gets bigger let's move to auspex
(defn ^CompletableFuture then-apply
  "Chains `f` onto the future `ftr` with `.thenApply` and returns the
  resulting future. `f` is first passed through `fn/make-fun`
  (presumably to adapt it to a Java functional interface - confirm
  against `exoscale.vinyl.fn`)."
  [^CompletableFuture ftr f]
  (let [callback (fn/make-fun f)]
    (.thenApply ftr callback)))
(defn ^Index get-index
  "Fetches the index called `index-name` from the record metadata of
  `store`. `index-name` goes through `name`, so a keyword, symbol or
  string is accepted."
  [^FDBRecordStore store index-name]
  (-> store
      (.getRecordMetaData)
      (.getIndex (name index-name))))
(defn evaluate-aggregate-function
  "Calls `.evaluateAggregateFunction` on `store` for the aggregate `agg`,
  restricted to the single record type `record-type`. `param` is wrapped
  as a scalar `Key$Evaluated`, and the call always uses
  `IsolationLevel/SERIALIZABLE`. Returns whatever the store call returns."
  [^FDBRecordStore store ^String record-type ^IndexAggregateFunction agg param]
  (let [record-types (List/of record-type)
        scalar-key   (Key$Evaluated/scalar param)]
    (.evaluateAggregateFunction store
                                record-types
                                agg
                                scalar-key
                                IsolationLevel/SERIALIZABLE)))
(defn ^IndexAggregateFunction index-aggregate-function
  "Builds an `IndexAggregateFunction` over the index `index-name` of `store`.

  `function-name` is either a string, used verbatim, or a key of
  `function-names` (e.g. `:sum`). The index's root expression and its
  name are passed to the constructor alongside the function name.

  Throws an `ex-info` when `function-name` is not a string and is not a
  key of `function-names`."
  [store function-name index-name]
  (let [index (get-index store index-name)
        fname (if (string? function-name)
                function-name
                (get function-names function-name))]
    ;; Without this guard an unknown key resolves to nil and `(str nil)`
    ;; silently yields an aggregate function whose name is the empty string.
    (when (nil? fname)
      (throw (ex-info (str "Unknown aggregate function: " (pr-str function-name))
                      {:function-name function-name
                       :known         (set (keys function-names))})))
    (IndexAggregateFunction. (str fname)
                             (.getRootExpression index)
                             (.getName index))))
(defn compute
  "Evaluates the aggregate `function-name` over the index `index-name`
  for records of type `record-type`, with `param` as the aggregate's
  argument. The work runs through `store/run-in-context` on `txn`; the
  result future is mapped through `tuple/get-long` and then waited on
  with `.get`, so this call blocks until the value is available."
  [txn function-name record-type index-name param]
  (store/run-in-context
   txn
   (fn [^FDBRecordStore record-store]
     (let [aggregate (index-aggregate-function record-store function-name index-name)
           pending   (evaluate-aggregate-function record-store
                                                  (name record-type)
                                                  aggregate
                                                  param)]
       (.get (then-apply pending tuple/get-long))))))