-
Notifications
You must be signed in to change notification settings - Fork 3
/
lmdb.ml
604 lines (494 loc) · 15.9 KB
/
lmdb.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
open Ctypes
open PosixTypes
module S = Lmdb_bindings.Make(Lmdb_generated)
open S
open Lmdb_bindings.T
(* One-slot weak array. It is only ever written to, from the finaliser that
   [alive_while] installs; nothing in this file reads it back. *)
let dummy_ref = Weak.create 1

(* [alive_while a b] registers a finaliser on [b] whose closure captures [a].
   When the finaliser eventually runs it stores [a] into [dummy_ref]; the
   intent (per the original comment) is to keep [a] alive while [b] is. *)
let alive_while a b =
  let park_a _ =
    let boxed = Obj.repr a in
    Weak.set dummy_ref 0 (Some boxed)
  in
  Gc.finalise park_a b
(* [alloc typ] allocates storage for exactly one value of C type [typ] through
   [allocate_n]; the optional [?finalise] argument is forwarded untouched. *)
let alloc ?finalise typ = allocate_n ?finalise typ ~count:1
(* [opt_iter f o] applies [f] to the payload of [o] when there is one and
   does nothing for [None]. *)
let opt_iter f opt =
  match opt with
  | Some x -> f x
  | None -> ()
(** {2 High level binding} *)
(** [version ()] calls [mdb_version] with three freshly allocated [int] cells
    and returns [(s, major, minor, patch)], where [s] is the value returned by
    [mdb_version] itself and the three integers are read back from the
    cells. *)
let version () =
  let major_cell = alloc int in
  let minor_cell = alloc int in
  let patch_cell = alloc int in
  let description = mdb_version major_cell minor_cell patch_cell in
  (description, !@major_cell, !@minor_cell, !@patch_cell)
(** Error code type. [pp_error] and the registered exception printer both
    turn such a code into a message with [mdb_strerror]. *)
type error = int
(* The exceptions below are aliases of the ones declared in [Lmdb_bindings],
   re-exported here so that they can be named from this module. *)
exception Not_found = Lmdb_bindings.Not_found
exception Exists = Lmdb_bindings.Exists
exception Error = Lmdb_bindings.Error
(* Install a printer so that [Printexc] renders [Error] (with the message
   obtained from [mdb_strerror]) and [Exists] readably. Every other exception
   is left to the remaining printers. *)
let () =
  let describe = function
    | Error code -> Some ("Lmdb.Error(" ^ mdb_strerror code ^ ")")
    | Exists -> Some "Lmdb.Exists"
    | _ -> None
  in
  Printexc.register_printer describe
(** [pp_error fmt code] prints the [mdb_strerror] message for [code] on [fmt].
    The format ends with [@.], so the formatter is flushed with a newline. *)
let pp_error fmt code =
  let message = mdb_strerror code in
  Format.fprintf fmt "%s@." message
