Skip to content

Commit

Permalink
store: add a second transaction merge algorithm
Browse files Browse the repository at this point in the history
The existing transaction merge algorithm keeps track of the least
upper bound of all the nodes which have been read and written,
and will re-combine two stores which have disjoint upper bounds.
This works well for small transactions but not for the larger ones, such
as those used in:

* VM start: creates /vm/... /vss/... /local/domain/...
  The least upper bound of this transaction is / and so all
  these transactions conflict with everything

* Device hotplug: creates /local/domain/n /local/domain/m
  The least upper bound of this transaction is /local/domain so
  all these transactions conflict with each other

If the existing merge algorithm cannot merge and commit, we attempt
a /replay/ of the failed transaction against the new store.
During the first run through of a transaction we store both the
request received and the response sent to the client. When we
replay the requests we check whether the response sent to the client
is the same. If the responses are all the same then the transaction
replay can be committed. If any differ then the transaction replay
must be aborted and the client must retry.

This algorithm uses the intuition that the transactions made by
the toolstack are designed to be for separate domains, and should
fundamentally not conflict in the sense that they don't read or write
any shared keys. By replaying the transaction on the server side
we do what the client would have to do anyway, only we can do it
quickly without allowing any other requests to interfere.

Performing 300 parallel simulated VM starts and shutdowns without this code:

djs@ely:~/ocaml-xenstore$ ./_build/bench/xsbench -path /tmp/socket -n 300
300 parallel starts and shutdowns: 395.47

Performing 300 parallel simulated VM starts and shutdowns with this code:

djs@ely:~/ocaml-xenstore$ ./_build/bench/xsbench -path /tmp/socket -n 300
300 parallel starts and shutdowns: 4.33
  • Loading branch information
David Scott committed Aug 10, 2012
1 parent 3b05317 commit 0735988
Show file tree
Hide file tree
Showing 10 changed files with 336 additions and 191 deletions.
1 change: 1 addition & 0 deletions TODO.md
Expand Up @@ -26,3 +26,4 @@ To-do list

11. Add mirage domain frontend

12. Remove exceptions
4 changes: 2 additions & 2 deletions bench/xs_bench.ml
Expand Up @@ -460,12 +460,12 @@ let main () =
| Some n -> int_of_string n in

lwt client = make () in
(*
lwt t = time (fun () -> sequential n client) in
lwt () = Lwt_io.write Lwt_io.stdout (Printf.sprintf "%d sequential starts and shutdowns: %.02f\n" n t) in
(*
*)
lwt t = time (fun () -> parallel n client) in
lwt () = Lwt_io.write Lwt_io.stdout (Printf.sprintf "%d parallel starts and shutdowns: %.02f\n" n t) in
*)
return ()
end

Expand Down
154 changes: 126 additions & 28 deletions core/xs_packet.ml
Expand Up @@ -347,34 +347,101 @@ let set_data pkt (data: string) =

module Response = struct

let read request x = set_data request x
let getperms request perms = set_data request (data_concat [ ACL.to_string perms ])
let getdomainpath request x = set_data request (data_concat [ x ])
let transaction_start request tid = set_data request (data_concat [ Int32.to_string tid ])

let directory request ls = set_data request (if ls = [] then "" else data_concat ls)

let error request x =
let reply = { request with ty = Op.Error } in
set_data reply (data_concat [ x ])

let ack request = set_data request "OK\000"

let write = ack
let mkdir = ack
let rm = ack
let setperms = ack
let watch = ack
let unwatch = ack
let transaction_end = ack
let introduce = ack
let release = ack
let debug request items = set_data request (data_concat items)
let set_target = ack
let restrict = ack
let resume = ack
let isintroduced request b = set_data request (data_concat [ if b then "T" else "F" ])
let watchevent path token = create 0l 0l Op.Watchevent (data_concat [ path; token ])
(* A reply in structured form, with one constructor per kind of response.
   [print] converts a payload into a packet and [prettyprint_payload]
   renders it as text. *)
type payload =
| Read of string (* used verbatim as the packet data *)
| Directory of string list (* entries; an empty list gives empty packet data *)
| Getperms of ACL.t (* serialised with [ACL.to_string] *)
| Getdomainpath of string
| Transaction_start of int32 (* rendered with [Int32.to_string] *)
| Write (* this and the other argument-less replies carry the [ok] body *)
| Mkdir
| Rm
| Setperms
| Watch
| Unwatch
| Transaction_end
| Debug of string list
| Introduce
| Resume
| Release
| Set_target
| Restrict
| Isintroduced of bool (* encoded as T for true and F for false *)
| Error of string
| Watchevent of string * string (* (path, token); printed with tid 0 and rid 0 *)

