/
common.ml
404 lines (341 loc) · 10.9 KB
/
common.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
(*
This file is part of Arakoon, a distributed key-value store. Copyright
(C) 2010 Incubaid BVBA
Licensees holding a valid Incubaid license may use this file in
accordance with Incubaid's Arakoon commercial license agreement. For
more information on how to enter into this agreement, please contact
Incubaid (contact details can be found on www.arakoon.org/licensing).
Alternatively, this file may be redistributed and/or modified under
the terms of the GNU Affero General Public License version 3, as
published by the Free Software Foundation. Under this license, this
file is distributed in the hope that it will be useful, but WITHOUT
ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
FITNESS FOR A PARTICULAR PURPOSE.
See the GNU Affero General Public License for more details.
You should have received a copy of the
GNU Affero General Public License along with this program (file "COPYING").
If not, see <http://www.gnu.org/licenses/>.
*)
open Lwt
open Interval
open Routing
open Client_cfg
open Ncfg
let _MAGIC = 0xb1ff0000l
let _MASK = 0x0000ffffl
let _VERSION = 2
(*
let _STRLEN_SIZE = 4
let _CMD_SIZE = 4
let _BOOL_SIZE = 1
let _INT_SIZE = 4
*)
open Baardskeerder
type client_command =
| PING
| WHO_MASTER
| EXISTS
| GET
| ASSERT
| SET
| DELETE
| RANGE
| PREFIX_KEYS
| TEST_AND_SET
| LAST_ENTRIES
| RANGE_ENTRIES
| MIGRATE_RANGE
| SEQUENCE
| MULTI_GET
| EXPECT_PROGRESS_POSSIBLE
| STATISTICS
| COLLAPSE_TLOGS
| USER_FUNCTION
| GET_INTERVAL
| SET_INTERVAL
| GET_ROUTING
| SET_ROUTING
| SET_ROUTING_DELTA
| GET_KEY_COUNT
| GET_DB
| CONFIRM
| GET_FRINGE
| SET_NURSERY_CFG
| GET_NURSERY_CFG
| REV_RANGE_ENTRIES
| SYNCED_SEQUENCE
let code2int = [
PING, 0x1l ;
WHO_MASTER, 0x2l ;
EXISTS, 0x7l ;
GET, 0x8l ;
SET, 0x9l ;
DELETE, 0xal ;
RANGE, 0xbl ;
PREFIX_KEYS, 0xcl ;
TEST_AND_SET, 0xdl ;
LAST_ENTRIES, 0xel ;
RANGE_ENTRIES, 0xfl ;
SEQUENCE, 0x10l;
MULTI_GET, 0x11l;
EXPECT_PROGRESS_POSSIBLE, 0x12l;
STATISTICS , 0x13l;
COLLAPSE_TLOGS , 0x14l;
USER_FUNCTION , 0x15l;
ASSERT , 0x16l;
SET_INTERVAL , 0x17l;
GET_ROUTING , 0x18l;
SET_ROUTING , 0x19l;
GET_KEY_COUNT , 0x1al;
GET_DB , 0x1bl;
CONFIRM , 0x1cl;
GET_FRINGE , 0x1dl;
GET_INTERVAL , 0x1el;
SET_NURSERY_CFG , 0x1fl;
GET_NURSERY_CFG , 0x20l;
SET_ROUTING_DELTA , 0x21l;
MIGRATE_RANGE , 0x22l;
REV_RANGE_ENTRIES , 0x23l;
SYNCED_SEQUENCE , 0x24l;
]
let int2code =
let r = Hashtbl.create 41 in
let () = List.iter (fun (a,b) -> Hashtbl.add r b a) code2int in
r
let lookup_code i32 = Hashtbl.find int2code i32
let command_to output command =
let c = List.assoc command code2int in
let masked = Int32.logor c _MAGIC in
let masked_i = Int32.to_int masked in
Pack.size_to output masked_i
let nothing = fun ic -> Lwt.return ()
let input_nothing = fun pack -> ()
let value_array ic =
Llio.input_int ic >>= fun size ->
let result = Array.create size "" in
let rec loop i =
if i = size
then Lwt.return result
else
begin
Llio.input_string ic >>= fun s ->
result.(i) <- s;
loop (i+1)
end
in loop 0
let kv_array ic =
Llio.input_int ic >>= fun size ->
let result = Array.create size ("","") in
let rec loop i =
if i = size
then Lwt.return result
else
begin
Llio.input_string_pair ic >>= fun p ->
result.(i) <- p;
loop (i+1)
end
in loop 0
let request oc f =
let out = Pack.make_output 64 in
let () = f out in
let s = Pack.close_output out in
Lwt_io.write oc s >>= fun () ->
Lwt_io.flush oc
let response_limited ic (f) =
Llio.input_int ic >>= fun size ->
let buffer = String.create size in
Lwt_io.read_into_exactly ic buffer 0 size >>= fun () ->
let pack = Pack.make_input buffer 0 in
let rc = Pack.input_vint pack in
match rc with
| 0 -> Client_log.debug "Client operation succeeded" >>= fun () ->
let a = f pack in Lwt.return a
| rc -> Client_log.debug_f "Client operation failed: %d" rc >>= fun () ->
let msg = Pack.input_string pack in
let rc = Arakoon_exc.rc_of_int rc in
Lwt.fail (Arakoon_exc.Exception (rc, msg))
let response_old ic f =
Llio.input_int ic >>= function
| 0 -> (Client_log.debug "Client operation succeeded" >>= fun () -> f ic)
| rc ->
Client_log.debug_f "Client operation failed: %x" rc >>= fun () ->
Llio.input_string ic >>= fun msg ->
let rc = Arakoon_exc.rc_of_int rc in
Lwt.fail (Arakoon_exc.Exception (rc, msg))
let exists_to out ~allow_dirty key =
command_to out EXISTS;
Pack.bool_to out allow_dirty;
Pack.string_to out key
let get_to ~allow_dirty out key =
command_to out GET;
Pack.bool_to out allow_dirty;
Pack.string_to out key
let assert_to ~allow_dirty out key vo =
command_to out ASSERT;
Pack.bool_to out allow_dirty;
Pack.string_to out key;
Pack.string_option_to out vo
let set_to out key value =
command_to out SET;
Pack.string_to out key;
Pack.string_to out value
let confirm_to out key value =
command_to out CONFIRM;
Pack.string_to out key;
Pack.string_to out value
let delete_to out key =
command_to out DELETE;
Pack.string_to out key
let _range_params b cmd ~allow_dirty first finc last linc max =
command_to b cmd;
Pack.bool_to b allow_dirty;
Pack.string_option_to b first;
Pack.bool_to b finc;
Pack.string_option_to b last;
Pack.bool_to b linc;
Pack.option_to b Pack.vint_to max
let range_to b ~allow_dirty first finc last linc (max:int option) =
_range_params b RANGE ~allow_dirty first finc last linc max
let range_entries_to b ~allow_dirty first finc last linc max =
_range_params b RANGE_ENTRIES ~allow_dirty first finc last linc max
let rev_range_entries_to b ~allow_dirty first finc last linc max =
_range_params b REV_RANGE_ENTRIES ~allow_dirty first finc last linc max
let prefix_keys_to b ~allow_dirty prefix max =
command_to b PREFIX_KEYS;
Pack.bool_to b allow_dirty;
Pack.string_to b prefix;
Pack.vint_to b max
let test_and_set_to b key expected wanted =
command_to b TEST_AND_SET;
Pack.string_to b key;
Pack.string_option_to b expected;
Pack.string_option_to b wanted
let user_function_to b name po =
command_to b USER_FUNCTION;
Pack.string_to b name;
Pack.string_option_to b po
let multiget_to b ~allow_dirty keys =
command_to b MULTI_GET;
Pack.bool_to b allow_dirty;
Pack.list_to b Pack.string_to keys
let who_master_to b =
command_to b WHO_MASTER
let expect_progress_possible_to b =
command_to b EXPECT_PROGRESS_POSSIBLE
let ping_to b client_id cluster_id =
command_to b PING;
Pack.string_to b client_id;
Pack.string_to b cluster_id
let get_key_count_to b =
command_to b GET_KEY_COUNT
let get_nursery_cfg_to b =
command_to b GET_NURSERY_CFG
let set_nursery_cfg_to b cluster_id client_cfg =
command_to b SET_NURSERY_CFG;
Pack.string_to b cluster_id;
ClientCfg.cfg_to b client_cfg
let prologue cluster (_,oc) =
Llio.output_int32 oc _MAGIC >>= fun () ->
Llio.output_int oc _VERSION >>= fun () ->
Llio.output_string oc cluster
let who_master (ic,oc) =
request oc (fun buf -> who_master_to buf) >>= fun () ->
response_limited ic Pack.input_string_option
let set (ic,oc) key value =
request oc (fun buf -> set_to buf key value) >>= fun () ->
response_limited ic (fun _ -> ())
let get (ic,oc) ~allow_dirty key =
request oc (fun buf -> get_to ~allow_dirty buf key) >>= fun () ->
response_limited ic Pack.input_string
let get_fringe (ic,oc) boundary direction =
let outgoing buf =
command_to buf GET_FRINGE;
Pack.string_option_to buf boundary;
match direction with
| Routing.UPPER_BOUND -> Pack.vint_to buf 0
| Routing.LOWER_BOUND -> Pack.vint_to buf 1
in
request oc outgoing >>= fun () ->
Client_log.debug "get_fringe request sent" >>= fun () ->
response_old ic Llio.input_kv_list
let set_interval(ic,oc) iv =
Client_log.debug "set_interval" >>= fun () ->
let outgoing buf =
command_to buf SET_INTERVAL;
Interval.interval_to buf iv
in
request oc outgoing >>= fun () ->
Client_log.debug "set_interval request sent" >>= fun () ->
response_old ic nothing
let get_interval (ic,oc) =
Client_log.debug "get_interval" >>= fun () ->
let outgoing buf =
command_to buf GET_INTERVAL
in
request oc outgoing >>= fun () ->
Client_log.debug "get_interval request sent" >>= fun () ->
response_old ic Interval.input_interval
let get_routing (ic,oc) =
let outgoing buf = command_to buf GET_ROUTING
in
request oc outgoing >>= fun () ->
response_old ic Routing.input_routing
let set_routing (ic,oc) r =
let outgoing out =
command_to out SET_ROUTING;
Routing.routing_to out r;
in
request oc outgoing >>= fun () ->
response_old ic nothing
let set_routing_delta (ic,oc) left sep right =
let outgoing out =
command_to out SET_ROUTING_DELTA;
Pack.string_to out left;
Pack.string_to out sep;
Pack.string_to out right;
in
Client_log.debug "Changing routing" >>= fun () ->
request oc outgoing >>= fun () ->
Client_log.debug "set_routing_delta sent" >>= fun () ->
response_old ic nothing
let _build_sequence_request output changes =
let update_buf = Buffer.create (32 * List.length changes) in
let rec c2u = function
| Arakoon_client.Set (k,v) -> Core.SET(k,v)
| Arakoon_client.Delete k -> Core.DELETE k
| Arakoon_client.Sequence cs -> Core.SEQUENCE (List.map c2u cs)
| Arakoon_client.Assert(k,vo) -> Core.ASSERT (k,vo)
in
let updates = List.map c2u changes in
let seq = Core.SEQUENCE updates in
let () = Core.update_to update_buf seq in
let () = Pack.string_to output (Buffer.contents update_buf)
in ()
let migrate_range (ic,oc) interval changes =
let outgoing out =
command_to out MIGRATE_RANGE;
Interval.interval_to out interval;
_build_sequence_request out changes
in
request oc (fun buf -> outgoing buf) >>= fun () ->
response_old ic nothing
let _sequence (ic,oc) changes cmd =
let outgoing buf =
command_to buf cmd;
_build_sequence_request buf changes
in
request oc (fun buf -> outgoing buf) >>= fun () ->
response_limited ic input_nothing
let sequence conn changes = _sequence conn changes SEQUENCE
let synced_sequence conn changes = _sequence conn changes SYNCED_SEQUENCE
let get_nursery_cfg (ic,oc) =
request oc get_nursery_cfg_to >>= fun () ->
response_limited ic NCFG.ncfg_from
let set_nursery_cfg (ic,oc) clusterid cfg =
let outgoing buf =
set_nursery_cfg_to buf clusterid cfg
in
request oc outgoing >>= fun () ->
response_old ic nothing
exception XException of Arakoon_exc.rc * string