(** Capability witness. The type index records the permissions: [Ro] is
    indexed by [`Read] only, [Rw] by both [`Read] and [`Write]. [Txn.go]
    matches on it to choose the flag passed to [mdb_txn_begin]. *)
type 'a cap =
| Ro : [ `Read ] cap
| Rw : [ `Read | `Write ] cap
(* Lower-case values for the two constructors. *)
let ro = Ro
let rw = Rw
(** Environment handles: creation, copying, configuration and inspection.
    Every function is a thin wrapper over the [mdb_env_*] / [mdb_reader_*]
    binding of the corresponding name. *)
module Env = struct

  (** An environment handle as filled in by [mdb_env_create]. *)
  type t = mdb_env

  (* exception Assert of (t * string) *)

  (** Bit flags given to [create] (forwarded to [mdb_env_open]) and to
      [set_flags]. Combine them with [+]. *)
  module Flags = struct
    type t = mdb_env_flag

    (** Union of two flag words (bitwise or). *)
    let (+) = Unsigned.UInt.logor

    (** [test f m] is [true] iff [f] and [m] have at least one bit in
        common. *)
    let test f m = Unsigned.UInt.(compare (logand f m) zero <> 0)

    (** [eq f f'] is [true] iff both flag words compare equal. *)
    let eq f f' = Unsigned.UInt.(compare f f' = 0)

    (** The empty flag word. *)
    let none = Unsigned.UInt.zero

    let fixed_map = mdb_FIXEDMAP
    let no_subdir = mdb_NOSUBDIR
    let no_sync = mdb_NOSYNC
    let read_only = mdb_RDONLY
    let no_meta_sync = mdb_NOMETASYNC

    (* Must be the WRITEMAP constant. Binding it to the NOMETASYNC constant
       (as the line above does for [no_meta_sync]) would make a request for a
       writable memory map silently select a different durability mode. *)
    let write_map = mdb_WRITEMAP

    let map_async = mdb_MAPASYNC
    let no_tls = mdb_NOTLS
    let no_lock = mdb_NOLOCK
    let no_read_ahead = mdb_NORDAHEAD
    let no_mem_init = mdb_NOMEMINIT
  end

  (** [create ?max_readers ?map_size ?max_dbs ?flags ?mode path] creates an
      environment handle, applies the optional settings that were supplied,
      then opens [path].

      @param flags defaults to [Flags.none].
      @param mode defaults to [0o755]; converted with [Mode.of_int].
      @raise Error when a setter or [mdb_env_open] fails; the handle is
      closed with [mdb_env_close] before the exception is re-raised.

      On success [mdb_env_close] is registered as a GC finaliser of the
      returned handle. *)
  let create ?max_readers ?map_size ?max_dbs ?(flags=Flags.none) ?(mode=0o755) path =
    let mode = Mode.of_int mode in
    let env_ptr = alloc mdb_env in
    mdb_env_create env_ptr ;
    let env = !@env_ptr in
    try
      opt_iter (mdb_env_set_mapsize env) map_size ;
      opt_iter (mdb_env_set_maxdbs env) max_dbs ;
      opt_iter (mdb_env_set_maxreaders env) max_readers ;
      (* mdb_env_set_assert env (fun env s -> raise (Assert (env,s))) ; *)
      mdb_env_open env path flags mode ;
      Gc.finalise mdb_env_close env ;
      env
    with Error _ as exn -> mdb_env_close env; raise exn

  (** Flags understood by [mdb_env_copy2] / [mdb_env_copyfd2]. *)
  module CopyFlags = struct
    type t = mdb_copy_flag
    let none : t = Unsigned.UInt.zero
    let compact = mdb_CP_COMPACT
  end

  (** [copy ?compact env path] calls [mdb_env_copy2] on [env] and [path].
      [CopyFlags.compact] is passed when [compact] is [true] (default
      [false]), [CopyFlags.none] otherwise. *)
  let copy ?(compact=false) db s =
    let flag = if compact then CopyFlags.compact else CopyFlags.none in
    mdb_env_copy2 db s flag

  (** Same as [copy], but targets a file descriptor through
      [mdb_env_copyfd2]. *)
  let copyfd ?(compact=false) env (fd : Unix.file_descr) =
    let flag = if compact then CopyFlags.compact else CopyFlags.none in
    mdb_env_copyfd2 env fd flag

  (** Direct alias of [mdb_env_set_flags]. *)
  let set_flags = mdb_env_set_flags

  (** [flags env] returns the flag word that [mdb_env_get_flags] writes for
      [env]. *)
  let flags env =
    let flags = alloc mdb_env_flag in
    mdb_env_get_flags env flags ;
    !@flags

  (** Direct alias of [mdb_env_set_mapsize]. *)
  let set_map_size = mdb_env_set_mapsize

  (** [path env] returns the string that [mdb_env_get_path] writes for
      [env]. *)
  let path env =
    let path = alloc string in
    mdb_env_get_path env path ;
    !@path

  (** [fd env] returns the file handle that [mdb_env_get_fd] writes for
      [env]. *)
  let fd env =
    let fd = alloc mdb_filehandle_t in
    mdb_env_get_fd env fd ;
    !@fd

  (** [sync ?force env] calls [mdb_env_sync env force]; [force] defaults to
      [false]. *)
  let sync ?(force=false) env = mdb_env_sync env force

  (** [max_readers env] returns the integer that [mdb_env_get_maxreaders]
      writes for [env]. *)
  let max_readers env =
    let i = alloc int in
    mdb_env_get_maxreaders env i ;
    !@i

  (** Direct alias of [mdb_env_get_maxkeysize]. *)
  let max_keysize = mdb_env_get_maxkeysize

  (** [reader_list env] collects every string that [mdb_reader_list] hands
      to its callback. Strings are prepended as they arrive, so the result
      is in reverse order of delivery. The callback always returns [0]. *)
  let reader_list env =
    let x = ref [] in
    mdb_reader_list env (fun s _ -> x:= s::!x ; 0) null ;
    !x

  (** [readers env] runs [mdb_reader_check] and returns the integer it
      writes (presumably the number of stale reader slots cleared; confirm
      against the LMDB documentation). *)
  let readers env =
    let i = alloc int in
    mdb_reader_check env i ;
    !@i

  (** Stat type *)
  type stats = {
    psize : int ;
    depth : int ;
    branch_pages : int ;
    leaf_pages : int ;
    overflow_pages : int ;
    entries : int ;
  }

  (** [make_stats stat] copies the [ms_*] fields of a C [mdb_stat] structure
      into an OCaml record. *)
  let make_stats stat = {
    psize = getf stat ms_psize ;
    depth = getf stat ms_depth ;
    branch_pages = getf stat ms_branch_pages ;
    leaf_pages = getf stat ms_leaf_pages ;
    overflow_pages = getf stat ms_overflow_pages ;
    entries = getf stat ms_entries ;
  }

  (** [stats env] fills a fresh [mdb_stat] with [mdb_env_stat] and converts
      it with [make_stats]. *)
  let stats env =
    let stats = make mdb_stat in
    mdb_env_stat env (addr stats) ;
    make_stats stats
end
(** Transactions. The phantom parameter of ['a t] records whether the
    transaction was started with the [Ro] or the [Rw] capability. *)
module Txn :
sig
  type -'a t = mdb_txn constraint 'a = [< `Read | `Write ]
  val go :
    'a cap ->
    Env.t ->
    ?txn:([< `Read | `Write ] as 'a) t ->
    ('a t -> 'b) -> 'b option
  val abort : 'a t -> 'b
  val env : 'a t -> Env.t
  (* not exported: *)
  val trivial :
    'a cap ->
    Env.t ->
    ?txn:'a t ->
    ('a t -> 'b) -> 'b
end
=
struct
  type -'a t = mdb_txn constraint 'a = [< `Read | `Write ]

  (* Raised by [abort]; carries the transaction that is to be aborted so that
     [go] can tell which nesting level it is aimed at. *)
  exception Abort of mdb_txn

  (** [env txn] is the environment [mdb_txn_env] reports for [txn]. *)
  let env txn = mdb_txn_env txn

  (** [abort txn] raises the internal [Abort] exception for [txn]; it never
      returns normally. *)
  let abort txn = raise (Abort txn)

  (** [go cap env ?txn f] begins a transaction in [env] (a child of [txn]
      when given), runs [f] on it and commits.

      - Returns [Some x] when [f] returns [x] and the commit succeeds.
      - Returns [None] when [f] raises [Abort] for this very transaction, or
        raises any [Abort] while there is no parent; the transaction is
        aborted.
      - Any other exception raised by [f] aborts the transaction and is
        re-raised.
      - An exception raised by [mdb_txn_commit] itself is propagated without
        calling [mdb_txn_abort]: LMDB releases the handle in a failing
        commit, so aborting it again would touch freed memory. *)
  let go :
    'a cap ->
    Env.t ->
    ?txn: 'a t ->
    ('a t -> 'b) -> 'b option
    =
    fun (type c) (rw :c cap) env ?txn:parent f ->
      let ptr_txn = alloc mdb_txn in
      let txn_flag =
        match rw with
        | Ro -> Env.Flags.read_only
        | Rw -> Env.Flags.none
      in
      mdb_txn_begin env parent txn_flag ptr_txn ;
      let txn = !@ptr_txn in
      (* Only failures of [f] are handled here; the commit runs in the value
         branch, outside the reach of the exception branches. *)
      match f txn with
      | x -> mdb_txn_commit txn ; Some x
      | exception Abort t' when t' == txn || parent = None ->
        mdb_txn_abort txn ; None
      | exception exn -> mdb_txn_abort txn ; raise exn

  (* Used internally for trivial functions, not exported.
     With [?txn] given, [f] runs directly in that transaction after checking
     that it belongs to environment [e]; otherwise a fresh transaction is
     opened with [go]. *)
  let trivial :
    'a cap ->
    Env.t ->
    ?txn:'a t ->
    ('a t -> 'b) -> 'b
    =
    fun rw e ?txn f ->
      match txn with
      | Some txn ->
        let e' = env txn in
        if ptr_compare e' e <> 0
        then invalid_arg
            "Lmdb: database and transaction are from different environments";
        f txn
      | None ->
        match go rw e f with
        | None -> assert false
        | Some x -> x
end
(** Flags for write operations. They are handed to [mdb_put] and
    [mdb_cursor_put]; [no_dup_data] is also handed to [mdb_cursor_del].
    Combine them with [+]. *)
module PutFlags = struct
  type t = mdb_put_flag

  (** Union of two flag words (bitwise or). *)
  let (+) = Unsigned.UInt.logor

  (** [test f m] is [true] iff [f] and [m] share at least one set bit. *)
  let test f m =
    let common = Unsigned.UInt.logand f m in
    Unsigned.UInt.compare common Unsigned.UInt.zero <> 0

  (** [eq f f'] is [true] iff both flag words compare equal. *)
  let eq f f' = Unsigned.UInt.compare f f' = 0

  (** The empty flag word. *)
  let none = Unsigned.UInt.zero

  let no_overwrite = mdb_NOOVERWRITE
  let no_dup_data = mdb_NODUPDATA
  let current = mdb_CURRENT
  (* Underscore-prefixed: bound but not used anywhere in this file. *)
  let _reserve = mdb_RESERVE
  let append = mdb_APPEND
  let append_dup = mdb_APPENDDUP
  (* Underscore-prefixed: bound but not used anywhere in this file. *)
  let _multiple = mdb_MULTIPLE
end
(** Per-database flags. [Make.create] hands the combination of the key and
    element defaults to [mdb_dbi_open]. Combine flags with [+]. *)
module Flags = struct
  (* NOTE(review): aliased to the environment flag type although the values
     are database flags; [Make._flags] allocates a [mdb_dbi_open_flag]
     instead. Confirm that both are the same underlying type. *)
  type t = mdb_env_flag

  (** Union of two flag words (bitwise or). *)
  let (+) = Unsigned.UInt.logor

  (** [test f m] is [true] iff [f] and [m] share at least one set bit. *)
  let test f m =
    let common = Unsigned.UInt.logand f m in
    Unsigned.UInt.compare common Unsigned.UInt.zero <> 0

  (** [eq f f'] is [true] iff both flag words compare equal. *)
  let eq f f' = Unsigned.UInt.compare f f' = 0

  (** The empty flag word. *)
  let none = Unsigned.UInt.zero

  let reverse_key = mdb_REVERSEKEY
  let dup_sort = mdb_DUPSORT
  let dup_fixed = mdb_DUPFIXED
  let integer_dup = mdb_INTEGERDUP
  let reverse_dup = mdb_REVERSEDUP
  let integer_key = mdb_INTEGERKEY

  (* Not exported *)
  let create = mdb_CREATE
end
(* Short local name for [Bigstringaf]; the [Values] codecs and [Make] use it
   to create, fill and read byte buffers. *)
module Bigstring = Bigstringaf
(** Raw byte buffer in which keys and values are exchanged with the
    database. *)
type buffer = Bigstring.t
(** Codecs that turn OCaml values into database buffers and back, plus the
    database flags each codec asks for. *)
module Values = struct
  module Flags = Flags

  type db_val = buffer

  (** What a codec has to provide.
      - [default_flags]: flags contributed to the database by this codec.
      - [read buf]: decodes a value from [buf].
      - [write alloc v]: asks [alloc] for a buffer of the size it needs,
        encodes [v] into it and returns that buffer. *)
  module type S = sig
    type t
    val default_flags : Flags.t
    val read : db_val -> t
    val write : (int -> db_val) -> t -> db_val
  end

  (** OCaml [int] stored as a 4- or 8-byte integer in the byte order of the
      host. The variant is picked once, when the module is initialised, from
      [Sys.big_endian] and [Sys.int_size] rounded up to a whole number of
      bytes; any other width fails with [Failure]. *)
  module Int : S with type t = int = struct
    type t = int

    let default_flags = Flags.none

    let write, read =
      let width = (Sys.int_size + 7) / 8 * 8 in
      match Sys.big_endian, width with
      | true, 32 ->
        let store mk n =
          let buf = mk 4 in
          Bigstring.set_int32_be buf 0 (Int32.of_int n);
          buf
        and load buf = Int32.to_int (Bigstring.get_int32_be buf 0) in
        store, load
      | true, 64 ->
        let store mk n =
          let buf = mk 8 in
          Bigstring.set_int64_be buf 0 (Int64.of_int n);
          buf
        and load buf = Int64.to_int (Bigstring.get_int64_be buf 0) in
        store, load
      | false, 32 ->
        let store mk n =
          let buf = mk 4 in
          Bigstring.set_int32_le buf 0 (Int32.of_int n);
          buf
        and load buf = Int32.to_int (Bigstring.get_int32_le buf 0) in
        store, load
      | false, 64 ->
        let store mk n =
          let buf = mk 8 in
          Bigstring.set_int64_le buf 0 (Int64.of_int n);
          buf
        and load buf = Int64.to_int (Bigstring.get_int64_le buf 0) in
        store, load
      | _ -> failwith "Lmdb: Unsupported integer size"
  end

  (** OCaml [string] copied byte for byte; the buffer length is the string
      length. *)
  module String : S with type t = string = struct
    type t = string

    let default_flags = Flags.none

    let write mk s =
      let len = String.length s in
      let buf = mk len in
      Bigstring.blit_from_string s ~src_off:0 buf ~dst_off:0 ~len;
      buf

    let read buf =
      let len = Bigstring.length buf in
      Bigstring.substring buf ~off:0 ~len
  end

  (** Codecs to use for elements (values). *)
  module Elt = struct
    module Int = Int
    module String = String
  end

  (** Codecs to use for keys. [Key.Int] is [Elt.Int] with its default flags
      replaced by [Flags.integer_key]. *)
  module Key = struct
    module Int = struct
      include Elt.Int
      let default_flags = Flags.integer_key
    end
    module String = String
  end
end
(** [Make (Key) (Elt)] builds the database operations for keys encoded by
    [Key] and elements encoded by [Elt]. Functions taking [?txn] run inside
    that transaction when it is given and open one of their own through
    [Txn.trivial] otherwise. *)
module Make (Key : Values.S) (Elt : Values.S) = struct

  (* Database flags implied by the two codecs. *)
  let def_flags = Flags.(+) Key.default_flags Elt.default_flags

  (* Whether [def_flags] contains [Flags.dup_sort]. *)
  let has_dup_flag = Flags.test Flags.dup_sort def_flags

  type t = { env : Env.t ; db : mdb_dbi }
  type key = Key.t
  type elt = Elt.t

  (* Build a C [mdb_val] describing buffer [b]. The statement order matters:
     the liveness link is installed before the raw address of [b] is stored,
     because after that store nothing else refers to [b]. *)
  let dbval_of_buffer b =
    let mvp = addr (make mdb_val) in
    (* Make sure buffer will stay alive while mvp is in use. *)
    alive_while b mvp;
    let size_field = mvp |-> mv_size in
    size_field <-@ Bigstring.length b ;
    let data_field = mvp |-> mv_data in
    (* Reference to b is lost here! *)
    data_field <-@ to_voidp (bigarray_start array1 b) ;
    (*Gc.full_major () ;*) (* trigger possible use-after-free. *)
    mvp

  (* View the memory described by an [mdb_val] as a buffer, without copying.
     No reference to [mvp] is kept: the memory belongs to the lmdb map and is
     only valid as long as the transaction is alive. The user knows this. *)
  let buffer_of_dbval mvp =
    let len = !@ (mvp |-> mv_size) in
    let start = from_voidp char (!@ (mvp |-> mv_data)) in
    bigarray_of_ptr array1 len Char start

  (* Run a codec writer with [Bigstring.create] as its allocator. *)
  let write f v = f Bigstring.create v

  (** [create ?create env name] opens database [name] in [env] with
      [mdb_dbi_open]. When [create] is [true] (the default) the open happens
      in a read-write transaction and [Flags.create] is added to the codec
      flags; otherwise a read-only transaction and the codec flags alone are
      used. *)
  let create ?(create=true) env name =
    let db = alloc mdb_dbi in
    let flags =
      match create with
      | true -> Flags.(+) Flags.create def_flags
      | false -> def_flags
    in
    let open_dbi txn = mdb_dbi_open txn name flags db in
    begin match create with
      | true -> Txn.trivial rw env open_dbi
      | false -> Txn.trivial ro env open_dbi
    end ;
    (* We do not put a finaliser here, as it would break with mdb_drop.
       Slight memory leak, but nothing terrible. *)
    (* Gc.finalise mdb_dbi_close env !@db *)
    { env ; db = !@db }

  (** [stats db] reads the database statistics with [mdb_dbi_stat] inside a
      read-only transaction and converts them with [Env.make_stats]. *)
  let stats { env ; db } =
    let raw = make mdb_stat in
    let fill txn = mdb_dbi_stat txn db (addr raw) in
    Txn.trivial ro env fill ;
    Env.make_stats raw

  (* Flag word reported by [mdb_dbi_flags]; unused in this file. *)
  let _flags { env ; db } =
    let cell = alloc mdb_dbi_open_flag in
    let fill txn = mdb_dbi_flags txn db cell in
    Txn.trivial ro env fill ;
    !@cell

  (** [drop ?delete db] calls [mdb_drop] in a read-write transaction;
      [delete] (default [false]) is forwarded as is. *)
  let drop ?(delete=false) { env ; db } =
    let run txn = mdb_drop txn db delete in
    Txn.trivial rw env run

  (** [get db ?txn k] looks [k] up with [mdb_get] and decodes the result with
      [Elt.read]. *)
  let get { db ; env } ?txn k =
    let out = addr (make mdb_val) in
    let lookup txn =
      let key = dbval_of_buffer (write Key.write k) in
      mdb_get txn db key out ;
      Elt.read (buffer_of_dbval out)
    in
    Txn.trivial ro ?txn env lookup

  (** [put db ?txn ?flags k v] stores [v] under [k] with [mdb_put]; [flags]
      defaults to [PutFlags.none]. *)
  let put { db ; env } ?txn ?(flags=PutFlags.none) k v =
    let store txn =
      let data = dbval_of_buffer (write Elt.write v) in
      let key = dbval_of_buffer (write Key.write k) in
      mdb_put txn db key data flags
    in
    Txn.trivial rw ?txn env store

  (** [append db ?txn ?flags k v] is [put] with [PutFlags.append_dup] added
      when the database has the dup-sort flag and [PutFlags.append] added
      otherwise. *)
  let append db ?txn ?(flags=PutFlags.none) k v =
    let extra =
      if has_dup_flag then PutFlags.append_dup else PutFlags.append
    in
    let flags = PutFlags.(+) flags extra in
    put db ?txn ~flags k v

  (** [remove db ?txn ?elt k] calls [mdb_del] for [k]. With [?elt] the
      encoded element is passed as the data argument; without it a null
      pointer is passed. *)
  let remove { db ; env } ?txn ?elt k =
    let delete txn =
      let data =
        match elt with
        | None -> from_voidp mdb_val null
        | Some v -> dbval_of_buffer (write Elt.write v)
      in
      let key = dbval_of_buffer (write Key.write k) in
      mdb_del txn db key data
    in
    Txn.trivial rw ?txn env delete

  (** [compare_key db ?txn x y] compares two keys with [mdb_cmp], i.e. with
      the ordering of the database itself. *)
  let compare_key { db ; env } ?txn x y =
    let cmp txn =
      let rhs = dbval_of_buffer (write Key.write y) in
      let lhs = dbval_of_buffer (write Key.write x) in
      mdb_cmp txn db lhs rhs
    in
    Txn.trivial ro ?txn env cmp

  (** [compare_elt db ?txn x y] compares two elements with [mdb_dcmp].
      @raise Invalid_argument when the database lacks the dup-sort flag; the
      check runs before the two elements are supplied. *)
  let compare_elt { db ; env } ?txn : elt -> elt -> int =
    begin match has_dup_flag with
      | true -> ()
      | false ->
        invalid_arg "Lmdb: elements are only comparable in a dup_sort db"
    end ;
    fun x y ->
      let cmp txn =
        let rhs = dbval_of_buffer (write Elt.write y) in
        let lhs = dbval_of_buffer (write Elt.write x) in
        mdb_dcmp txn db lhs rhs
      in
      Txn.trivial ro ?txn env cmp

  (** Same function as [compare_key]. *)
  let compare = compare_key

  (** Cursors over the database. *)
  module Cursor = struct
    type -'a t = mdb_cursor constraint 'a = [< `Read | `Write ]

    (** [go cap db ?txn f] opens a cursor on [db], runs [f] on it and closes
        the cursor again, whether [f] returns or raises. The result of [f] is
        returned and its exceptions are re-raised. *)
    let go rw { env ; db = dbi } ?txn f =
      let slot = alloc mdb_cursor in
      let with_cursor txn =
        mdb_cursor_open txn dbi slot ;
        let cursor : _ t = !@slot in
        try
          let res = f cursor in
          mdb_cursor_close cursor ;
          res
        with exn -> mdb_cursor_close cursor ; raise exn
      in
      Txn.trivial rw ?txn env with_cursor

    (** [put cursor ?flags k v] stores the pair through [mdb_cursor_put]. *)
    let put cursor ?(flags=PutFlags.none) k v =
      let data = dbval_of_buffer (write Elt.write v) in
      let key = dbval_of_buffer (write Key.write k) in
      mdb_cursor_put cursor key data flags

    (** [put_here] is [put] with [PutFlags.current] added to [flags]. *)
    let put_here cursor ?(flags=PutFlags.none) k v =
      let flags = PutFlags.(+) PutFlags.current flags in
      put cursor ~flags k v

    (** [remove cursor ?all ()] calls [mdb_cursor_del]. With [~all:true] the
        flag [PutFlags.no_dup_data] is passed, which is only accepted on a
        dup-sort database.
        @raise Invalid_argument for [~all:true] on a database without the
        dup-sort flag. *)
    let remove cursor ?(all=false) () =
      let flag =
        match all, has_dup_flag with
        | false, _ -> PutFlags.none
        | true, true -> PutFlags.no_dup_data
        | true, false ->
          raise @@ Invalid_argument (Printf.sprintf
            "Lmdb.Cursor.del: Optional argument ~all unsuported: \
             this database does not have the dupsort flag enabled.")
      in
      mdb_cursor_del cursor flag

    (* Positioning operation that takes no key: run [op] through
       [mdb_cursor_get] and decode the pair it reports. *)
    let get_prim op cursor =
      let k = addr (make mdb_val) in
      let v = addr (make mdb_val) in
      mdb_cursor_get cursor k v op ;
      let elt = Elt.read (buffer_of_dbval v) in
      let key = Key.read (buffer_of_dbval k) in
      (key, elt)

    let get cursor = get_prim MDB_GET_CURRENT cursor
    let first cursor = get_prim MDB_FIRST cursor
    let last cursor = get_prim MDB_LAST cursor
    let next cursor = get_prim MDB_NEXT cursor
    let prev cursor = get_prim MDB_PREV cursor

    (* Positioning operation that takes a key: run [op] through
       [mdb_cursor_get] with the encoded key and decode the element it
       reports. *)
    let seek_prim op cursor k =
      let v = addr (make mdb_val) in
      let key = dbval_of_buffer (write Key.write k) in
      mdb_cursor_get cursor key v op ;
      Elt.read (buffer_of_dbval v)

    let seek cursor = seek_prim MDB_SET cursor
    let seek_range cursor = seek_prim MDB_SET_RANGE cursor

    (* Guard for the dup-sort-only operations: forward to [f op cursor] on a
       dup-sort database, raise [Invalid_argument] naming [s] otherwise. *)
    let test_dup s f op cursor =
      if not has_dup_flag then
        raise (Invalid_argument (Printf.sprintf
          "Lmdb.Cursor.%s: Operation unsuported: this database does not have the \
           dupsort flag enabled." s))
      else f op cursor

    let first_dup cursor = test_dup "first_dup" get_prim MDB_FIRST_DUP cursor
    let last_dup cursor = test_dup "last_dup" get_prim MDB_LAST_DUP cursor
    let next_dup cursor = test_dup "next_dup" get_prim MDB_NEXT_DUP cursor
    let prev_dup cursor = test_dup "prev_dup" get_prim MDB_PREV_DUP cursor
    let seek_dup cursor = test_dup "seek_dup" seek_prim MDB_GET_BOTH cursor
    let seek_range_dup cursor =
      test_dup "seek_range_dup" seek_prim MDB_GET_BOTH_RANGE cursor

    (* The following two operations are not exposed, due to inherent unsafety:
       - MDB_GET_MULTIPLE
       - MDB_NEXT_MULTIPLE
    *)
  end
end
(** Database with string keys and string elements. *)
module Db = Make (Values.Key.String) (Values.Elt.String)
(** Database with [int] keys (opened with [Flags.integer_key]) and string
    elements. *)
module IntDb = Make (Values.Key.Int) (Values.Elt.String)
(** Signature of a database module; its names and shapes mirror the
    operations defined in [Make]. *)
module type S = sig
(* A database handle, and the key and element types it is specialised to. *)
type t
type key
type elt
(* Opens the named database in an environment; [?create] is optional. *)
val create : ?create:bool -> Env.t -> string -> t
(* Single-key operations. Each takes an optional enclosing transaction. *)
val get : t -> ?txn:[> `Read] Txn.t -> key -> elt
(* NOTE(review): the three writers below accept [> `Read] transactions while
   the cursor writers further down require [> `Write]; confirm whether that
   difference is intended. *)
val put : t -> ?txn:[> `Read] Txn.t -> ?flags:PutFlags.t -> key -> elt -> unit
val append : t -> ?txn:[> `Read] Txn.t -> ?flags:PutFlags.t -> key -> elt -> unit
val remove : t -> ?txn:[> `Read] Txn.t -> ?elt:elt -> key -> unit
(* Cursor operations. [db] is replaced by [t] through the destructive
   substitution at the end of the signature. *)
module Cursor : sig
type db
type -'a t constraint 'a = [< `Read | `Write ]
(* Runs a function on a cursor opened with the given capability. *)
val go : 'c cap -> db -> ?txn:'c Txn.t -> ('c t -> 'a) -> 'a
val get : _ t -> key * elt
(* The next three need a cursor with the [`Write] capability. *)
val put : [> `Write ] t -> ?flags:PutFlags.t -> key -> elt -> unit
val put_here : [> `Write ] t -> ?flags:PutFlags.t -> key -> elt -> unit
val remove : [> `Write ] t -> ?all:bool -> unit -> unit
(* Positioning operations returning the key/element pair they land on. *)
val first : _ t -> key * elt
val last : _ t -> key * elt
val next : _ t -> key * elt
val prev : _ t -> key * elt
(* Positioning by key; only the element is returned. *)
val seek : _ t -> key -> elt
val seek_range : _ t -> key -> elt
(* [*_dup] variants. In [Make] these raise [Invalid_argument] unless the
   database has the dup-sort flag. *)
val first_dup : _ t -> key * elt
val last_dup : _ t -> key * elt
val next_dup : _ t -> key * elt
val prev_dup : _ t -> key * elt
val seek_dup : _ t -> key -> elt
val seek_range_dup : _ t -> key -> elt
end with type db := t
(* Database statistics, in the record type shared with environments. *)
val stats : t -> Env.stats
val drop : ?delete:bool -> t -> unit
(* Comparison functions; in [Make], [compare] is the same function as
   [compare_key]. *)
val compare_key : t -> ?txn:_ Txn.t -> key -> key -> int
val compare : t -> ?txn:_ Txn.t -> key -> key -> int
val compare_elt : t -> ?txn:_ Txn.t -> elt -> elt -> int
end