-
Notifications
You must be signed in to change notification settings - Fork 2
/
ohmCouchVersioned.ml
374 lines (276 loc) · 10.8 KB
/
ohmCouchVersioned.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
(* Ohm is © 2011 Victor Nicollet *)
open Ohm
open Universal
open BatPervasives
module type VERSIONED = sig
val name : string
module DataDB : Ohm.CouchDB.CONFIG
module Id : Ohm.CouchDB.ID
module VersionDB : Ohm.CouchDB.CONFIG
module Data : Ohm.Fmt.FMT
module Diff : Ohm.Fmt.FMT
type ctx
val couchDB : ctx -> Ohm.CouchDB.ctx
val apply :
Diff.t
-> ( ctx ,
Id.t
-> float
-> Data.t
-> ( ctx, Data.t ) Ohm.Run.t ) Ohm.Run.t
module VersionData : Ohm.Fmt.FMT
module ReflectedData : Ohm.Fmt.FMT
val reflect : Id.t -> Data.t -> ( ctx, ReflectedData.t ) Ohm.Run.t
end
module Make = functor (Versioned:VERSIONED) -> struct
module ObjectId = Versioned.Id
module Data = Versioned.Data
module Diff = Versioned.Diff
module VersionData = Versioned.VersionData
module ReflectedData = Versioned.ReflectedData
module VersionId : Id.PHANTOM = Id.Phantom
let couchDB what = ohm (Run.edit_context Versioned.couchDB what)
(* Basic object management *)
module DataDB = CouchDB.Database(Versioned.DataDB)
module Object = Fmt.Make(struct
module Float = Fmt.Float
type json t = <
initial "i" : Data.t ;
current "c" : Data.t ;
?time "t" : Float.t = 0.0 ;
reflected "r" : ReflectedData.t
>
end)
module Raw = Fmt.Make(struct
type json t = <
current "c" : Data.t ;
reflected "r" : ReflectedData.t
>
end)
module ObjectTable = CouchDB.Table(DataDB)(ObjectId)(Object)
type t = ObjectId.t * Object.t
let get id =
Run.edit_context Versioned.couchDB
(ObjectTable.get id |> Run.map (BatOption.map (fun obj -> id, obj)))
let id (id,_) = id
let current (_,obj) = obj # current
let reflected (_,obj) = obj # reflected
(* Basic version management *)
module VersionDB = CouchDB.Database(Versioned.VersionDB)
module VersionDesign = struct
module Database = VersionDB
let name = "versioned"
end
module Version = Fmt.Make(struct
module Float = Fmt.Float
type json t = <
id "i" : Id.t ;
time "t" : Float.t ;
data "d" : VersionData.t ;
diffs "v" : Diff.t list
>
end)
module VersionTable = CouchDB.Table(VersionDB)(VersionId)(Version)
type version = VersionId.t * Version.t
module VersionByIdView = CouchDB.DocView(struct
module Key = Fmt.Make(struct
type json t = (Id.t * float)
end)
module Value = Fmt.Unit
module Doc = Version
module Design = VersionDesign
let name = "by_id"
let map = "emit([doc.i,doc.t],null)"
end)
let get_versions_before oid ?(since=0.0) time =
Run.edit_context Versioned.couchDB begin
let id = ObjectId.to_id oid in
let! list = ohm $ VersionByIdView.doc_query
~startkey:(id,since) ~endkey:(id,time) ~endinclusive:false ()
in
return $ List.map (fun i -> VersionId.of_id (i # id), i # doc) list
end
let get_versions oid = get_versions_before oid max_float
let get_versions_since time oid = get_versions_before oid ~since:time max_float
let get_version vid =
Run.edit_context Versioned.couchDB begin
let! version = ohm_req_or (return None) $ VersionTable.get vid in
return $ Some (vid, version)
end
let version_time (_,v) = v # time
let version_data (_,v) = v # data
let version_diffs (_,v) = v # diffs
let version_object (_,v) = ObjectId.of_id v # id
let version_id (i,_) = i
(* Diff application *)
let apply_versions versions oid initial =
let diffs_of_version version =
let time = version_time version in
List.map (fun diff -> time, diff) (version_diffs version)
in
let pre_apply (time,diff) =
let! result = ohm $ Versioned.apply diff in
return (time, result)
in
let rec apply data = function
| [] -> return data
| (time,diff) :: diffs -> let! data = ohm $ diff oid time data in
apply data diffs
in
let diffs : (float * Diff.t) list = List.concat (List.map diffs_of_version versions) in
let! compiled_diffs = ohm $ Run.list_map pre_apply diffs in
let! current = ohm $ apply initial compiled_diffs in
return current
(* Advanced version management : snapshots *)
let version_snapshot v =
let oid = version_object v in
let! versions = ohm $ get_versions_before oid (version_time v) in
let versions = List.filter (fun v' -> version_id v <> version_id v') versions in
let! _, obj = ohm_req_or (return None) (get oid) in
let! before = ohm $ apply_versions versions oid (obj # initial) in
let! after = ohm $ apply_versions [v] oid before in
return $ Some (before, after)
(* Triggered signals *)
module Signals = struct
let call, version_create = Sig.make (Run.list_iter identity)
let version_create_call args = call args
let call, update = Sig.make (Run.list_iter identity)
let update_call args = call args
let call, explicit_reflect = Sig.make (Run.list_iter identity)
let explicit_reflect_call args = call args
end
(* Updating reflected data *)
let reflect oid =
let! ctx = ohmctx identity in
let update oid =
let! obj = ohm_req_or (return (None, `keep)) $ ObjectTable.get oid in
let! reflected = ohm $ Run.with_context ctx (Versioned.reflect oid (obj # current)) in
if reflected == obj # reflected then return (None, `keep) else
let obj = object
method initial = obj # initial
method current = obj # current
method time = obj # time
method reflected = reflected
end in
return (Some obj, `put obj)
in
let! obj = ohm_req_or (return ()) $
Run.edit_context Versioned.couchDB (ObjectTable.Raw.transaction oid update) in
let! () = ohm $ Signals.explicit_reflect_call (oid, obj) in
let! () = ohm $ Signals.update_call (oid, obj) in
return ()
(* Creating a new version *)
let refresh ?latest (oid : ObjectId.t) (default:Data.t option) =
let! ctx = ohmctx identity in
let update oid =
(* Determine what the "initial" state of the object should be.
This state will not necessarily be used to compute an upgrade
(because there's a possibility for a quick one-step diff) but
still has to be saved back to the database. *)
let! obj_opt = ohm $ ObjectTable.get oid in
let initial_opt = match obj_opt with
| None -> default
| Some obj -> Some (obj # initial)
in
Run.with_context ctx begin
(* No initial state : this must be an "update" of a non-existent
object. Just do nothing. *)
match initial_opt with
| None -> return (None, `keep)
| Some initial ->
(* Determine what diffs should be applied, and at what element they
should start ; apply them. The default situation is to apply all
diffs to the initial state, but a possible optimization happens
if the latest version is appended to the event history, in which
case it is enough to apply that version to the current state. *)
let! startAt, versions = ohm begin
match latest, obj_opt with
| Some v, Some obj when (snd v) # time > obj # time -> return (obj # current, [v])
| _ -> let! vs = ohm (get_versions oid) in return (initial, vs)
end in
let! current = ohm $ apply_versions versions oid startAt in
(* Finish setting up the object and save it. *)
let! reflected = ohm $ Versioned.reflect oid current in
let time = List.fold_left (fun t (_,v) -> max (v # time) t) 0.0 versions in
let obj = object
method initial = initial
method current = current
method reflected = reflected
method time = time
end in
return (Some (oid, obj), `put obj)
end
in
let! oid, obj = ohm_req_or (return None) $
Run.edit_context Versioned.couchDB (ObjectTable.Raw.transaction oid update) in
let! () = ohm $ Signals.update_call (oid, obj) in
return (Some (oid, obj))
let do_update ~id ~default ~diffs ~info () =
let time = Unix.gettimeofday () in
let vid = VersionId.gen () in
let version = object
method id = ObjectId.to_id id
method time = time
method data = info
method diffs = diffs
end in
let! () = ohm (Run.edit_context Versioned.couchDB (VersionTable.set vid version)) in
let! result = ohm_req_or (return None) (refresh ~latest:(vid,version) id default) in
(* The refresh might create the object, so wait until after the refresh to call
any relevant signals. *)
let! () = ohm (Signals.version_create_call (vid,version)) in
return (Some result)
let update ~id ~diffs ~info () =
do_update ~id ~default:None ~diffs ~info ()
let create ~id ~init ~diffs ~info () =
(* We provide a value, so a value should be returned *)
do_update ~id ~default:(Some init) ~diffs ~info () |> Run.map BatOption.get
let migrate manager name migrator =
let db = Versioned.DataDB.database in
let process id =
let! ctx = ohmctx identity in
let oid = Versioned.Id.of_id id in
let! () = ohm $ Run.edit_context Versioned.couchDB
(ObjectTable.Raw.transaction oid begin fun oid ->
let! original = ohm_req_or (return ((),`keep)) $ ObjectTable.get oid in
Run.with_context ctx begin
let! initial = ohm_req_or (return ((),`keep)) $ migrator oid original # initial in
let () = Util.log "Migrate : %s : %s/%s" name db (Id.to_string id) in
let! versions = ohm $ get_versions oid in
let! current = ohm $ apply_versions versions oid initial in
let! reflected = ohm $ Versioned.reflect oid current in
let time = List.fold_left (fun t (_,v) -> max (v # time) t) 0.0 versions in
let obj = object
method initial = initial
method current = current
method reflected = reflected
method time = time
end in
return ((), `put obj)
end
end) in
return ()
in
let source idopt =
Run.edit_context Versioned.couchDB begin
let! list, next = ohm $ ObjectTable.all_ids ~count:10
(BatOption.map Versioned.Id.of_id idopt) in
return (List.map Versioned.Id.to_id list, BatOption.map Versioned.Id.to_id next)
end
in
let task = Async.Convenience.foreach manager name Id.fmt source process in
task
let obliterate oid =
Run.edit_context Versioned.couchDB begin
(* Remove all versions first. *)
let id = ObjectId.to_id oid in
let! versions = ohm $ VersionByIdView.doc_query
~startkey:(id,0.0) ~endkey:(id,max_float) ()
in
let remove_version vid = VersionTable.delete vid in
let! _ = ohm $ Run.list_iter (#id %> VersionId.of_id %> remove_version) versions in
(* Remove the object itself *)
ObjectTable.delete oid
end
module Id = ObjectId
end