Skip to content

Commit

Permalink
[feature] database: Implemented read cache update on write.
Browse files Browse the repository at this point in the history
  • Loading branch information
nrs135 authored and Louis Gesbert committed Aug 10, 2011
1 parent 0e712e1 commit 666a08e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 23 deletions.
13 changes: 12 additions & 1 deletion database/badop.ml
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ module Aux : sig
end = struct

module D = Badop_lib
open Cps.Ops
let (@>) = Cps.Ops.(@>)
let (|>) = Cps.Ops.(|>)

(** Helper function to extract the resulting transaction from write operations *)
let result_transaction write_op_response =
Expand Down Expand Up @@ -312,6 +313,16 @@ end = struct
D.Dialog_aux.map_dialog ~query:(fun x k -> x |> k) ~response:(fun l k -> l |> k) dialog
@> fun dialog -> Search dialog |> k

let map_read_list_op
~(revision: 'revision1 -> 'revision2 Cps.t)
(l_op: ('which, 'revision1) generic_read_op list)
: ('which, 'revision2) generic_read_op list Cps.t =
fun k ->
let rd acc op k =
map_read_op ~revision op @> fun op -> op::acc |> k
in
Cps.List.fold rd [] l_op k

let map_write_op
~(transaction: 'transaction1 -> 'transaction2 Cps.t)
~(revision: 'revision1 -> 'revision2 Cps.t)
Expand Down
3 changes: 2 additions & 1 deletion database/badop/badop_lib.ml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ module Dialog = struct
end

module Dialog_aux = struct
open Cps.Ops
let (@>) = Cps.Ops.(@>)
let (|>) = Cps.Ops.(|>)

let respond _ r = Response r

Expand Down
127 changes: 108 additions & 19 deletions database/badop_cache.ml
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,41 @@
@author Louis Gesbert
**)

module String = Base.String
module List = Base.List
module Hashtbl = Base.Hashtbl
module Dialog = Badop_lib.Dialog
module Dialog_aux = Badop_lib.Dialog_aux
let (@>) = Cps.Ops.(@>)
let (|>) = Cps.Ops.(|>)
let sprintf = Printf.sprintf

module F (Bk: Badop.S) =
struct

module F (Bk: Badop.S) = struct
type database = Bk.database

type transaction_status = Fresh | Changed | Prepared | Committed | Failed

type revision = Bk.revision

type 'which read_op = ('which,revision) Badop.generic_read_op

type ans = Badop.Dialog.response Bk.read_op Badop.answer

type cache_entry =
| CacheAnswer of (Dialog.query read_op * ans) list
| CacheLink of Path.t

type transaction = { (* Extended transaction (called xtr below) *)
db: Bk.database;
status: transaction_status;
tr: Bk.transaction option;
stash: (Badop.path * Dialog.query Bk.write_op) list;
cache: (Badop.path, cache_entry) Hashtbl.t;
}
type revision = Bk.revision

type 'which write_op = ('which,transaction,revision) Badop.generic_write_op

let open_database = Bk.open_database
let close_database = Bk.close_database
Expand All @@ -47,55 +68,123 @@ module F (Bk: Badop.S) = struct
| l ->
get_tr xtr
@> fun tr -> Bk.write_list tr (List.rev l)
@> fun tr ->
{ xtr with tr = Some tr; status = Changed; stash = [] } |> k
@> fun tr -> { xtr with tr = Some tr; status = Changed; stash = [] } |> k

module Tr =
struct

module Tr = struct
let start db k =
{ db = db; tr = None; status = Fresh; stash = [] } |> k
{ db = db; tr = None; status = Fresh; stash = []; cache = Hashtbl.create 128; } |> k

let start_at_revision db rev k =
Bk.Tr.start_at_revision db rev
@> fun tr -> { db = db; tr = Some tr; status = Fresh; stash = [] } |> k
@> fun tr -> { db = db; tr = Some tr; status = Fresh; stash = []; cache = Hashtbl.create 128; } |> k

let prepare xtr k =
flush xtr
@> fun xtr ->
match xtr.status with
| Changed ->
get_tr xtr
@> fun tr -> Bk.Tr.prepare tr
@> fun (tr,ok) -> ({ xtr with tr = Some tr; status = if ok then Prepared else Failed}, ok) |> k
@> fun (tr,ok) -> ({ xtr with tr = Some tr; status = if ok then Prepared else Failed}, ok) |> k
| Fresh | Prepared -> (xtr,true) |> k
| Failed | Committed -> (xtr,false) |> k

let rec commit xtr k =
match xtr.status with
| Prepared ->
assert(xtr.stash = []);
get_tr xtr @> fun tr -> Bk.Tr.commit tr @> k
| Changed ->
prepare xtr
@> fun (xtr,ok) -> if ok then get_tr xtr
@> fun tr -> Bk.Tr.commit tr @> k else false |> k
prepare xtr
@> fun (xtr,ok) ->
if ok
then get_tr xtr
@> fun tr -> Bk.Tr.commit tr
@> k
else false |> k
| Fresh ->
if xtr.stash = [] then true |> k else flush xtr
@> fun xtr -> commit xtr @> k
if xtr.stash = []
then true |> k
else flush xtr
@> fun xtr -> commit xtr
@> k
| Committed -> true |> k
| Failed -> false |> k

let abort xtr k =
match xtr.status with
| Failed | Committed -> () |> k
| _ -> match xtr.tr with Some tr -> Bk.Tr.abort tr @> k | None -> () |> k
end

type 'which read_op = 'which Bk.read_op
type 'which write_op = ('which,transaction,revision) Badop.generic_write_op
end

