Skip to content
This repository
Newer
Older
100644 388 lines (318 sloc) 10.991 kb
991ccf11 »
2012-04-29 Initial import of Ohm
1 (* Ohm is © 2011 Victor Nicollet *)
2
3 open Util
4 open Json_type
5 open BatPervasives
6 open Universal
7
(* Exception carrying an error message. It is declared here but not raised
   anywhere in the visible part of this file. *)
exception Error of string

(* Outcome reported by an action implementation, and the state read back from
   a stored task document. The persisted "sta" values are:
   - [Finished arg]              : "fini", with [arg] stored under "what"
   - [Failed]                    : "fail"
   - [Initial arg]               : "init", with [arg] stored under "what"
   - [Waiting arg]               : "wait", with [arg] stored under "what"
   - [Partial (arg, did, todo)]  : "part", with [arg] under "what" and the
                                   two integers under "did" and "todo" *)
type 'arg result =
  | Finished of 'arg
  | Failed
  | Initial of 'arg
  | Waiting of 'arg
  | Partial of 'arg * int * int

(* Handle on a stored task: the format of its payload and the identifier of
   the task document. *)
type 'arg token = 'arg Fmt.t * Id.t

(* An action body: receives the decoded argument and the token of the task
   being run, and returns the next state of that task. *)
type 'arg implementation = 'arg -> 'arg token -> (CouchDB.ctx,'arg result) Run.t

(* A named action: the format of its argument and the name under which its
   implementation is looked up in [_actions]. *)
type 'arg action = 'arg Fmt.t * string

(* Table of registered implementations, keyed by action name. Values are the
   JSON-level wrappers built by [register]. *)
let _actions = Hashtbl.create 100
24
(* [register name fmt impl] records [impl] as the implementation of the action
   called [name] and returns the corresponding action value [(fmt, name)].

   The implementation is stored behind a JSON-level wrapper: the stored
   payload is decoded with [fmt] before [impl] is called, and the payload of
   the returned state is encoded back with [fmt]. A payload that cannot be
   decoded yields [Failed].

   Registering the same name twice is logged as an error and the newer
   implementation replaces the older one. *)
let register name fmt impl =
  let json_impl : Json_type.t implementation = fun json (_, tok) ->
    match fmt.Fmt.of_json json with
    | None ->
      log "Action.process : action %s : argument unserialize error" name ;
      Run.return Failed
    | Some arg ->
      impl arg (fmt, tok) |> Run.map begin function
        | Finished result -> Finished (fmt.Fmt.to_json result)
        | Failed -> Failed
        | Waiting data -> Waiting (fmt.Fmt.to_json data)
        | Initial data -> Initial (fmt.Fmt.to_json data)
        | Partial (data, did, todo) -> Partial (fmt.Fmt.to_json data, did, todo)
      end
  in

  if Hashtbl.mem _actions name then
    log "Action.register : ERROR : duplicate action %s" name ;

  (* [replace] rather than [add]: [add] would keep the previous binding
     hidden underneath the new one instead of discarding it. *)
  Hashtbl.replace _actions name json_impl ;

  (fmt, name)
51
(* [declare name fmt] builds the action value for [name] without registering
   any implementation; see [define] for attaching one afterwards. *)
let declare name fmt = fmt, name
54
(* [define action impl] registers [impl] as the implementation of an action
   previously obtained from [declare]. The action value returned by
   [register] is discarded. *)
let define (fmt, name) impl =
  register name fmt impl |> ignore
57
(* Second component of a (format, identifier) pair. *)
let to_id pair = snd pair
(* Pairs a format with an identifier; inverse of [to_id] given the format. *)
let of_id fmt = fun id -> fmt, id
60
(* Background jobs: computations run by the worker loop when there is no task
   to process. *)
