Permalink
Browse files

[enhance] database: fatal database errors now trigger the fail-transa…

…ction handler in opa
  • Loading branch information...
1 parent e128237 commit 3321f323c68ec5d9a8526a6b732a7bf2ac6aa70e Louis Gesbert committed Jun 29, 2011
View
@@ -157,9 +157,11 @@ module type S = sig
(** Transaction-handling functions are grouped in this module *)
module Tr : sig
- val start: database -> transaction Cps.t
+ (** Takes a hook that can be triggered at any point during the life of the
+ transaction if a fatal database error occurs (disconnection...) *)
+ val start: database -> (exn -> unit) -> transaction Cps.t
- val start_at_revision: database -> revision -> transaction Cps.t
+ val start_at_revision: database -> revision -> (exn -> unit) -> transaction Cps.t
(** In [prepare] and [commit], the returned boolean is [true] for success. *)
val prepare: transaction -> (transaction * bool) Cps.t
View
@@ -40,8 +40,8 @@ module F (Bk: Badop.S) = struct
module Tr = struct
- let start db k = Bk.Tr.start db @> k
- let start_at_revision db rev k = Bk.Tr.start_at_revision db rev @> k
+ let start db errk k = Bk.Tr.start db errk @> k
+ let start_at_revision db rev errk k = Bk.Tr.start_at_revision db rev errk @> k
let prepare tr k = Bk.Tr.prepare tr @> k
let commit tr k = Bk.Tr.commit tr @> k
let abort tr k = Bk.Tr.abort tr @> k
View
@@ -48,7 +48,8 @@ let close_database db k = N.close_channel db |> k
let status db k =
match N.local_of_channel db, N.remote_of_channel db with
| N.Tcp (local_addr,_), N.Tcp (remote_addr,remote_port) ->
- (N.sendreceive db (Status (Dialog.query ()))
+ (N.sendreceive' db (Status (Dialog.query ()))
+ (fun _ -> Badop.Client (local_addr, (remote_addr, remote_port), Badop.Other "disconnected") |> k)
@> function
| Status (Dialog.Response st) ->
Badop.Client (local_addr, (remote_addr, remote_port), st) |> k
@@ -57,43 +58,47 @@ let status db k =
module Tr = struct
- let start db k =
- N.sendreceive db (Transaction (Dialog.query ()))
+ let start db errk k =
+ N.sendreceive' db (Transaction (Dialog.query ())) errk
@> function
- | Transaction (Dialog.Response tr) -> tr |> k
- | _ -> assert false
+ | Transaction (Dialog.Response tr) ->
+ N.on_disconnect tr (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
+ tr |> k
+ | _ -> N.panic db
- let start_at_revision db rev k =
- N.sendreceive db (Transaction_at (Dialog.query rev))
+ let start_at_revision db rev errk k =
+ N.sendreceive' db (Transaction_at (Dialog.query rev)) errk
@> function
- | Transaction_at (Dialog.Response tr) -> tr |> k
- | _ -> assert false
+ | Transaction_at (Dialog.Response tr) ->
+ N.on_disconnect tr (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
+ tr |> k
+ | _ -> N.panic db
let prepare tr k =
N.sendreceive tr (Prepare (Dialog.query ()))
@> function
| Prepare (D.Response (tr',success)) -> (tr', success) |> k
- | _ -> assert false
+ | _ -> N.panic tr
let commit tr k =
N.sendreceive tr (Commit (Dialog.query ()))
@> function
| Commit (D.Response success) -> success |> k
- | _ -> assert false
+ | _ -> N.panic tr
let abort tr k =
N.sendreceive tr (Abort (Dialog.query ()))
@> function
| Abort (D.Response ()) -> () |> k
- | _ -> assert false
+ | _ -> N.panic tr
end
let read tr path query k =
N.sendreceive tr (Read (path, Dialog.query query))
@> function
| Read (path', D.Response result) -> assert (path = path'); result |> k
- | _ -> assert false
+ | _ -> N.panic tr
let write tr path query k =
N.sendreceive tr (Write (path, query))
@@ -105,15 +110,15 @@ let write tr path query k =
~transaction:(fun chan k -> chan |> k)
result
@> k
- | _ -> assert false
+ | _ -> N.panic tr
let write_list tr l_path_query k =
N.sendreceive tr (WriteList (Dialog.query l_path_query))
@> function
| WriteList (D.Response l_path_result) ->
l_path_result
|> k
- | _ -> assert false
+ | _ -> N.panic tr
let node_properties _db _config k =
#<If:TESTING> () |> k #<Else>
View
@@ -60,13 +60,13 @@ module F (Bk: Badop.S) = struct
let status xdb k = Bk.status xdb.db @> fun st -> Badop.Layer("Wrapper_template", st) |> k
module Tr = struct
- let start xdb k =
+ let start xdb errk k =
let id = new_id xdb in print xdb.db_pfx "Transaction %d: START" id;
- Bk.Tr.start xdb.db
+ Bk.Tr.start xdb.db errk
@> fun tr -> { tr = tr; id = id; pfx = xdb.db_pfx } |> k
- let start_at_revision xdb rev k =
+ let start_at_revision xdb rev errk k =
let id = new_id xdb in print xdb.db_pfx "Transaction %d: START (at revision %s)" id (Bk.Debug.revision_to_string rev);
- Bk.Tr.start_at_revision xdb.db rev
+ Bk.Tr.start_at_revision xdb.db rev errk
@> fun tr -> { tr = tr; id = id; pfx = xdb.db_pfx } |> k
let prepare xtr k =
print xtr.pfx "Transaction %d: PREPARE" xtr.id;
@@ -82,7 +82,7 @@ module Node_property = Badop_structure.Node_property
module F (Bk: Badop.S) = struct
type local_transaction =
- | Tr_notstarted of Bk.database * Bk.revision option
+ | Tr_notstarted of Bk.database * Bk.revision option * (exn -> unit)
| Tr_started of Bk.transaction
| Tr_changed of Bk.transaction
@@ -117,17 +117,17 @@ module F (Bk: Badop.S) = struct
Badop.Layer_multi("Dispatcher", L.to_list stloc) |> k
module Tr = struct
- let start xdb k =
- L.map xdb (fun db k -> Tr_notstarted (db,None) |> k)
+ let start xdb errk k =
+ L.map xdb (fun db k -> Tr_notstarted (db,None,errk) |> k)
@> fun loc -> {
loc = loc;
status = ref Fresh;
} |> k
- let start_at_revision xdb rev k =
+ let start_at_revision xdb rev errk k =
L.mapi xdb
(fun key db k ->
let rev_opt = match L.get_key rev key with Rev_now -> None | Rev_fixed r -> Some r in
- Tr_notstarted (db, rev_opt) |> k)
+ Tr_notstarted (db, rev_opt, errk) |> k)
@> fun loc -> {
loc = loc;
status = ref Fresh;
@@ -180,12 +180,12 @@ module F (Bk: Badop.S) = struct
let get_tr (push: local_transaction -> unit) ltr k = match ltr with
| Tr_started tr | Tr_changed tr -> tr |> k
- | Tr_notstarted (db,None) ->
+ | Tr_notstarted (db,None,errk) ->
(* FIXME: start at a revision guaranteed consistent with the transactions that xtr already contains *)
- Bk.Tr.start db
+ Bk.Tr.start db errk
@> fun tr -> push (Tr_started tr); tr |> k
- | Tr_notstarted (db,Some rev) ->
- Bk.Tr.start_at_revision db rev
+ | Tr_notstarted (db,Some rev,errk) ->
+ Bk.Tr.start_at_revision db rev errk
@> fun tr -> push (Tr_started tr); tr |> k
let get_local_rev key rev k = match L.get_key rev key with
View
@@ -63,10 +63,10 @@ let close_database db k =
let status db k = Badop.Local db.file |> k
module Tr = struct
- let start db k =
+ let start db _errk k =
{ db = db; tr = Session.new_trans db.session } |> k
- let start_at_revision db rev k =
+ let start_at_revision db rev _errk k =
{ db = db; tr = Session.new_trans ~read_only:(true, Some rev) db.session } |> k
let prepare trans k =
View
@@ -114,10 +114,12 @@ struct
match request with
| Transaction (D.Query () as query) ->
Backend.Tr.start db
+ (fun _exc -> N.alert_channel channel)
@> fun backend_tr -> init_tr backend_tr
@> fun tr -> Transaction (D.Dialog_aux.respond query tr) |> k
| Transaction_at (D.Query rev as query) ->
Backend.Tr.start_at_revision db rev
+ (fun _exc -> N.alert_channel channel)
@> fun backend_tr -> init_tr backend_tr
@> fun tr -> Transaction_at (D.Dialog_aux.respond query tr) |> k
| Status (D.Query () as query) ->
View
@@ -29,6 +29,7 @@ module F (Bk: Badop.S) = struct
db: Bk.database;
status: transaction_status;
tr: Bk.transaction option;
+ errk: exn -> unit;
stash: (Badop.path * Dialog.query Bk.write_op) list;
}
type revision = Bk.revision
@@ -38,7 +39,7 @@ module F (Bk: Badop.S) = struct
let status db k = Bk.status db @> fun st -> Badop.Layer("Stash", st) |> k
let get_tr xtr k = match xtr.tr with
- | None -> Bk.Tr.start xtr.db @> k
+ | None -> Bk.Tr.start xtr.db xtr.errk @> k
| Some tr -> tr |> k
let flush xtr k = match xtr.stash with
@@ -50,11 +51,11 @@ module F (Bk: Badop.S) = struct
{ xtr with tr = Some tr; status = Changed; stash = [] } |> k
module Tr = struct
- let start db k =
- { db = db; tr = None; status = Fresh; stash = [] } |> k
- let start_at_revision db rev k =
- Bk.Tr.start_at_revision db rev
- @> fun tr -> { db = db; tr = Some tr; status = Fresh; stash = [] } |> k
+ let start db errk k =
+ { db = db; tr = None; errk = errk; status = Fresh; stash = [] } |> k
+ let start_at_revision db rev errk k =
+ Bk.Tr.start_at_revision db rev errk
+ @> fun tr -> { db = db; tr = Some tr; errk = errk; status = Fresh; stash = [] } |> k
let prepare xtr k =
flush xtr
@> fun xtr ->
View
@@ -55,8 +55,8 @@ module F (Bk: Badop.S) = struct
module Tr = struct
- let start db k = incr transactions; Bk.Tr.start db @> k
- let start_at_revision db rev k = incr transactions; Bk.Tr.start_at_revision db rev @> k
+ let start db errk k = incr transactions; Bk.Tr.start db errk @> k
+ let start_at_revision db rev errk k = incr transactions; Bk.Tr.start_at_revision db rev errk @> k
let prepare tr k = Bk.Tr.prepare tr @> k
let commit tr k = incr commits; Bk.Tr.commit tr @> k
let abort tr k = incr aborts; Bk.Tr.abort tr @> k
@@ -33,8 +33,8 @@ module F (Bk: Badop.S) = struct
module Tr = struct
- let start db k = Bk.Tr.start db @> k
- let start_at_revision db rev k = Bk.Tr.start_at_revision db rev @> k
+ let start db errk k = Bk.Tr.start db errk @> k
+ let start_at_revision db rev errk k = Bk.Tr.start_at_revision db rev errk @> k
let prepare tr k = Bk.Tr.prepare tr @> k
let commit tr k = Bk.Tr.commit tr @> k
let abort tr k = Bk.Tr.abort tr @> k
@@ -37,8 +37,8 @@ module F (Bk: Badop.S) = struct
module Tr = struct
- let start db k = Bk.Tr.start db @> k
- let start_at_revision db rev k = Bk.Tr.start_at_revision db rev @> k
+ let start db errk k = Bk.Tr.start db errk @> k
+ let start_at_revision db rev errk k = Bk.Tr.start_at_revision db rev errk @> k
let prepare tr k = Bk.Tr.prepare tr @> k
let commit tr k = Bk.Tr.commit tr @> k
let abort tr k = Bk.Tr.abort tr @> k
View
@@ -144,9 +144,13 @@ let read_schema_from_db tr k =
let get_transaction_at_revision db k =
match options.time with
- | None -> Db.Tr.start db @> k
+ | None ->
+ Db.Tr.start db
+ (fun exc -> Logger.critical "Database error: %s" (Printexc.to_string exc); exit 4)
+ @> k
| Some revision_timestamp ->
Db.Tr.start db
+ (fun exc -> Logger.critical "Database error: %s" (Printexc.to_string exc); exit 4)
@> fun tr ->
Db.read tr Badop.Path.root (Badop.Revisions (Badop.Dialog.query (None,0)))
@> function
@@ -162,7 +166,9 @@ let get_transaction_at_revision db k =
(Db.Debug.revision_to_string revision)
(Time.local_mday ts) (Time.local_mon ts) (Time.local_year ts)
(Time.local_hour ts) (Time.local_min ts) (Time.local_sec ts);
- Db.Tr.start_at_revision db revision @> k)
+ Db.Tr.start_at_revision db revision
+ (fun exc -> Logger.critical "Database error: %s" (Printexc.to_string exc); exit 4)
+ @> k)
| _ ->
Printf.eprintf "Error while looking for revisions of the database"; exit 2
View
@@ -108,7 +108,12 @@ module F (B: Badop.S) (C: Xml_dump.SigCpsBackend) = struct
() |> k
in
let write (tropt,n) path write_op k =
- (fun k -> match tropt with Some tr -> tr |> k | None -> B.Tr.start db @> fun tr -> tr |> k)
+ (fun k -> match tropt with
+ | Some tr -> tr |> k
+ | None ->
+ B.Tr.start db
+ (fun err -> error "database error: %s" (Printexc.to_string err) |> emergency_cont)
+ @> fun tr -> tr |> k)
@> mkcont k
@> fun tr ->
B.write tr path write_op
View
@@ -853,16 +853,16 @@ end = struct
match connection_opt with
| Some connection -> (* the structure exists, but is disconnected *)
let reconnect cont =
- #<If$minlevel 10> debug "Reconnecting to %s" (endpoint_to_string remote) #<End>;
+ Logger.info "Reconnecting to %s" (endpoint_to_string remote);
Network.connect sched (Network.make_port_spec ~protocol addr port) encryption
~socket_flags:[Unix.SO_KEEPALIVE]
~err_cont:(
fun _ ->
- #<If$minlevel 20> debug "Reconnection to %s failed" (endpoint_to_string remote) #<End>;
+ Logger.info "Reconnection to %s failed" (endpoint_to_string remote);
disconnect connection; None |> cont
)
@> fun connection_info ->
- #<If$minlevel 20> debug "Reconnected to %s" (endpoint_to_string remote) #<End>;
+ Logger.info "Reconnected to %s" (endpoint_to_string remote);
connection.local <- (local_of_conn_info connection_info);
connection.finalised <- false;
Some connection_info |> cont
@@ -878,7 +878,7 @@ end = struct
~socket_flags:[Unix.SO_KEEPALIVE]
~err_cont:(fun _ -> !disconnect_ref (); None |> cont)
@> fun connection_info ->
- #<If$minlevel 20> debug "Connected to %s" (endpoint_to_string remote) #<End>;
+ Logger.info "Connected to %s" (endpoint_to_string remote);
!update_local_ref (local_of_conn_info connection_info);
Some connection_info |> cont
in
@@ -1588,6 +1588,9 @@ let close_channel chan =
#<If> debug "Closing channel %s" (channel_id_to_debug_string chan.id) #<End>;
ChanH.remove chan.id
+let panic chan =
+ #<If> debug "Alert on channel %s" (channel_id_to_debug_string chan.id) #<End>;
+ Connection.disconnect chan.connection
let dup : ('out','in') channel -> ('out'2,'in2) channel_spec -> ('out'2,'in2) channel =
fun (chan : ('out', 'in') channel) spec ->
View
@@ -154,6 +154,10 @@ val setup_respond: ('out','in') channel -> ('in' -> ('out' -> unit) -> unit) ->
(** Closes given channel *)
val close_channel: ('out','in') channel -> unit
+(** Closes the given channel and the underlying connection, triggering any
+ registered handlers *)
+val panic: ('out','in') channel -> unit
+
(** Registers a function to be called if the connection the channel relies on is lost.
It's not called if the channel is closed normally (by hand, or when we know nobody
may write to it anymore) *)
@@ -42,8 +42,8 @@ type t0 = {
close_database: db -> unit Cps.t;
status: db -> Badop.status Cps.t;
- tr_start: db -> tr Cps.t;
- tr_start_at_revision: db -> rv -> tr Cps.t;
+ tr_start: db -> (exn -> unit) -> tr Cps.t;
+ tr_start_at_revision: db -> rv -> (exn -> unit) -> tr Cps.t;
tr_prepare: tr -> (tr * bool) Cps.t;
tr_commit: tr -> bool Cps.t;
tr_abort: tr -> unit Cps.t;
@@ -229,7 +229,8 @@ let get options =
in
BslServer_event.send (Obj.magic event) (QmlCpsServerLib.cont_ml (fun _ -> ()))
| None ->
- Logger.error "Database connection error before OPA runtime initialisation")
+ Logger.error "Database connection error before OPA runtime initialisation";
+ exit 3)
;
#<If:DATABASE_RECONNECT$minlevel 0>
`retry (Time.seconds (int_of_string (Option.get DebugVariables.database_reconnect)))
Oops, something went wrong.

0 comments on commit 3321f32

Please sign in to comment.