;; capjure.clj
;; Namespace declaration for the capjure HBase helpers.
;;
;; This file defines its own `flatten`. On Clojure versions whose clojure.core
;; also provides `flatten`, defining it here without an exclusion produces a
;; name clash, so the core var is excluded explicitly. Excluding a name that
;; clojure.core does not have is harmless.
;;
;; The Java imports are folded into the ns form; the imported class list is
;; unchanged.
(ns org.rathore.amit.capjure
  (:refer-clojure :exclude [flatten])
  (:import (org.apache.hadoop.hbase HBaseConfiguration HColumnDescriptor HTableDescriptor)
           (org.apache.hadoop.hbase.client HTable Scanner HBaseAdmin)
           (org.apache.hadoop.hbase.io BatchUpdate Cell)))
;; Module-level configuration vars.
;; NOTE(review): the earmuffed names suggest these are meant to be rebound by
;; callers with `binding`; newer Clojure versions require :dynamic metadata for
;; that -- confirm against the Clojure version in use.

;; Flag named for a mock mode; nothing in the visible part of this file reads it.
(def *mock-mode* false)
;; Value written into the "hbase.master" setting by hbase-config.
(def *hbase-master* "localhost:60000")
;; Map looked up under :encode and :decode by encoding-keys / decoding-keys.
(def *primary-keys-config* {})
;; Forward declaration: all-primary-keys refers to symbol-name before it is defined.
(declare symbol-name)
(defn encoding-keys
  "Returns whatever *primary-keys-config* holds under :encode."
  []
  (let [config *primary-keys-config*]
    (config :encode)))
(defn decoding-keys
  "Returns whatever *primary-keys-config* holds under :decode."
  []
  (let [config *primary-keys-config*]
    (config :decode)))
(defn qualifier-for
  "Looks up key-name in the encoding configuration and returns that entry's
  :qualifier value."
  [key-name]
  (let [encoding-entry ((encoding-keys) key-name)]
    (encoding-entry :qualifier)))
(defn encoding-functor-for
  "Looks up key-name in the encoding configuration and returns that entry's
  :functor value."
  [key-name]
  (let [encoding-entry ((encoding-keys) key-name)]
    (encoding-entry :functor)))
(defn all-primary-keys
  "Returns a lazy sequence with the string name (via symbol-name) of every key
  in the encoding configuration."
  []
  (for [config-key (keys (encoding-keys))]
    (symbol-name config-key)))
(defn primary-key
  "Returns the first configured primary key whose string form is a prefix of
  column-family, or nil when none matches."
  [column-family]
  (let [prefix-of-family? (fn [candidate]
                            (.startsWith column-family (str candidate)))]
    (first (filter prefix-of-family? (all-primary-keys)))))
(defn decoding-functor-for
  "Converts key-name to a keyword, looks it up in the decoding configuration
  and returns that entry's :functor value."
  [key-name]
  (let [decoding-entry ((decoding-keys) (keyword key-name))]
    (decoding-entry :functor)))
(defn decode-with-key
  "Applies the decoding functor configured for key-name to value and returns
  the result."
  [key-name value]
  (let [decode (decoding-functor-for key-name)]
    (decode value)))
;; Forward declarations for functions that are used before their definitions
;; appear further down in this file.
(declare flatten add-to-insert-batch capjure-insert hbase-table read-row read-cell)
(defn capjure-insert
  "Flattens object-to-save into column/value pairs and commits them to the named
  HBase table as a single BatchUpdate for the row `(str row-id)`.

  Returns the result of the .commit call."
  [object-to-save hbase-table-name row-id]
  (let [target-table (hbase-table hbase-table-name)
        update (BatchUpdate. (str row-id))
        column-values (flatten object-to-save)]
    (add-to-insert-batch update column-values)
    (.commit target-table update)))
(defn add-to-insert-batch
  "Puts every pair of flattened-list into batch-update.

  For each pair the first element is used as the column and the bytes of
  `(str <last element>)` as the cell value. Always returns :done."
  [batch-update flattened-list]
  (doseq [pair flattened-list]
    (let [column (first pair)
          cell-value (last pair)]
      (.put batch-update column (.getBytes (str cell-value)))))
  :done)
(defn symbol-name
  "Returns a string for prefix: the keyword's name (without the leading colon)
  when prefix is a keyword, otherwise `(str prefix)`."
  [prefix]
  (if (keyword? prefix)
    (name prefix)
    (str prefix)))