module Background = struct

  (* Registered jobs, most recently registered first. Each entry is
     [(count, f)]: [f] may be run at most [count] times per [process] call. *)
  let todo = ref []

  (* [register count f] adds the job [f] with a per-call budget of [count]
     runs. *)
  let register count f =
    todo := (count,f) :: !todo

  (* [process ()] walks the registered jobs. A job is re-run for as long as it
     returns [true] and its budget is not exhausted; once it returns [false]
     (or raises) the next job is tried. Returns [true] if at least one run
     returned [true]. The [todo] list itself is not modified, so budgets are
     fresh on every call. *)
  let process () =

    (* Run one job in a fresh context. A raising job counts as "nothing done";
       the exception is logged rather than silently dropped. *)
    let continue f =
      try Run.eval (new CouchDB.init_ctx) f
      with exn ->
        log "Background.process : failed with exception %s"
          (Printexc.to_string exn) ;
        false
    in

    let rec aux = function
      | [] -> false
      (* Non-positive budgets are exhausted. Matching only on 0 would let a
         negative count run its job without bound. *)
      | (n,_) :: t when n <= 0 -> aux t
      | (n,f) :: t ->
        let continue = continue f in
        let result = aux (((if continue then n - 1 else 0), f) :: t) in
        continue || result
    in

    aux !todo

end
83
(* Interface of a task queue. The descriptions below reflect the
   implementation provided by [Make] in this file. *)
