Permalink
Browse files

[enhance] database client/server: much more efficient protocol

  • Loading branch information...
Louis Gesbert
Louis Gesbert committed Jul 1, 2011
1 parent 3321f32 commit 114c5eca8c30c2f35fb7b4dfbb2db123f13c96e5
Showing with 112 additions and 121 deletions.
  1. +38 −31 database/badop_client.ml
  2. +37 −40 database/badop_protocol.ml
  3. +37 −50 database/badop_server.ml
View
@@ -56,69 +56,76 @@ let status db k =
| _ -> assert false)
| _ -> assert false
+let tr_next tr k =
+ if tr.last then
+ let tr' = { tr with version = succ tr.version } in tr.last <- false; tr' |> k
+ else
+ N.sendreceive tr.channel (Fork (Dialog.query tr.version))
+ @> function
+ | Fork (Dialog.Response tr_channel) ->
+ { channel = tr_channel; version = succ tr.version; last = true } |> k
+ | _ -> N.panic tr.channel
+
module Tr = struct
let start db errk k =
N.sendreceive' db (Transaction (Dialog.query ())) errk
@> function
- | Transaction (Dialog.Response tr) ->
- N.on_disconnect tr (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
- tr |> k
+ | Transaction (Dialog.Response tr_channel) ->
+ N.on_disconnect tr_channel (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
+ { channel = tr_channel; version = 0; last = true } |> k
| _ -> N.panic db
let start_at_revision db rev errk k =
N.sendreceive' db (Transaction_at (Dialog.query rev)) errk
@> function
- | Transaction_at (Dialog.Response tr) ->
- N.on_disconnect tr (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
- tr |> k
+ | Transaction_at (Dialog.Response tr_channel) ->
+ N.on_disconnect tr_channel (fun () -> N.Disconnected (N.remote_of_channel db) |> errk);
+ { channel = tr_channel; version = 0; last = true } |> k
| _ -> N.panic db
let prepare tr k =
- N.sendreceive tr (Prepare (Dialog.query ()))
+ tr_next tr
+ @> fun tr ->
+ N.sendreceive tr.channel (Prepare (Dialog.query tr.version))
@> function
- | Prepare (D.Response (tr',success)) -> (tr', success) |> k
- | _ -> N.panic tr
+ | Prepare (D.Response success) -> (tr, success) |> k
+ | _ -> N.panic tr.channel
let commit tr k =
- N.sendreceive tr (Commit (Dialog.query ()))
+ N.sendreceive tr.channel (Commit (Dialog.query tr.version))
@> function
| Commit (D.Response success) -> success |> k
- | _ -> N.panic tr
+ | _ -> N.panic tr.channel
let abort tr k =
- N.sendreceive tr (Abort (Dialog.query ()))
+ N.sendreceive tr.channel (Abort (Dialog.query tr.version))
@> function
| Abort (D.Response ()) -> () |> k
- | _ -> N.panic tr
+ | _ -> N.panic tr.channel
end
let read tr path query k =
- N.sendreceive tr (Read (path, Dialog.query query))
+ N.sendreceive tr.channel (Read (path, Dialog.query (tr.version, query)))
@> function
| Read (path', D.Response result) -> assert (path = path'); result |> k
- | _ -> N.panic tr
+ | _ -> N.panic tr.channel
let write tr path query k =
- N.sendreceive tr (Write (path, query))
- @> function
- | Write (path', result) ->
- assert (path = path');
- Badop.Aux.map_write_op
- ~revision:(fun x k -> x |> k)
- ~transaction:(fun chan k -> chan |> k)
- result
- @> k
- | _ -> N.panic tr
+ Badop.Aux.map_write_op ~transaction:(fun _tr k -> () |> k) ~revision:(fun x k -> x |> k) query
+ @> fun query1 -> tr_next tr
+ @> fun tr ->
+ N.send tr.channel (Write (path, tr.version, query1));
+ Badop.Aux.respond_set_transaction query tr |> k
let write_list tr l_path_query k =
- N.sendreceive tr (WriteList (Dialog.query l_path_query))
- @> function
- | WriteList (D.Response l_path_result) ->
- l_path_result
- |> k
- | _ -> N.panic tr
+ let paths,queries = List.split l_path_query in
+ Badop.Aux.map_write_list_op ~transaction:(fun _tr k -> () |> k) ~revision:(fun x k -> x |> k) queries
+ @> fun queries1 -> tr_next tr
+ @> fun tr ->
+ N.send tr.channel (WriteList (tr.version, Dialog.query (List.combine paths queries1)));
+ tr |> k
let node_properties _db _config k =
#<If:TESTING> () |> k #<Else>
View
@@ -40,65 +40,62 @@ module F
struct
type revision = Host.revision
+ type tr_version = int
+
type 'which read_op = ('which,revision) Badop.generic_read_op
- type ('which,'transaction) poly_write_op = ('which,'transaction,revision) Badop.generic_write_op
+ type 'which internal_write_op = ('which,unit,revision) Badop.generic_write_op
(* 'transaction is a parameter, because we change it to string when we want to serialise *)
- type ('which,'transaction) poly_transaction_op =
+ type ('which,'transaction_channel) poly_transaction_op =
| Read of Badop.path
- * ('which, Dialog.query read_op, Dialog.response read_op Badop.answer) Dialog.t
- | Write of Badop.path
- * ('which,'transaction) poly_write_op
- | WriteList of ('which, (Badop.path * (Dialog.query,'transaction) poly_write_op) list, 'transaction) Dialog.t
- | Prepare of ('which, unit, 'transaction * bool) Dialog.t
- | Commit of ('which, unit, bool) Dialog.t
- | Abort of ('which, unit, unit) Dialog.t
-
- type transaction = (* needs rectypes *)
- ((Host.spoken,transaction) poly_transaction_op, (Host.understood,transaction) poly_transaction_op)
+ * ('which, tr_version * Dialog.query read_op, Dialog.response read_op Badop.answer) Dialog.t
+ | Write of Badop.path * tr_version * 'which internal_write_op
+ | WriteList of tr_version * ('which, (Badop.path * Dialog.query internal_write_op) list, unit) Dialog.t
+ | Prepare of ('which, tr_version, bool) Dialog.t
+ | Commit of ('which, tr_version, bool) Dialog.t
+ | Abort of ('which, tr_version, unit) Dialog.t
+ | Fork of ('which, tr_version, 'transaction_channel) Dialog.t
+
+ type transaction_channel =
+ ((Host.spoken,transaction_channel) poly_transaction_op, (Host.understood,transaction_channel) poly_transaction_op)
Hlnet.channel
- type 'which write_op = ('which,transaction) poly_write_op
+ type 'which transaction_op =
+ ('which,transaction_channel) poly_transaction_op
+
+ type transaction = {
+ channel : transaction_channel;
+ version : tr_version;
+ mutable last : bool;
+ }
- type 'which transaction_op = ('which,transaction) poly_transaction_op
+ type 'which write_op = ('which,transaction,revision) Badop.generic_write_op
type 'which database_query =
- | Transaction of ('which, unit, transaction) Dialog.t
- | Transaction_at of ('which, revision, transaction) Dialog.t
+ | Transaction of ('which, unit, transaction_channel) Dialog.t
+ | Transaction_at of ('which, revision, transaction_channel) Dialog.t
| Status of ('which, unit, Badop.status) Dialog.t
+
(* Just maps on transactions *)
let map_transaction_op
: 'which 'transaction1 'transaction2.
('transaction1 -> 'transaction2) -> ('which,'transaction1) poly_transaction_op
-> ('which,'transaction2) poly_transaction_op
= fun f op ->
- let map_write_op
- : 'which 't1 't2. ('t1 -> 't2) -> ('which,'t1) poly_write_op -> ('which,'t2) poly_write_op
- = fun f op ->
- nocps
- (Badop.Aux.map_write_op ~transaction:(fun tr k -> k (f tr)) ~revision:(fun x k -> k x) op)
- in
match op with
- | Write (path, write_op) ->
- Write (path, map_write_op f write_op)
- | WriteList (dialog) ->
- let dialog = nocps
- (Dialog_aux.map_dialog
- ~query:(fun oplist k -> k (List.map (fun (path,op) -> path, map_write_op f op) oplist))
- ~response:(fun tr k -> k (f tr))
- dialog)
- in
- WriteList dialog
- | Prepare dialog ->
- let dialog = nocps
- (Dialog_aux.map_dialog ~query:(fun x k -> k x) ~response:(fun (tr,ok) k -> k (f tr, ok)) dialog)
- in
- Prepare dialog
+ | Write (path, v, internal_write_op) -> Write (path, v, internal_write_op)
+ | WriteList (v, dialog) -> WriteList (v, dialog)
+ | Prepare dialog -> Prepare dialog
| Read (path, op) -> Read (path, op)
| Commit op -> Commit op
| Abort op -> Abort op
+ | Fork dialog ->
+ let dialog = nocps
+ (Dialog_aux.map_dialog ~query:(fun x k -> k x) ~response:(fun tr k -> k (f tr)) dialog)
+ in
+ Fork dialog
(* We need to expand this functions even if we use marshal internally, because
embedded transactions need to be processed through
@@ -123,19 +120,19 @@ struct
: (Host.spoken transaction_op, Host.understood transaction_op) Hlnet.channel_spec
= {
Hlnet.
- service = Hlnet.make_service_id ~name:"badop/trans" ~version:1;
+ service = Hlnet.make_service_id ~name:"badop/trans" ~version:2;
out_serialise = transaction_op_serialise;
in_unserialise = transaction_op_unserialise;
}
-
let database_op_serialise = function
| Transaction (Dialog.Query ()) -> "\000"
| Transaction (Dialog.Response transaction) -> "\100" ^ Hlnet.serialise_channel transaction
| Transaction_at (Dialog.Query rev) -> "\001" ^ Marshal.to_string rev []
| Transaction_at (Dialog.Response transaction) -> "\101" ^ Hlnet.serialise_channel transaction
| Status (Dialog.Query ()) -> "\002"
| Status (Dialog.Response status) -> "\102" ^ Marshal.to_string status []
+
let database_op_unserialise channel s offset = match s.[offset] with
| '\000' -> `data (Transaction (Dialog_aux.make_unsafe_query ()), offset + 1)
| '\100' ->
@@ -161,7 +158,7 @@ struct
: (Host.spoken database_query, Host.understood database_query) Hlnet.channel_spec
= {
Hlnet.
- service = Hlnet.make_service_id ~name:"badop/db" ~version:1;
+ service = Hlnet.make_service_id ~name:"badop/db" ~version:2;
out_serialise = database_op_serialise;
in_unserialise = database_op_unserialise;
}
View
@@ -40,86 +40,73 @@ struct
scheduler: Scheduler.t;
}
- let rec transaction_callback (backend_trans: Backend.transaction) transaction
+ let rec transaction_callback (transmap: Backend.transaction IntMap.t ref) channel
: (D.Dialog.query transaction_op
-> (D.Dialog.response transaction_op -> unit) -> unit)
=
fun request k ->
match request with
- | Read (path, (D.Query op as query)) ->
- Backend.read backend_trans path op
+ | Read (path, (D.Query (tr_version, op) as query)) ->
+ Backend.read (IntMap.find tr_version !transmap) path op
@> fun resp -> Read (path, D.Dialog_aux.respond query resp) |> k
- | Write (path, op) ->
+ | Write (path, tr_next_version, op) ->
Badop.Aux.map_write_op (* From Protocol.transaction to Backend.transaction *)
~revision:(fun r k -> r |> k)
- ~transaction:(fun _ k -> assert(not true); backend_trans |> k)
+ ~transaction:(fun () k -> assert false |> k)
op
@> fun op ->
- Backend.write backend_trans path op
- @> fun backend_response ->
- Badop.Aux.map_write_op (* From Backend.transaction to Protocol.transaction *)
- ~revision:(fun r k -> r |> k)
- ~transaction:(fun backend_trans k ->
- let transaction = N.dup transaction transaction_channel_spec in
- N.setup_respond transaction (transaction_callback backend_trans transaction);
- transaction |> k)
- backend_response
- @> fun resp -> Write (path,resp) |> k
- | WriteList l ->
- let l_q =
- match l with
- | D.Query l_q -> l_q
- | D.Response _ -> assert false
- in
- let l_op = List.map snd l_q in
+ Backend.write (IntMap.find (pred tr_next_version) !transmap) path op
+ @> fun backend_response ->
+ let tr = Badop.Aux.result_transaction backend_response in
+ transmap := IntMap.add tr_next_version tr !transmap (* no continuation needed *)
+ | WriteList (tr_next_version, D.Query l_q) ->
+ let l_paths,l_op = List.split l_q in
Badop.Aux.map_write_list_op (* From Protocol.transaction to Backend.transaction *)
~revision:(fun r k -> r |> k)
- ~transaction:(fun _ k -> assert(not true);
- backend_trans |> k)
+ ~transaction:(fun _ k -> assert false |> k)
l_op
- @> (fun l_op ->
- let l_zip = List.combine (List.map fst l_q) l_op in
- Backend.write_list backend_trans l_zip
- @> (fun backend_trans ->
- (* From Backend.transaction to Protocol.transaction *)
- let transaction = N.dup transaction transaction_channel_spec in
- N.setup_respond
- transaction
- (transaction_callback backend_trans transaction);
- WriteList (D.Dialog_aux.respond l transaction) |> k))
- | Prepare (D.Query () as query) ->
- Backend.Tr.prepare backend_trans
- @> fun (backend_trans,success) ->
- let transaction = N.dup transaction transaction_channel_spec in
- N.setup_respond transaction (transaction_callback backend_trans transaction);
- Prepare (D.Dialog_aux.respond query (transaction, success)) |> k
- | Commit (D.Query () as query) ->
- Backend.Tr.commit backend_trans
+ @> fun l_op ->
+ Backend.write_list (IntMap.find (pred tr_next_version) !transmap) (List.combine l_paths l_op)
+ @> fun tr ->
+ transmap := IntMap.add tr_next_version tr !transmap (* no continuation needed *)
+ | Prepare (D.Query tr_next_version as query) ->
+ Backend.Tr.prepare (IntMap.find (pred tr_next_version) !transmap)
+ @> fun (tr,success) ->
+ transmap := IntMap.add tr_next_version tr !transmap;
+ Prepare (D.Dialog_aux.respond query success) |> k
+ | Commit (D.Query tr_version as query) ->
+ Backend.Tr.commit (IntMap.find tr_version !transmap)
@> fun resp -> Commit (D.Dialog_aux.respond query resp) |> k
- | Abort (D.Query () as query) ->
- Backend.Tr.abort backend_trans
+ | Abort (D.Query tr_version as query) ->
+ Backend.Tr.abort (IntMap.find tr_version !transmap)
@> fun resp -> Abort (D.Dialog_aux.respond query resp) |> k
- | Read (_, D.Response _) | Prepare (D.Response _) | Commit (D.Response _) | Abort (D.Response _) ->
+ | Fork (D.Query tr_version as query) ->
+ let channel = N.dup channel transaction_channel_spec in
+ let transmap = ref (IntMap.filter_keys ((<=) tr_version) !transmap) in
+ N.setup_respond channel (transaction_callback transmap channel);
+ Fork (D.Dialog_aux.respond query channel) |> k
+ | Read (_, D.Response _) | WriteList (_, D.Response _) | Prepare (D.Response _)
+ | Commit (D.Response _) | Abort (D.Response _) | Fork (D.Response _) ->
assert false
let main_callback db (channel: database) :
D.Dialog.query database_query -> D.Dialog.response database_query Cps.t
=
fun request k ->
let init_tr backend_tr k =
- let transaction = N.dup channel transaction_channel_spec in
- N.setup_respond transaction (transaction_callback backend_tr transaction);
- transaction |> k
+ let channel = N.dup channel transaction_channel_spec in
+ N.setup_respond channel (transaction_callback (ref (IntMap.singleton 0 backend_tr)) channel);
+ channel |> k
in
match request with
| Transaction (D.Query () as query) ->
Backend.Tr.start db
- (fun _exc -> N.alert_channel channel)
+ (fun _exc -> N.panic channel)
@> fun backend_tr -> init_tr backend_tr
@> fun tr -> Transaction (D.Dialog_aux.respond query tr) |> k
| Transaction_at (D.Query rev as query) ->
Backend.Tr.start_at_revision db rev
- (fun _exc -> N.alert_channel channel)
+ (fun _exc -> N.panic channel)
@> fun backend_tr -> init_tr backend_tr
@> fun tr -> Transaction_at (D.Dialog_aux.respond query tr) |> k
| Status (D.Query () as query) ->

0 comments on commit 114c5ec

Please sign in to comment.