(* Render a response payload as text: the constructor name followed by its
   arguments, if it has any. String lists are shown between square
   brackets with their items separated by a semicolon and a space. *)
let prettyprint_payload payload =
  match payload with
  (* Replies without arguments print as their bare constructor name. *)
  | Write -> "Write"
  | Mkdir -> "Mkdir"
  | Rm -> "Rm"
  | Setperms -> "Setperms"
  | Watch -> "Watch"
  | Unwatch -> "Unwatch"
  | Transaction_end -> "Transaction_end"
  | Introduce -> "Introduce"
  | Resume -> "Resume"
  | Release -> "Release"
  | Set_target -> "Set_target"
  | Restrict -> "Restrict"
  (* Replies carrying a single value. *)
  | Read value -> Printf.sprintf "Read %s" value
  | Getperms perms -> Printf.sprintf "Getperms %s" (ACL.to_string perms)
  | Getdomainpath path -> Printf.sprintf "Getdomainpath %s" path
  | Transaction_start tid -> Printf.sprintf "Transaction_start %ld" tid
  | Isintroduced flag -> Printf.sprintf "Isintroduced %b" flag
  | Error msg -> Printf.sprintf "Error %s" msg
  (* Replies carrying several strings. *)
  | Directory entries ->
    Printf.sprintf "Directory [ %s ]" (String.concat "; " entries)
  | Debug items ->
    Printf.sprintf "Debug [ %s ]" (String.concat "; " items)
  | Watchevent (path, token) ->
    Printf.sprintf "Watchevent %s %s" path token

(* Packet body shared by every reply that only acknowledges a request. *)
let ok = "OK\000"

(* [print request payload] builds the reply packet for [payload].
   The reply takes its tid and rid from [request], except for
   [Watchevent], which is always created with tid 0 and rid 0. *)
let print request payload =
  (* Packet with the ids of [request], the given op and the given body. *)
  let reply op body = create (get_tid request) (get_rid request) op body in
  (* Acknowledgement-only reply for the given op. *)
  let ack op = reply op ok in
  match payload with
  | Write -> ack Op.Write
  | Mkdir -> ack Op.Mkdir
  | Rm -> ack Op.Rm
  | Setperms -> ack Op.Setperms
  | Watch -> ack Op.Watch
  | Unwatch -> ack Op.Unwatch
  | Transaction_end -> ack Op.Transaction_end
  | Introduce -> ack Op.Introduce
  | Resume -> ack Op.Resume
  | Release -> ack Op.Release
  | Set_target -> ack Op.Set_target
  | Restrict -> ack Op.Restrict
  | Read value -> reply Op.Read value
  | Directory entries ->
    (* An empty listing is sent as empty data rather than passed to
       [data_concat]. *)
    let body =
      match entries with
      | [] -> ""
      | _ -> data_concat entries in
    reply Op.Directory body
  | Getperms perms -> reply Op.Getperms (data_concat [ ACL.to_string perms ])
  | Getdomainpath path -> reply Op.Getdomainpath (data_concat [ path ])
  | Transaction_start tid ->
    reply Op.Transaction_start (data_concat [ Int32.to_string tid ])
  | Debug items -> reply Op.Debug (data_concat items)
  | Isintroduced flag ->
    let letter = if flag then "T" else "F" in
    reply Op.Isintroduced (data_concat [ letter ])
  | Error msg -> reply Op.Error (data_concat [ msg ])
  | Watchevent (path, token) ->
    create 0l 0l Op.Watchevent (data_concat [ path; token ])

(* Convenience constructors: each one builds the reply packet for a single
   kind of response by handing the matching payload to [print]. *)

(* [print] with its arguments flipped, so that the argument-less replies
   can be defined by partial application. *)
let reply_with payload request = print request payload

(* Replies that carry a value. *)
let read req value = print req (Read value)
let getperms req acl = print req (Getperms acl)
let getdomainpath req path = print req (Getdomainpath path)
let transaction_start req id = print req (Transaction_start id)
let directory req entries = print req (Directory entries)
let error req msg = print req (Error msg)

