Skip to content

Commit

Permalink
type fiddling to hook up sync functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
Romain Slootmaekers committed Apr 23, 2012
1 parent d4439a9 commit 09af2a0
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 32 deletions.
5 changes: 4 additions & 1 deletion src/hope/barakoon.ml
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,10 @@ let create_resyncs others cluster_id =
List.iter
(fun cfg ->
let (n, ip, port) = extract_name_ip_port cfg in
Hashtbl.replace resyncs n (fun l -> Lwt.return () ) (* Sync.sync ip port cluster_id) *)
Hashtbl.replace resyncs n
(fun l ->
let log = (l: Bstore.BStore.t) in
Sync.sync ip port cluster_id log)
)
others;
resyncs
Expand Down
21 changes: 10 additions & 11 deletions src/hope/bstore.ml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,10 @@ module BStore = (struct
BS.init fn

let create fn =
BS.make fn >>= fun s ->
Lwt.return
{
m = Lwt_mutex.create();
store = s;
}
BS.make fn >>= fun store ->
let m = Lwt_mutex.create() in
let r = {m;store} in
Lwt.return r

let pref_key k = "@" ^ k
let unpref_key k = String.sub k 1 ((String.length k) -1)
Expand All @@ -36,10 +34,10 @@ module BStore = (struct
let log t d u =
let _exec tx =
begin
let rec _inner tx = function
| SET (k,v) -> BS.set tx (pref_key k) v
| DELETE k -> BS.delete tx (pref_key k)
| SEQUENCE s -> Lwt_list.iter_s (fun u -> _inner tx u) s
let rec _inner (tx: BS.tx) = function
| Core.SET (k,v) -> BS.set tx (pref_key k) v
| Core.DELETE k -> BS.delete tx (pref_key k)
| Core.SEQUENCE s -> Lwt_list.iter_s (fun u -> _inner tx u) s
in _inner tx u
end
in
Expand All @@ -61,7 +59,8 @@ module BStore = (struct
begin
let tick_i = TICK i_time in
let cvo =
if committed then None
if committed
then None
else
match ups with
| [] -> failwith "No update logged???"
Expand Down
3 changes: 0 additions & 3 deletions src/hope/dispatcher.ml
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,9 @@ module ADispatcher (S:STORE) = struct
master_id = Some m;
}
| A_RESYNC tgt ->
(*
let resync = Hashtbl.find t.resyncs tgt in
resync t.store >>= fun () ->
Lwt.return s
*)
Lwtc.failfmt "Resync not implemented"

end

38 changes: 21 additions & 17 deletions src/hope/sync.ml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
open Lwt
open Bstore

open Baardskeerder

let iterate (sa:Unix.sockaddr) cluster_id
(i0:int64)
(f: 'a -> int64 -> Baardskeerder.action list -> 'a Lwt.t)
(f: 'a -> int64 -> action list -> 'a Lwt.t)
a0
=
let outgoing buf =
Expand Down Expand Up @@ -34,26 +34,30 @@ let iterate (sa:Unix.sockaddr) cluster_id
Common.response ic incoming
)

let sync ip port cluster_id log =
let sync ip port cluster_id (log : BStore.t) =
let sa = Network.make_address ip port in
begin
BS.last_update log >>= fun uo ->
let i0 = match uo with
| None -> 0L
| Some (i,_,is_explicit) ->
if is_explicit then i else (Int64.pred i)
BStore.last_update log >>= fun luo ->
let i0 =
match luo with
| None -> 0L
| Some (Core.TICK ct,cuo) ->
if cuo = None
then Int64.pred ct
else ct
in
let rec action2update = function
| Set(k,v) -> Core.SET(k,v)
| Delete k -> Core.DELETE k
in
let do_actions (tx:BS.tx) acs = Lwt_list.iter_s
(function
| Baardskeerder.Set (k,v) -> BS.set tx k v
| Baardskeerder.Delete k -> BS.delete tx k
) acs
in

iterate sa cluster_id i0
(fun a i acs ->
BS.log_update log ~diff:true
(fun tx -> do_actions tx acs))
()
let us = List.map action2update acs in
let u = Core.SEQUENCE us in
let d = true in
BStore.log log d u)
Core.UNIT
>>= fun () ->
Lwt.return ()
end

0 comments on commit 09af2a0

Please sign in to comment.