(defn new-key
  "Builds a key string by joining the symbol-name of part1, the separator and
  the symbol-name of part2."
  [part1 separator part2]
  (let [left (symbol-name part1)
        right (symbol-name part2)]
    (str left separator right)))
(defn prepend-to-keys
  "Returns a new map with the same values as hash-map, where every key k is
  replaced by `(new-key prefix separator k)`."
  [prefix separator hash-map]
  (reduce (fn [renamed [original-key original-value]]
            (assoc renamed
                   (new-key prefix separator original-key)
                   original-value))
          {}
          hash-map))
(defn postpend-to-keys
  "Returns a new map with the same values as hash-map, where every key k is
  replaced by `(new-key k separator postfix)`."
  [postfix separator hash-map]
  (reduce (fn [renamed [original-key original-value]]
            (assoc renamed
                   (new-key original-key separator postfix)
                   original-value))
          {}
          hash-map))
;; Forward declarations: process-key-value and process-multiple call these
;; helpers before they are defined below.
(declare process-multiple process-maps process-map process-strings)
(defn process-key-value
  "Flattens one top-level key/value pair into a map of column name -> value.

  - vector value: delegated to process-multiple
  - map value:    each inner key is prefixed with `key` and a colon
  - anything else: a single entry whose column is `key` followed by a colon"
  [key value]
  ;; A value cannot be both a vector and a map, so the order of these two
  ;; tests does not affect which branch is taken.
  (cond
    (vector? value) (process-multiple key value)
    (map? value) (prepend-to-keys key ":" value)
    :else {(new-key key ":" "") value}))
(defn process-multiple
  "Flattens a collection of values stored under key.

  The first element decides the treatment: if it is a map the whole collection
  goes to process-maps, otherwise the elements are handed to process-strings
  as an array."
  [key values]
  (let [elements (seq values)]
    (if (map? (first elements))
      (process-maps key elements)
      (process-strings key (to-array elements)))))
(defn process-maps
  "Flattens a sequence of maps stored under key into one merged map.

  For every map, the encoding functor configured for key is applied to the
  whole map to produce the column suffix, the configured qualifier entry is
  removed, and the remaining entries are flattened with process-map."
  [key maps]
  (let [qualifier (qualifier-for key)
        encode (encoding-functor-for key)]
    (apply merge
           (for [single-map maps]
             (process-map (symbol-name key)
                          (encode single-map)
                          (dissoc single-map qualifier))))))
(defn process-map
  "Returns a map with the values of single-map, where each key k becomes the
  string `<initial-prefix>_<symbol-name of k>:<final-prefix>`."
  [initial-prefix final-prefix single-map]
  (reduce (fn [columns [attribute attribute-value]]
            (assoc columns
                   (str initial-prefix "_" (symbol-name attribute) ":" final-prefix)
                   attribute-value))
          {}
          single-map))
(defn process-strings
  "Returns a map with one entry per element of the strings array: the column is
  `key`, a colon and the element; the value is the element itself."
  [key strings]
  (reduce (fn [columns element]
            (assoc columns (new-key key ":" element) element))
          {}
          strings))
(defn flatten
  "Flattens a nested object (a map of key -> value) into a single map of column
  name -> value by running process-key-value on every entry and merging the
  results. Returns nil when there are no entries."
  [bloated_object]
  (apply merge
         (for [entry (seq bloated_object)]
           (process-key-value (first entry) (last entry)))))
;; Forward declarations for the read/hydration helpers that are referenced
;; before their definitions below.
(declare read-as-hash cell-value-as-string hydrate-pair has-many-strings-hydration has-many-objects-hydration has-one-string-hydration has-one-object-hydration collapse-for-hydration)
(defn is-from-primary-keys
  "Returns true when the string form of key-name starts with any configured
  primary key, otherwise nil."
  [key-name]
  (let [candidate (symbol-name key-name)
        prefix-of-candidate? (fn [primary] (.startsWith candidate primary))]
    (some prefix-of-candidate? (all-primary-keys))))
(defn column-name-empty?
  "Returns true when splitting key-name on a colon yields exactly one token,
  i.e. there is nothing after the column family."
  [key-name]
  (let [tokens (.split key-name ":")]
    (= (count tokens) 1)))
(defn collapse-for-hydration
  "For every configured primary key present in mostly-hydrated, replaces the
  inner map stored under that key by a vector of the inner map's values.
  Keys whose inner map is missing or empty are left untouched."
  [mostly-hydrated]
  (reduce (fn [collapsed primary]
            (let [primary-name (symbol-name primary)
                  inner-values (vec (vals (collapsed primary-name)))]
              (if (empty? inner-values)
                collapsed
                (assoc collapsed primary-name inner-values))))
          mostly-hydrated
          (all-primary-keys)))