(* Replies that only acknowledge the request. *)
let write = reply_with Write
let mkdir = reply_with Mkdir
let rm = reply_with Rm
let setperms = reply_with Setperms
let watch = reply_with Watch
let unwatch = reply_with Unwatch
let transaction_end = reply_with Transaction_end
let introduce = reply_with Introduce
let release = reply_with Release
let debug req items = print req (Debug items)
let set_target = reply_with Set_target
let restrict = reply_with Restrict
let resume = reply_with Resume
let isintroduced req flag = print req (Isintroduced flag)

(* Watch events are not built from a request: the packet is created
   directly with tid 0 and rid 0. *)
let watchevent path token =
  let body = data_concat [ path; token ] in
  create 0l 0l Op.Watchevent body
end

module Request = struct
Expand Down Expand Up @@ -402,6 +469,30 @@ module Request = struct
| Error of string
| Watchevent of string

(* Render a request payload as text: the constructor name followed by its
   arguments separated by single spaces. ACLs are shown with
   [ACL.to_string] and the Debug string list is shown between square
   brackets. *)
let prettyprint_payload payload =
  match payload with
  | Transaction_start -> "Transaction_start"
  (* Requests with a single string argument. *)
  | Read path -> Printf.sprintf "Read %s" path
  | Directory path -> Printf.sprintf "Directory %s" path
  | Getperms path -> Printf.sprintf "Getperms %s" path
  | Mkdir path -> Printf.sprintf "Mkdir %s" path
  | Rm path -> Printf.sprintf "Rm %s" path
  | Error msg -> Printf.sprintf "Error %s" msg
  | Watchevent s -> Printf.sprintf "Watchevent %s" s
  (* Requests with a single integer or boolean argument. *)
  | Getdomainpath d -> Printf.sprintf "Getdomainpath %d" d
  | Resume d -> Printf.sprintf "Resume %d" d
  | Release d -> Printf.sprintf "Release %d" d
  | Restrict d -> Printf.sprintf "Restrict %d" d
  | Isintroduced d -> Printf.sprintf "Isintroduced %d" d
  | Transaction_end flag -> Printf.sprintf "Transaction_end %b" flag
  (* Requests with several arguments. *)
  | Write (key, value) -> Printf.sprintf "Write %s %s" key value
  | Setperms (path, acl) ->
    Printf.sprintf "Setperms %s %s" path (ACL.to_string acl)
  | Watch (a, b) -> Printf.sprintf "Watch %s %s" a b
  | Unwatch (a, b) -> Printf.sprintf "Unwatch %s %s" a b
  | Debug items ->
    Printf.sprintf "Debug [ %s ]" (String.concat "; " items)
  | Introduce (a, n, b) -> Printf.sprintf "Introduce %d %nu %d" a n b
  | Set_target (a, b) -> Printf.sprintf "Set_target %d %d" a b

exception Parse_failure

let strings data = split_string '\000' data
Expand Down Expand Up @@ -497,6 +588,13 @@ module Request = struct
Some (parse_exn request)
with _ -> None

(* Describe a request packet as text: its tid, its rid and its payload.
   The payload is shown as None when [parse] cannot decode the packet,
   otherwise as Some followed by [prettyprint_payload] of the result. *)
let prettyprint request =
  let payload_text =
    match parse request with
    | Some payload -> "Some " ^ prettyprint_payload payload
    | None -> "None" in
  Printf.sprintf "tid = %ld; rid = %ld; payload = %s"
    (get_tid request) (get_rid request) payload_text

let print x tid = match x with
| Directory path -> with_path Op.Directory tid path
| Read path -> with_path Op.Read tid path
Expand Down
30 changes: 30 additions & 0 deletions core/xs_packet.mli
Expand Up @@ -143,6 +143,33 @@ module Response : sig
val resume : t -> t
val isintroduced : t -> bool -> t
val watchevent : string -> string -> t

(** A reply in structured form, with one constructor per kind of
    response. *)
type payload =
| Read of string (** used verbatim as the packet data *)
| Directory of string list (** entries; an empty list gives empty packet data *)
| Getperms of ACL.t
| Getdomainpath of string
| Transaction_start of int32
| Write
| Mkdir
| Rm
| Setperms
| Watch
| Unwatch
| Transaction_end
| Debug of string list
| Introduce
| Resume
| Release
| Set_target
| Restrict
| Isintroduced of bool
| Error of string
| Watchevent of string * string (** path and token *)

(** [prettyprint_payload p] renders [p] as text: the constructor name
    followed by its arguments, if any. *)
val prettyprint_payload: payload -> string