let read xtr path read_op k =
(* For debug, we can get rid of this later... *)
let string_of_DLU = function `Data -> "Data" | `Link -> "Link" | `Unset -> "Unset" | _ -> assert false
let string_of_time t = Date.rfc1123 (Time.localtime t)
let string_of_range (to_string:'a -> string) ((ao,i):'a Badop.range) = sprintf "(%s,%d)" (Option.to_string to_string ao) i
let string_of_gro = function
| Badop.Stat (Dialog.Query ()) -> "Query(Stat())"
| Badop.Stat (Dialog.Response (path, rev_opt, _DLU)) ->
sprintf "Response(Stat(%s,%s,%s))"
(Path.to_string path) (Option.to_string Bk.Debug.revision_to_string rev_opt) (string_of_DLU _DLU)
| Badop.Contents (Dialog.Query ()) -> "Query(Contents())"
| Badop.Contents (Dialog.Response data) -> sprintf "Response(Contents(%s))" (DataImpl.to_string data)
| Badop.Children (Dialog.Query key_range) -> sprintf "Query(Children(%s))" (string_of_range Keys.to_string key_range)
| Badop.Children (Dialog.Response path_list) ->
sprintf "Response(Children([%s]))" (String.concat_map "; " Path.to_string path_list)
| Badop.Revisions (Dialog.Query rev_range) ->
sprintf "Query(Children(%s))" (string_of_range Bk.Debug.revision_to_string rev_range)
| Badop.Revisions (Dialog.Response rtl) ->
sprintf "Response(Children([%s]))"
(String.concat_map "; " (fun (r,t) -> sprintf "(%s,%s)" (Bk.Debug.revision_to_string r) (string_of_time t)) rtl)
| Badop.Search (Dialog.Query (sl,ir)) ->
sprintf "Query(Search([%s],%s))" (String.concat "; " sl) (string_of_range string_of_int ir)
| Badop.Search (Dialog.Response kl) -> sprintf "Response(Search([%s]))" (String.concat_map "; " Keys.to_string kl)

let really_read ans_list xtr path read_op k =
flush xtr
@> fun xtr ->
get_tr xtr @> fun tr -> Bk.read tr path read_op @> k
@> fun xtr -> get_tr xtr
@> fun tr -> Badop.Aux.map_read_op ~revision:(fun r k -> r |> k) read_op
@> fun bk_read_op -> Bk.read tr path bk_read_op
@> fun ans ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "CACHING(%s,%s)" (Path.to_string path) (string_of_gro read_op)#<End>;
Hashtbl.replace xtr.cache path (CacheAnswer ((read_op,ans)::ans_list));
ans |> k

let rec read xtr path read_op k =
match Hashtbl.find_opt xtr.cache path with
| Some (CacheAnswer ans_list) ->
(match List.assoc_opt read_op ans_list with
| Some ans ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "CACHED(%s,%s)" (Path.to_string path) (string_of_gro read_op)#<End>;
ans |> k
| None -> really_read ans_list xtr path read_op k)
| Some (CacheLink p) ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "FOLLOWING(%s)" (Path.to_string p)#<End>;
read xtr p read_op k
| None -> really_read [] xtr path read_op k

let write xtr path write_op k =
(* We make some effort to update the read cache but mostly we just stomp on it *)
(match write_op with
| Badop.Set (Dialog.Query data) ->
let gro = Badop.Contents (Dialog_aux.make_unsafe_response data) in
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "UPDATED(%s,%s)" (Path.to_string path) (string_of_gro gro)#<End>;
Hashtbl.replace xtr.cache path (CacheAnswer[(Badop.Contents (Dialog_aux.make_unsafe_query ()),`Answer gro)
(* Can't add stat here because we can't predict the revision *)])
| Badop.Clear (Dialog.Query ()) ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "CLEARED(%s)" (Path.to_string path)#<End>;
Hashtbl.replace xtr.cache path (CacheAnswer [(Badop.Contents (Dialog_aux.make_unsafe_query ()),`Absent);
(Badop.Stat (Dialog_aux.make_unsafe_query ()),`Absent)])
| Badop.Link (Dialog.Query p) ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "LINKED(%s->%s)" (Path.to_string path) (Path.to_string p)#<End>;
Hashtbl.replace xtr.cache path (CacheLink p)
(*| Badop.Copy (Dialog.Query _) ???*)
| _ ->
#<If:BADOP_DEBUG$minlevel 10>Logger.debug "INVALIDATED(%s)" (Path.to_string path)#<End>;
Hashtbl.remove xtr.cache path);
Badop.Aux.map_write_op ~transaction:(fun xtr k -> get_tr xtr @> k) ~revision:(fun r k -> r |> k) write_op
(* only for types, no tr in queries *)
@> fun bk_write_op ->
Expand Down
4 changes: 2 additions & 2 deletions database/light/db_light.ml
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ let start = root_eid

(* may raise UnqualifiedPath *)
let rec get db path =
Logger.info "Db_light.get: path=%s%!" (Path.to_string path);
#<If:DEBUG_DB$minlevel 20>Logger.info "Db_light.get: path=%s%!" (Path.to_string path)#<End>;
let node, _rev = get_node_of_path db path in
match Node_light.get_content node with
| Datas.Data d -> d
Expand Down Expand Up @@ -514,7 +514,7 @@ let start = root_eid
with Not_found -> raise UnqualifiedPath

let follow_link db path =
Logger.info "Db_light.follow_link: path=%s" (Path.to_string path);
#<If:DEBUG_DB$minlevel 20>Logger.info "Db_light.follow_link: path=%s" (Path.to_string path)#<End>;
let rec aux db path =
let path_end = Path.to_list path in
let (path_end, node) = follow_path db db.tree path_end in
Expand Down

0 comments on commit 666a08e

Please sign in to comment.