(defn hydrate
  "Rebuilds a nested object from a flat map of column name -> value.

  Every column is folded into an accumulator with hydrate-pair, and the result
  is passed through collapse-for-hydration."
  [flattened-object]
  (let [fold-column (fn [hydrated column]
                      (hydrate-pair column flattened-object hydrated))]
    (collapse-for-hydration
     (reduce fold-column {} (keys flattened-object)))))
(defn hydrate-pair
  "Folds the single column key-name of the flat map `flattened` into `hydrated`.

  The column is split on colons; the first token is the column family and the
  last token the column name. The trimmed string value then selects, in this
  order:
  - column name equal to the value     -> has-many-strings-hydration
  - family starts with a primary key   -> has-many-objects-hydration
  - nothing after the column family    -> has-one-string-hydration
  - otherwise                          -> has-one-object-hydration"
  [key-name flattened hydrated]
  (let [trimmed-value (.trim (str (flattened key-name)))
        tokens (seq (.split key-name ":"))
        family (first tokens)
        qualifier (last tokens)]
    ;; The order of these tests is significant; the first match wins.
    (cond
      (= qualifier trimmed-value)
      (has-many-strings-hydration hydrated family trimmed-value)

      (is-from-primary-keys family)
      (has-many-objects-hydration hydrated family qualifier trimmed-value)

      (column-name-empty? key-name)
      (has-one-string-hydration hydrated family trimmed-value)

      :else
      (has-one-object-hydration hydrated family qualifier trimmed-value))))
(defn has-one-string-hydration
  "Returns hydrated with value stored directly under column-family."
  [hydrated column-family value]
  (let [with-value (assoc hydrated column-family value)]
    with-value))
(defn has-one-object-hydration
  "Returns hydrated with value stored under column-name inside the map kept at
  column-family; that inner map is created when it does not exist yet."
  [hydrated column-family column-name value]
  (let [existing (hydrated column-family)
        inner (if existing existing {})
        updated-inner (assoc inner column-name value)]
    (assoc hydrated column-family updated-inner)))
(defn has-many-strings-hydration
  "Returns hydrated with value added to the vector kept under column-family.

  A new one-element vector is created when nothing is stored yet; otherwise
  value is placed at the front of the existing values."
  [hydrated column-family value]
  (let [existing (hydrated column-family)
        combined (if (nil? existing)
                   [value]
                   (vec (cons value existing)))]
    (assoc hydrated column-family combined)))
(defn has-many-objects-hydration
  "Folds one column of a has-many object into hydrated.

  column-family is expected to be a primary key, one separator character and an
  attribute name. The value is stored as
  hydrated[primary key][column-name][attribute name]. When no object exists yet
  for column-name, it is seeded with the configured qualifier mapped to the
  decoded column-name."
  [hydrated column-family column-name value]
  (let [outer-key (primary-key column-family)
        ;; Skip the primary-key prefix and the single character that follows it.
        attribute (.substring column-family (inc (count outer-key)))
        qualifier (qualifier-for (keyword outer-key))
        objects (or (hydrated outer-key) {})
        ;; The decode call stays inside `or` so it only runs for a new object.
        object (or (objects column-name)
                   {(symbol-name qualifier) (decode-with-key outer-key column-name)})
        updated-object (assoc object attribute value)
        updated-objects (assoc objects column-name updated-object)]
    (assoc hydrated outer-key updated-objects)))
(defn hydrate-hbase-object
  "Converts an HBase row into a map of column name (String) -> cell value
  (String). Returns nil when the row has no columns."
  [hbase-row]
  (let [column-names (map (fn [raw-name] (String. raw-name))
                          (seq (.keySet hbase-row)))]
    (apply merge
           (for [column-name column-names]
             {column-name (cell-value-as-string hbase-row column-name)}))))
(defn read-as-hash
  "Reads the row row-id from the named table and returns it as a flat map of
  column name -> string value (see hydrate-hbase-object)."
  [hbase-table-name row-id]
  (hydrate-hbase-object (read-row hbase-table-name row-id)))
(defn read-as-hydrated
  "Reads the row row-id from the named table as a flat map and rebuilds the
  nested object from it with hydrate."
  [hbase-table-name row-id]
  (hydrate (read-as-hash hbase-table-name row-id)))
