/
clomert.clj
192 lines (157 loc) · 6.06 KB
/
clomert.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
(ns clomert
(:import (voldemort.client
ClientConfig
StoreClientFactory
SocketStoreClientFactory
StoreClient
UpdateAction)
(voldemort.versioning Version Versioned VectorClock)))
;; factory functionality
(defmacro get-prop
  "Expand to a static field access on ClientConfig.

  The printed form of the argument (str) has its first character dropped,
  is upper-cased, has \"-\" replaced by \"_\" and gets a \"_PROPERTY\"
  suffix; e.g. :bootstrap-urls expands to
  (. ClientConfig BOOTSTRAP_URLS_PROPERTY).
  The field name is computed at macro-expansion time from the form that
  was written at the call site, so the argument has to be a keyword
  literal rather than a local that holds a keyword."
  [keyword]
  (let [#^String printed (str keyword)
        field-name (-> printed
                       (.substring 1) ; drop the leading \":\"
                       .toUpperCase
                       (.replace "-" "_")
                       (str "_PROPERTY"))]
    `(. ClientConfig ~(symbol field-name))))
(defn make-client-config
  "Build a ClientConfig from a map of keyword options.

  Each key such as :bootstrap-urls is upper-cased, has \"-\" replaced by
  \"_\" and gets a \"_PROPERTY\" suffix (BOOTSTRAP_URLS_PROPERTY); the
  value of the static ClientConfig field with that name is used as the
  property name and the map value's .toString as the property value.

  The field is read reflectively at run time: the keys are only known
  when the function is called, so the compile-time get-prop macro cannot
  be applied to the loop local (it would see the symbol, not the keyword).

  Throws NoSuchFieldException (from Class.getField) when a key has no
  matching public ClientConfig field."
  ([config-map]
     (let [props (new java.util.Properties)
           ;; Bind the class to a hinted local so .getField is an instance
           ;; call on java.lang.Class, not a static member lookup.
           #^Class config-class ClientConfig]
       (doseq [[k v] config-map]
         (let [field-name (str (.replace (.toUpperCase (name k)) "-" "_")
                               "_PROPERTY")
               ;; nil receiver: the field is static
               prop-name (.get (.getField config-class field-name) nil)]
           (.setProperty props (str prop-name) (.toString v))))
       (new ClientConfig props))))
(defn socket-store-client-factory
  "Create a socket store client factory [Deprecated].

  With only urls, a fresh ClientConfig is created first. The given (or
  fresh) client-config is mutated via .setBootstrapUrls before being
  handed to the SocketStoreClientFactory constructor."
  ([urls]
     (socket-store-client-factory (ClientConfig.) urls))
  ([#^ClientConfig client-config urls]
     ;; set the bootstrap urls in place, then build the factory from it
     (.setBootstrapUrls client-config urls)
     (SocketStoreClientFactory. client-config)))
(defmulti make-socket-store-client-factory
  "Build a SocketStoreClientFactory, dispatching on the class of the
  single argument: a ClientConfig is used as is, anything else is passed
  to make-client-config as the :bootstrap-urls option."
  class)

;; A ready-made configuration goes straight to the factory constructor.
(defmethod make-socket-store-client-factory ClientConfig [c]
  (SocketStoreClientFactory. c))

;; Any other argument is treated as the bootstrap urls.
(defmethod make-socket-store-client-factory :default [urls]
  (let [config (make-client-config {:bootstrap-urls urls})]
    (SocketStoreClientFactory. config)))
(defn make-store-client
  "Create a store client from a factory.

  Calls .getStoreClient on factory with store-name and returns the
  result."
  ;; The docstring must come before the parameter vector; placed after it,
  ;; the string is just a discarded body expression and (doc ...) shows
  ;; nothing.
  [#^StoreClientFactory factory store-name]
  (.getStoreClient factory store-name))
;; basic store client functions
(defn store-put
  "Unconditionally store value under key, clobbering any existing value.
  Returns whatever the client's .put returns."
  [#^StoreClient client key value]
  (. client (put key value)))
(defn store-conditional-put
  "Store a Versioned value under key. The put is conditional on the
  version: an obsolete version results in an ObsoleteVersionException."
  [#^StoreClient client key #^Versioned value]
  ;; the Versioned hint on value selects the versioned .put overload
  (. client (put key value)))
(defn store-put-if-not-obsolete
  "Same as store-conditional-put, except that an obsolete version is
  reported by returning false rather than by throwing an exception."
  [#^StoreClient client key #^Versioned value]
  (. client (putIfNotObsolete key value)))
(defn store-get
  "Fetch the value stored under key together with its version.
  The three-argument form also passes a Versioned default to the
  client's .get."
  ([#^StoreClient client key]
     (. client (get key)))
  ([#^StoreClient client key #^Versioned default]
     (. client (get key default))))
(defn store-get-all
  "Multi-get on a store: returns a map key => (version, value).
  values is handed to the client's .getAll unchanged."
  [#^StoreClient client values]
  (. client (getAll values)))
(defn store-get-value
  "Fetch just the value stored under key.
  The three-argument form also passes default to the client's .getValue."
  ([#^StoreClient client key]
     (. client (getValue key)))
  ([#^StoreClient client key default]
     (. client (getValue key default))))
(defn store-delete
  "Delete the value stored under key.
  The three-argument form also passes a Version to the client's .delete."
  ([#^StoreClient client key]
     (. client (delete key)))
  ([#^StoreClient client key #^Version version]
     (. client (delete key version))))
(defn store-apply-update
  "Perform an update inside an optimistic lock.

  update-fn is called with the store client from the UpdateAction's
  update method; rollback-fn is called with no arguments from its
  rollback method. max-tries is passed to the client's .applyUpdate and
  defaults to 3; rollback-fn defaults to a function that does nothing."
  ([#^StoreClient client update-fn]
     (store-apply-update 3 client update-fn (fn [])))
  ([#^Integer max-tries #^StoreClient client update-fn]
     (store-apply-update max-tries client update-fn (fn [])))
  ([#^Integer max-tries #^StoreClient client update-fn rollback-fn]
     ;; adapt the two Clojure functions to the UpdateAction callbacks
     (let [action (proxy [UpdateAction] []
                    (update [#^StoreClient store]
                      (update-fn store))
                    (rollback []
                      (rollback-fn)))]
       (.applyUpdate client action max-tries))))
(defn store-preflist
  "Get a preference list of responsible nodes for a key.

  (store-preflist client key) calls .getResponsibleNodes on client with
  key as its argument.

  (store-preflist key) is kept for backward compatibility and behaves as
  before: it calls .getResponsibleNodes on key itself with no arguments.
  NOTE(review): that form never involves a store client, so it only works
  if the key object has such a method; it was presumably meant to take a
  client - prefer the two-argument form."
  ([key]
     (.getResponsibleNodes key))
  ([client key]
     (.getResponsibleNodes client key)))
(defn store-do-op
  "Dispatch a keyword operation to the matching store-* function.

  op                    args            calls
  :put                  key value       store-put
  :conditional-put      key versioned   store-conditional-put
  :put-if-not-obsolete  key versioned   store-put-if-not-obsolete
  :get                  key [default]   store-get
  :get-all              keys            store-get-all
  :get-value            key [default]   store-get-value
  :delete               key [version]   store-delete

  The optional second argument of :get, :get-value and :delete is only
  forwarded when it is non-nil. Returns the result of the called
  function. Throws IllegalArgumentException for any other op."
  [#^StoreClient store op & args]
  ;; every operation takes at most two arguments after the store
  (let [[arg1 arg2] args]
    (condp = op
      :put
      (store-put store arg1 arg2)

      :conditional-put
      (store-conditional-put store arg1 arg2)

      :put-if-not-obsolete
      (store-put-if-not-obsolete store arg1 arg2)

      ;; forward the default like :get-value does, instead of dropping it
      :get
      (if (nil? arg2)
        (store-get store arg1)
        (store-get store arg1 arg2))

      :get-all
      (store-get-all store arg1)

      :get-value
      (if (nil? arg2)
        (store-get-value store arg1)
        (store-get-value store arg1 arg2))

      :delete
      (if (nil? arg2)
        (store-delete store arg1)
        (store-delete store arg1 arg2))

      ;; separator added: the message used to read "No such operation:foo"
      (throw (new IllegalArgumentException
                  (str "No such operation: " op))))))
(defmacro do-store
  "Run a sequence of store operations against one store.

  Each form is a list (op & args) and expands to
  (store-do-op store op & args); the forms run in order and the value of
  the last one is returned.

  The store expression is evaluated once and bound to a generated local,
  so a side-effecting expression (for example a call that creates a
  client) is not repeated for every form."
  [store & forms]
  (let [store-sym (gensym "store__")]
    `(let [~store-sym ~store]
       ~@(map (fn [form]
                `(store-do-op ~store-sym ~@form))
              forms))))
;; versions and vector clocks
(defn make-versioned
  "Create a new (version, value) tuple.
  With only object, the one-argument Versioned constructor is used;
  otherwise the Version v is passed along as well."
  ([object]
     (Versioned. object))
  ([object #^Version v]
     (Versioned. object v)))
(defn versioned-value
  "Get the value of a (version, value) tuple, via .getValue."
  ;; The docstring must come before the parameter vector; placed after it,
  ;; the string is only a discarded body expression.
  [#^Versioned v]
  (.getValue v))
(defn versioned-set-value!
  "Mutate the value in a (version, value) tuple.

  Calls .setObject on v with obj and returns v itself."
  ;; The docstring must come before the parameter vector; placed after it,
  ;; the string is only a discarded body expression.
  [#^Versioned v obj]
  (doto v
    (.setObject obj)))
(defn vector-clock-increment!
  "Increment a vector clock in place for node and return that same clock.
  ts is the timestamp handed to .incrementVersion; when omitted,
  System/currentTimeMillis is used."
  ([#^VectorClock version #^Integer node]
     (let [now (System/currentTimeMillis)]
       (vector-clock-increment! version node now)))
  ([#^VectorClock version #^Integer node #^Long ts]
     (.incrementVersion version node ts)
     ;; hand back the mutated clock, not the result of .incrementVersion
     version))
(defn vector-clock-incremented
  "Return the result of .incremented on a vector clock for node, i.e. an
  incremented copy. ts is the timestamp handed to .incremented; when
  omitted, System/currentTimeMillis is used."
  ([#^VectorClock version #^Integer node]
     (let [now (System/currentTimeMillis)]
       (vector-clock-incremented version node now)))
  ([#^VectorClock version #^Integer node #^Long ts]
     (. version (incremented node ts))))