(** [print request p] builds the reply packet for [p]. The reply takes its
    tid and rid from [request], except for [Watchevent], which is created
    with tid 0 and rid 0. *)
val print: t -> payload -> t
end

module Request : sig
Expand Down Expand Up @@ -190,6 +217,9 @@ module Request : sig
| Error of string
| Watchevent of string

(** [prettyprint_payload p] renders the request payload [p] as text: the
    constructor name followed by its arguments. *)
val prettyprint_payload: payload -> string
(** [prettyprint t] describes the packet [t] as text: its tid, its rid and
    its payload, where the payload is shown as None if [parse] fails on
    [t]. *)
val prettyprint: t -> string

val parse: t -> payload option
val print: payload -> int32 -> t
end
Expand Down
30 changes: 15 additions & 15 deletions lib_test/xs_test.ml
Expand Up @@ -138,36 +138,36 @@ let make_example_response op f wire_fmt =

(* We use the example requests to generate example responses *)
let example_response_packets =
let open Xs_packet.Op in
let open Xs_packet in
let open Xs_packet.Response in [
make_example_response Read (fun t -> read t "theresult")
make_example_response Op.Read (fun t -> read t "theresult")
"\x02\x00\x00\x00\x0e\x00\x00\x00\x06\x00\x00\x00\x09\x00\x00\x00\x74\x68\x65\x72\x65\x73\x75\x6c\x74";
make_example_response Read (fun t -> read t "")
make_example_response Op.Read (fun t -> read t "")
"\x02\x00\x00\x00\x0e\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00";
make_example_response Getperms (fun t -> getperms t (Xs_packet.ACL.( { owner = 2; other = READ; acl = [ 4, NONE ] } )))
make_example_response Op.Getperms (fun t -> getperms t (Xs_packet.ACL.( { owner = 2; other = READ; acl = [ 4, NONE ] } )))
"\x03\x00\x00\x00\x0d\x00\x00\x00\x07\x00\x00\x00\x06\x00\x00\x00\x72\x32\x00\x6e\x34\x00";
make_example_response Getdomainpath (fun t -> getdomainpath t "/local/domain/4")
make_example_response Op.Getdomainpath (fun t -> getdomainpath t "/local/domain/4")
"\x0a\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x00\x2f\x6c\x6f\x63\x61\x6c\x2f\x64\x6f\x6d\x61\x69\x6e\x2f\x34\x00";
make_example_response Transaction_start (fun t -> transaction_start t 3l)
make_example_response Op.Transaction_start (fun t -> transaction_start t 3l)
"\x06\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x33\x00";
make_example_response Directory (fun t -> directory t [ "a"; "b"; "c"; "aseasyas"; "1"; "2"; "3" ])
make_example_response Op.Directory (fun t -> directory t [ "a"; "b"; "c"; "aseasyas"; "1"; "2"; "3" ])
"\x01\x00\x00\x00\x0f\x00\x00\x00\x05\x00\x00\x00\x15\x00\x00\x00\x61\x00\x62\x00\x63\x00\x61\x73\x65\x61\x73\x79\x61\x73\x00\x31\x00\x32\x00\x33\x00";
make_example_response Write write
make_example_response Op.Write write
"\x0b\x00\x00\x00\x0a\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Mkdir mkdir
make_example_response Op.Mkdir mkdir
"\x0c\x00\x00\x00\x09\x00\x00\x00\x00\x04\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Rm rm
make_example_response Op.Rm rm
"\x0d\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Setperms setperms
make_example_response Op.Setperms setperms
"\x0e\x00\x00\x00\x0b\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Watch watch
make_example_response Op.Watch watch
"\x04\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Unwatch unwatch
make_example_response Op.Unwatch unwatch
"\x05\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
make_example_response Transaction_end transaction_end
make_example_response Op.Transaction_end transaction_end
"\x07\x00\x00\x00\x07\x00\x00\x00\x01\x00\x00\x00\x03\x00\x00\x00\x4f\x4b\x00";
{
op = Error;
op = Op.Error;
packet = error (Xs_packet.Request.directory "/foo" 2l |> unbox) "whatyoutalkingabout";
wire_fmt =
"\x10\x00\x00\x00\x10\x00\x00\x00\x02\x00\x00\x00\x14\x00\x00\x00\x77\x68\x61\x74\x79\x6f\x75\x74\x61\x6c\x6b\x69\x6e\x67\x61\x62\x6f\x75\x74\x00"
Expand Down

0 comments on commit 0735988

Please sign in to comment.