(defn row-exists?
  "Returns the result of HTable's .exists for the bytes of row-id-string in the
  named table."
  [hbase-table-name row-id-string]
  (.exists (hbase-table hbase-table-name)
           (.getBytes row-id-string)))
(defn cell-value-as-string
  "Looks up column-name (as bytes) in row and returns the cell's value as a
  String."
  [row column-name]
  (let [cell (.get row (.getBytes column-name))
        raw-value (.getValue cell)]
    (String. raw-value)))
(defn read-row
  "Fetches the row stored under row-id from the named HBase table via
  HTable's .getRow and returns whatever that call returns.

  row-id is converted with `str` before being turned into bytes. This mirrors
  capjure-insert, which builds its BatchUpdate from `(str row-id)`, so a row
  written with a non-string id (for example a number) can be read back using
  the same id. String ids behave exactly as before."
  [hbase-table-name row-id]
  (let [table (hbase-table hbase-table-name)]
    (.getRow table (.getBytes (str row-id)))))
(defn read-all-versions
  "Calls HTable's .getRow with row-id-string and number-of-versions on the named
  table and returns its result."
  [hbase-table-name row-id-string number-of-versions]
  (.getRow (hbase-table hbase-table-name)
           row-id-string
           number-of-versions))
(defn read-cell
  "Reads the row row-id from the named table and returns the value of the cell
  column-name as a String."
  [hbase-table-name row-id column-name]
  (cell-value-as-string (read-row hbase-table-name row-id) column-name))
(defn table-iterator
  "Opens a scanner over the given columns of the named table and returns a lazy
  sequence backed by the scanner's iterator.

  NOTE(review): the scanner is not closed here; callers have no handle to
  close it either."
  [hbase-table-name columns]
  (let [table (hbase-table hbase-table-name)
        scanner (.getScanner table (into-array columns))]
    (iterator-seq (.iterator scanner))))
(defn table-scanner
  "Opens and returns a scanner over the given columns of the named table.

  With start-row-string, that value is passed to .getScanner as a third
  argument."
  ([hbase-table-name columns]
     (.getScanner (hbase-table hbase-table-name)
                  (into-array columns)))
  ([hbase-table-name columns start-row-string]
     (.getScanner (hbase-table hbase-table-name)
                  (into-array columns)
                  start-row-string)))
(defn rowcount
  "Counts the rows returned by scanning the given columns of the named table.

  The scanner is opened with table-scanner, fully consumed, and closed in a
  finally block, so the scanner is released even if iteration throws."
  [hbase-table-name & columns]
  (let [scanner (table-scanner hbase-table-name columns)]
    (try
      ;; count realizes the whole sequence before the scanner is closed.
      (count (iterator-seq (.iterator scanner)))
      (finally
        (.close scanner)))))
(defn delete-all
  "Calls HTable's .deleteAll on the named table with the given argument.

  NOTE(review): the argument is named column-name, but the one-argument
  .deleteAll is believed to take a row key -- confirm against the HBase client
  API in use."
  [hbase-table-name column-name]
  (.deleteAll (hbase-table hbase-table-name) column-name))
(defn hbase-config
  "Creates a fresh HBaseConfiguration whose `hbase.master` setting is the
  current value of *hbase-master*, and returns it."
  []
  (let [configuration (HBaseConfiguration.)]
    (.set configuration "hbase.master" *hbase-master*)
    configuration))
(defn create-hbase-table
  "Creates an HBase table called table-name with one column family per entry
  of column-families."
  [table-name & column-families]
  (let [descriptor (HTableDescriptor. table-name)]
    ;; Register every family on the descriptor before the admin is created.
    (doseq [family column-families]
      (.addFamily descriptor (HColumnDescriptor. family)))
    (let [admin (HBaseAdmin. (hbase-config))]
      (.createTable admin descriptor))))
(defn drop-hbase-table
  "Deletes the named HBase table through an HBaseAdmin built from hbase-config.

  .deleteTable is invoked on the admin instance with the table name as its
  argument. The earlier form invoked it on the table-name string itself and
  left the admin unused, so it could not delete anything.

  NOTE(review): some HBase versions require the table to be disabled before
  it can be deleted -- confirm for the version in use."
  [hbase-table-name]
  (let [admin (HBaseAdmin. (hbase-config))]
    (.deleteTable admin hbase-table-name)))
(defn hbase-table
  "Returns a new HTable for hbase-table-name, configured by hbase-config."
  [hbase-table-name]
  (HTable. (hbase-config) hbase-table-name))