module type TASK = sig

  (* Store a task for the given action and argument, runnable immediately
     (same as [delay 0.0]). Returns the token of the stored task. *)
  val call : 'arg action -> 'arg -> (#CouchDB.ctx,'arg token) Run.t

  (* Like [call], but the task's scheduled time is the current time plus the
     given number of seconds. *)
  val delay : float -> 'arg action -> 'arg -> (#CouchDB.ctx,'arg token) Run.t

  (* Store a task in the "wait" state: it is not picked up by [process] until
     [start] is called on its token. *)
  val prepare : 'arg action -> 'arg -> (#CouchDB.ctx,'arg token) Run.t

  (* Move a task from the "wait" state to the "init" state. Tasks in any other
     state, and missing tasks, are left unchanged. *)
  val start : 'arg token -> (#CouchDB.ctx, unit) Run.t

  (* Read the current state of a task. Yields [Failed] when the task document
     is missing or cannot be decoded. *)
  val status : 'arg token -> (#CouchDB.ctx,'arg result) Run.t

  (* Try to run one pending task. The result is [false] when there was nothing
     to run or when running it failed. *)
  val process : unit -> bool

  (* Worker loop: repeatedly calls the given function, then [process]. Never
     returns normally. *)
  val loop : (unit -> unit) -> 'a
end
93
94 module Make =
95 functor (DB:CouchDB.DATABASE) ->
96 struct
97
(* [delay delay action argument] stores a new task document for [action] with
   the encoded [argument] as payload, in the "init" state, scheduled [delay]
   seconds after the current time. Yields the token of the new task. *)
let delay delay (fmt, name) argument =
  let payload = fmt.Fmt.to_json argument in
  let id = Id.gen () in
  let run_at = delay +. Unix.gettimeofday () in
  let fields = [
    "t"    , Json_type.Build.string "task" ;
    "time" , Fmt.Float.to_json run_at ;
    "what" , payload ;
    "code" , Json_type.Build.string name ;
    "sta"  , Json_type.Build.string "init" ;
  ] in
  let stored = DB.put id (Json_type.Build.objekt fields) in
  stored |> Run.map (fun _ -> (fmt, id))
110
(* [call action argument] stores a task with no delay. *)
let call action argument =
  let (fmt, name) = action in
  delay 0.0 (fmt, name) argument
113
(* [prepare action argument] stores a new task document in the "wait" state,
   timestamped with the current time. Such a task is not selected by the
   "next" view until [start] switches it to "init". Yields the token of the
   new task. *)
let prepare (fmt, name) argument =
  let payload = fmt.Fmt.to_json argument in
  let id = Id.gen () in
  let created_at = Unix.gettimeofday () in
  let fields = [
    "t"    , Json_type.Build.string "task" ;
    "time" , Fmt.Float.to_json created_at ;
    "what" , payload ;
    "code" , Json_type.Build.string name ;
    "sta"  , Json_type.Build.string "wait" ;
  ] in
  let stored = DB.put id (Json_type.Build.objekt fields) in
  stored |> Run.map (fun _ -> (fmt, id))
126
(* [start token] releases a task created by [prepare]: inside a transaction on
   the task document, a "wait" status is replaced by "init". Documents in any
   other state, documents that do not exist, and documents that cannot be
   read as a task are left untouched. *)
let start (_, id) =

  let start json =
    try
      let list = Json_type.Browse.objekt json in
      let status = Json_type.Browse.string (List.assoc "sta" list) in
      if status = "wait" then
        let list =
          ( "sta" , Json_type.Build.string "init" )
          :: ( BatList.remove_assoc "sta" list )
        in
        (), `put (Json_type.Build.objekt list)
      else
        (), `keep
    with
    (* Only the failures the body can produce: a document of the wrong JSON
       shape, or one without a "sta" field. These are the same two cases that
       [status] handles. *)
    | Json_type.Json_error _ | Not_found ->
      (), `keep
  in

  DB.transaction id (DB.if_exists start) |> Run.map ignore
144
(* [status token] fetches the task document and translates it into a
   ['arg result]. Every problem (missing document, missing field, JSON of the
   wrong shape, payload that [fmt] cannot decode, unknown status string) is
   logged and reported as [Failed]. *)
let status (fmt, id) =

  (* Decode one task document. [None] means the payload could not be decoded
     with [fmt]; missing fields and shape errors surface as exceptions that
     the caller below handles. *)
  let decode json =
    let fields = Json_type.Browse.objekt json in
    let field name = List.assoc name fields in
    let state = Json_type.Browse.string (field "sta") in
    let payload wrap =
      match fmt.Fmt.of_json (field "what") with
      | None -> None
      | Some data -> Some (wrap data)
    in
    match state with
    | "fail" -> Some Failed
    | "init" -> payload (fun data -> Initial data)
    | "wait" -> payload (fun data -> Waiting data)
    | "fini" -> payload (fun data -> Finished data)
    | "part" ->
      let did = Json_type.Browse.int (field "did") in
      let todo = Json_type.Browse.int (field "todo") in
      payload (fun data -> Partial (data, did, todo))
    | other ->
      log "Task.status : unknown status %s for task %s"
        other (Id.str id) ;
      Some Failed
  in

  let interpret = function
    | None ->
      log "Task.status : looking for unknown task %s" (Id.str id) ;
      Failed
    | Some json ->
      begin
        try
          begin match decode json with
            | Some value -> value
            | None ->
              log "Task.status : problem parsing result %s" (logjson json) ;
              Failed
          end
        with
        | Json_type.Json_error error ->
          log "Task.status : %s when reading task %s" error (logjson json) ;
          Failed
        | Not_found ->
          log "Task.status : missing field when reading task %s" (logjson json) ;
          Failed
      end
  in

  DB.get id |> Run.map interpret
185
(* Design document shared by the views below: named "task", in the database
   the functor was applied to. *)
module Design = struct
  module Database = DB
  let name = "task"
end
190
(* View "old": finished tasks (status "fini"), keyed by their "time" field.
   Queried by [cleanup] to find finished tasks to remove. *)
module OldView = CouchDB.DocView(struct
  module Key = Fmt.Float
  module Value = Fmt.Unit
  module Doc = Fmt.Json

  module Design = Design

  let name = "old"

  (* JavaScript map function; the literal spans two source lines. *)
  let map = "if ( doc.t == 'task'
&& doc.sta == 'fini') emit(doc.time,null)"
end)
203
(* View "next": tasks whose status is none of "wait", "fini" or "fail", keyed
   by their "time" field. Queried by [_next] to find the task to run next. *)
module NextView = CouchDB.DocView(struct
  module Key = Fmt.Float
  module Value = Fmt.Unit
  module Doc = Fmt.Json

  module Design = Design

  let name = "next"

  (* JavaScript map function; the literal spans four source lines. *)
  let map = "if ( doc.t == 'task'
&& doc.sta != 'wait'
&& doc.sta != 'fini'
&& doc.sta != 'fail') emit(doc.time,null)"
end)
218
219 (* This is an operation that grabs the next available task (name, identifier and
220 payload) if present. *)
221
(* Computation yielding [Some (action name, task id, payload)] for the task
   with the smallest "time" key in the "next" view, after locking it, or
   [None] when no task can be run right now. *)
let _next : (CouchDB.ctx,(string * Id.t * Json_type.t) option) Run.t =

  (* Lock and return the task if it is acceptable: its scheduled time must not
     be in the future, and its status must not be "fail", "fini" or "wait".

     Locking rewrites the document with "tries" incremented and "time" pushed
     to [now + 2 + 10 * tries] seconds, so the task is not selected again
     before that moment.

     NOTE(review): a document lacking "time", "code" or "sta" makes this
     function raise [Not_found] out of the transaction and is left unchanged
     in the view; confirm whether such documents should be marked as failed
     like the ones without a "what" field.
  *)
  let lock id =
    let now = Unix.gettimeofday () in
    let lock json =
      let list = Browse.objekt json in
      let time = Fmt.Float.of_json (List.assoc "time" list) in
      if time > now then None, `keep else
        let what = try Some (List.assoc "what" list) with Not_found -> None in
        match what with
        | Some what ->
          let name = Browse.string (List.assoc "code" list) in

          (* A missing or non-integer "tries" field means "never tried". *)
          let tries =
            try Browse.int (List.assoc "tries" list)
            with Not_found | Json_type.Json_error _ -> 0
          in

          let status = Browse.string (List.assoc "sta" list) in

          if status = "fail" || status = "fini" || status = "wait"
          then None, `keep
          else

            let expire = now +. 2.0 +. 10.0 *. (float_of_int tries) in

            let list = list
              |> BatList.remove_assoc "time"
              |> BatList.remove_assoc "tries"
            in

            let list =
              ("time", Fmt.Float.to_json expire)
              :: ("tries", Build.int (tries + 1))
              :: list
            in

            Some (name, id, what), `put (Build.objekt list)

        | None ->
          let () = log "Task.process : %s has no 'what' field : giving up" (Id.str id) in
          let list = ("sta",Build.string "fail") :: BatList.remove_assoc "sta" list in
          None, `put (Build.objekt list)
    in

    DB.transaction id (DB.if_exists lock) |> Run.map (BatOption.default None)
  in

  (* Identify the next task, grab its id, and fetch it. *)
  NextView.doc_query ~startkey:0.0 ~limit:1 ()
  |> Run.bind (Util.first
               |- BatOption.map (#id |- lock)
               |- BatOption.default (Run.return None))
275
(* The processing function itself: grabs a new task, initializes a database context, and
   performs the task.
*)
279
(* [process ()] tries to run one task in a fresh database context: it takes
   the task provided by [_next], looks up the implementation registered under
   the task's action name, runs it, and writes the resulting state back to the
   task document.

   Returns the boolean produced by the write-back, and [false] when there is
   no task to run, when the action name is not registered, or when any step
   raises (the exception is logged). *)
let process () =

  (* This function writes the task back. To be used as part of a transaction.
     The "did", "todo", "what", "sta" and "time" fields are replaced; every
     other field of the document is kept. *)
  let _writeback_task id result task =
    try
      let list =
        Json_type.Browse.objekt task
        |> BatList.remove_assoc "did"
        |> BatList.remove_assoc "todo"
        |> BatList.remove_assoc "what"
        |> BatList.remove_assoc "sta"
        |> BatList.remove_assoc "time"
      in
      let list = ("time", Fmt.Float.to_json (Unix.gettimeofday ())) :: list in
      let list =
        match result with
        | Failed ->
          ("sta", Json_type.Build.string "fail") :: list
        | Finished arg ->
          ("sta", Json_type.Build.string "fini")
          :: ("what", arg) :: list
        | Initial arg ->
          ("sta", Json_type.Build.string "init")
          :: ("what", arg) :: list
        | Waiting arg ->
          ("sta", Json_type.Build.string "wait")
          :: ("what", arg) :: list
        | Partial (arg, did, todo) ->
          ("sta", Json_type.Build.string "part")
          :: ("what", arg)
          :: ("did", Json_type.Build.int did)
          :: ("todo", Json_type.Build.int todo)
          :: list
      in
      true, `put (Json_type.Build.objekt list)
    with exn ->
      log "Task.process : %s on task %s : %s"
        (Printexc.to_string exn) (Id.str id) (logjson task) ;
      false, `keep
  in

  (* The actual writeback operation. *)
  let _writeback id result =
    DB.transaction id (DB.if_exists (_writeback_task id result))
    |> Run.map (BatOption.default false)
  in

  (* Run the action called [name] on the payload [what] of task [id]. *)
  let _process name id what =
    log "Task.process : %s : %s ( %s )" (Id.str id) name (logjson what) ;
    (* Only the table lookup is guarded: a [Not_found] raised while calling
       the implementation must not be reported as an undefined action. *)
    let impl = try Some (Hashtbl.find _actions name) with Not_found -> None in
    match impl with
    | Some impl ->
      impl what (Fmt.Json.fmt,id) |> Run.bind (_writeback id)
    | None ->
      Util.log "Task.process : %s : action %s not defined" (Id.str id) name ;
      Run.return false
  in

  let _fetch_and_process =
    _next |> Run.bind begin function
      | None -> Run.return false
      | Some (name,id,task) -> _process name id task
    end
  in

  try Run.eval (new CouchDB.init_ctx) _fetch_and_process with exn ->
    Util.log "Task.process : failed with exception %s" (Printexc.to_string exn) ;
    false
347
(* [cleanup ()] removes at most one finished task per call: the first entry of
   the "old" view whose time key lies between 0 and one hour (3600 seconds)
   before the context's current time. Runs in a fresh database context. *)
let cleanup () =
  let lifetime = 3600. in
  let sweep =
    Run.context |> Run.bind (fun ctx ->
      let cutoff = ctx # time -. lifetime in
      let oldest =
        OldView.doc_query ~startkey:0.0 ~endkey:cutoff ~limit:1 ()
        |> Run.map Util.first
      in
      oldest |> Run.bind (function
        | Some item ->
          Util.log "Cleaning up task %s" (Id.str item # id) ;
          DB.transaction (item # id) (DB.remove) |> Run.map ignore
        | None -> Run.return ()))
  in
  Run.eval (new CouchDB.init_ctx) sweep
364
(* [loop prepare] is the worker loop; it never returns normally. Each
   iteration:
   - calls [prepare ()];
   - tries to run one task with [process];
   - if no task ran, runs the background jobs;
   - if those did nothing either, runs [cleanup], logs, and sleeps for two
     seconds.

   Exceptions from [process] and [cleanup] are logged and do not stop the
   loop. Exceptions from [prepare] are not caught. *)
let rec loop prepare =

  prepare () ;

  let ran =
    try process ()
    with exn ->
      log "Task.process : failed with exception %s"
        (Printexc.to_string exn) ;
      false
  in
  if not ran then begin
    let ran = Background.process () in
    if not ran then begin
      (* [cleanup] talks to the database and is not protected internally: a
         transient failure here must not terminate the worker. *)
      begin
        try cleanup ()
        with exn ->
          log "Task.cleanup : failed with exception %s"
            (Printexc.to_string exn)
      end ;
      log "Nothing to do..." ;
      Unix.sleep 2 ;
    end
  end ;

  loop prepare
386
387 end
Something went wrong with that request. Please try again.