Skip to content

Commit

Permalink
Removing is_lease_active from state
Browse files Browse the repository at this point in the history
  • Loading branch information
tdeconin committed Apr 30, 2012
1 parent c0efa92 commit 81472a3
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 61 deletions.
6 changes: 3 additions & 3 deletions src/hope/barakoon.ml
Expand Up @@ -112,8 +112,8 @@ module MC = struct
loop ()
end
*)
let server_t driver store host port =
let inner = Server.make_server_thread host port (BSC.protocol driver store) in
let server_t me driver store host port =
let inner = Server.make_server_thread host port (BSC.protocol me driver store) in
inner ()

(*
Expand Down Expand Up @@ -267,7 +267,7 @@ let run_node myname config_file daemonize =
let resyncs = create_resyncs others cluster_id in
let disp, q = create_dispatcher store msging resyncs in
let driver = create_driver disp q in
let service driver = server_t driver store mycfg.ip mycfg.client_port in
let service driver = server_t mycfg.node_name driver store mycfg.ip mycfg.client_port in
build_start_state store mycfg others >>= fun s ->
let delayed_timeout = MULTI.A_START_TIMER (s.MULTI.round, Core.start_tick, float_of_int mycfg.lease_period) in
DRIVER.dispatch driver s delayed_timeout >>= fun s' ->
Expand Down
74 changes: 41 additions & 33 deletions src/hope/c.ml
Expand Up @@ -22,12 +22,6 @@ module ProtocolHandler (S:Core.STORE) = struct
check_cluster cluster_id >>= fun () ->
Lwt.return ()

let get_test_and_set_args ic =
Llio.input_string ic >>= fun k ->
Llio.input_string_option ic >>= fun m_old ->
Llio.input_string_option ic >>= fun m_new ->
Lwt.return (k, m_old, m_new)

let get_key ic =
Llio.input_string ic

Expand Down Expand Up @@ -69,39 +63,37 @@ module ProtocolHandler (S:Core.STORE) = struct

let _safe_get = wrap_not_found _get

let am_i_master store = true
let extract_master_info = function
| None -> None
| Some s ->
begin
let m, off = Llio.string_option_from s 0 in
m
end

let am_i_master store me =
S.get_meta store >>= fun meta ->
match (extract_master_info meta) with
| Some m when m = me -> Lwt.return true
| _ -> Lwt.return false

let _range store ~(allow_dirty:bool)
(first:string option) (finc:bool)
(last:string option) (linc:bool)
(max:int) =
if allow_dirty || am_i_master store
then
S.range store first finc last linc max
else
failwith "Cannot perform range on not-master"

S.range store first finc last linc max

let _get_meta store = S.get_meta store

let _last_entries store i oc = S.last_entries store (Core.TICK i) oc

let one_command driver store ((ic,oc) as conn) =
let one_command me driver store ((ic,oc) as conn) =
Client_protocol.read_command conn >>= fun comm ->
match comm with
| WHO_MASTER ->
Lwtc.log "who master" >>= fun () ->
_get_meta store >>= fun ms ->
let mo =
begin
match ms with
| None -> None
| Some s ->
begin
let m, off = Llio.string_from s 0 in
Some m
end
end
in
let mo = extract_master_info ms in
Llio.output_int32 oc 0l >>= fun () ->
Llio.output_string_option oc mo >>= fun () ->
Lwt.return false
Expand Down Expand Up @@ -184,10 +176,15 @@ module ProtocolHandler (S:Core.STORE) = struct
Llio.input_int ic >>= fun max ->
Lwt.catch
(fun () ->
_range store ~allow_dirty first finc last linc max >>= fun list ->
Llio.output_int32 oc 0l >>= fun () ->
Llio.output_list Llio.output_string oc list >>= fun () ->
Lwt.return false
am_i_master store me >>= fun me_master ->
if allow_dirty || me_master
then
_range store ~allow_dirty first finc last linc max >>= fun list ->
Llio.output_int32 oc 0l >>= fun () ->
Llio.output_list Llio.output_string oc list >>= fun () ->
Lwt.return false
else
Lwt.fail (Common.XException(Arakoon_exc.E_NOT_MASTER, me))
)
(Client_protocol.handle_exception oc )
end
Expand All @@ -207,7 +204,9 @@ module ProtocolHandler (S:Core.STORE) = struct
Client_protocol.response_ok_unit oc
end
| TEST_AND_SET ->
get_test_and_set_args ic >>= fun (key, m_old, m_new) ->
Llio.input_string ic >>= fun key ->
Llio.input_string_option ic >>= fun m_old ->
Llio.input_string_option ic >>= fun m_new ->
_safe_get store key >>= fun m_val ->
begin
if m_val = m_old
Expand All @@ -228,12 +227,21 @@ module ProtocolHandler (S:Core.STORE) = struct
get_key ic >>= fun key ->
_delete driver key >>= fun () ->
Client_protocol.response_ok_unit oc
| _ -> Client_protocol.handle_exception oc (Failure "Command not implemented (yet)")
| RANGE_ENTRIES ->
Llio.input_bool ic >>= fun allow_dirty ->
Llio.input_string_option ic >>= fun bkey ->
Llio.input_bool ic >>= fun finc ->
Llio.input_string_option ic >>= fun ekey ->
Llio.input_bool ic >>= fun einc ->
Client_protocol.response_ok_unit oc


(* | _ -> Client_protocol.handle_exception oc (Failure "Command not implemented (yet)") *)

let protocol driver store (ic,oc) =
let protocol me driver store (ic,oc) =
let rec loop () =
begin
one_command driver store (ic,oc) >>= fun stop ->
one_command me driver store (ic,oc) >>= fun stop ->
if stop
then Lwtc.log "end of session"
else
Expand Down
11 changes: 5 additions & 6 deletions src/hope/dispatcher.ml
Expand Up @@ -59,9 +59,9 @@ module ADispatcher (S:STORE) = struct
)


let store_lease t m =
let store_lease t mo =
let buf = Buffer.create 32 in
Llio.string_to buf m;
Llio.string_option_to buf mo;
let s = Buffer.contents buf in
S.set_meta t.store s

Expand Down Expand Up @@ -120,12 +120,11 @@ module ADispatcher (S:STORE) = struct
| A_CLIENT_REPLY (w, r) ->
safe_wakeup w r >>= fun () ->
Lwt.return s
| A_STORE_LEASE m ->
store_lease t m >>= fun () ->
| A_STORE_LEASE mo ->
store_lease t mo >>= fun () ->
Lwt.return
{ s with
master_id = Some m;
is_lease_active = true;
master_id = mo;
}
| A_RESYNC (tgt, n, m) ->
let resync = Hashtbl.find t.resyncs tgt in
Expand Down
31 changes: 12 additions & 19 deletions src/hope/mp.ml
Expand Up @@ -197,7 +197,6 @@ module MULTI = struct
prop : update option;
votes : node_id list;
election_votes : election_votes;
is_lease_active : bool;
constants : paxos_constants;
cur_cli_req : request_awakener option;
valid_inputs : msg_channel list;
Expand All @@ -214,7 +213,6 @@ module MULTI = struct
prop = u;
votes = [];
election_votes = {nnones = []; nsomes = []};
is_lease_active = false;
constants = c;
cur_cli_req = None;
valid_inputs = ch_all;
Expand All @@ -227,11 +225,7 @@ module MULTI = struct
| S_SLAVE -> "S_SLAVE"

let state2s s =
let m =
if s.is_lease_active
then (so2s s.master_id)
else "None"
in
let m = so2s s.master_id in
Printf.sprintf "id: %s, n: %s, m: %s, p: %s, a: %s, state: %s, master: %s, votes: %d"
s.constants.me (tick2s s.round) (tick2s s.extensions) (tick2s s.proposed) (tick2s s.accepted)
(state_n2s s.state_n) m (List.length s.votes)
Expand Down Expand Up @@ -270,7 +264,7 @@ module MULTI = struct
| A_LOG_UPDATE of tick * update
| A_START_TIMER of tick * tick * float
| A_CLIENT_REPLY of client_reply
| A_STORE_LEASE of node_id
| A_STORE_LEASE of node_id option

let action2s = function
| A_RESYNC (src, n, m) ->
Expand All @@ -290,7 +284,7 @@ module MULTI = struct
| A_CLIENT_REPLY rep ->
Printf.sprintf "A_CLIENT_REPLY (r: %s)" (client_reply2s rep)
| A_STORE_LEASE m ->
Printf.sprintf "A_STORE_LEASE (m:%s)" m
Printf.sprintf "A_STORE_LEASE (m:%s)" (so2s m)

type step_result =
| StepFailure of string
Expand Down Expand Up @@ -388,7 +382,7 @@ module MULTI = struct

let send_promise state src n m i =
begin
if state.is_lease_active && not (is_node_master src state)
if (state.master_id <> None) && not (is_node_master src state)
then
StepSuccess( [], state )
else
Expand Down Expand Up @@ -565,7 +559,7 @@ module MULTI = struct
let actions =
(bcast_mset_actions state)
@
(A_STORE_LEASE (state.constants.me )
(A_STORE_LEASE (Some state.constants.me )
:: postmortem_actions)
in
StepSuccess(actions, new_state)
Expand Down Expand Up @@ -677,29 +671,28 @@ module MULTI = struct
end

let start_elections new_n m state =
let sn, la =
let sn, mid =
begin
match state.state_n with
| S_MASTER ->
if state.round = new_n
then (S_MASTER, true)
else (S_RUNNING_FOR_MASTER, false)
| _ -> (S_RUNNING_FOR_MASTER, false)
then (S_MASTER, Some state.constants.me)
else (S_RUNNING_FOR_MASTER, None)
| _ -> (S_RUNNING_FOR_MASTER, None)
end
in
let msg = M_PREPARE (state.constants.me, new_n, m, (next_tick state.accepted)) in
let new_state = {
state with
round = new_n;
extensions = m;
master_id = Some state.constants.me;
master_id = mid;
state_n = sn;
votes = [];
is_lease_active = la;
election_votes = {nnones=[]; nsomes=[]};
} in
let lease_expiry = build_lease_timer new_n m (state.constants.lease_duration /. 2.0 ) in
StepSuccess([A_BROADCAST_MSG msg; lease_expiry], new_state)
StepSuccess([A_STORE_LEASE mid; A_BROADCAST_MSG msg; lease_expiry], new_state)

let handle_lease_timeout n m state =
let diff = state_cmp n start_tick state in
Expand Down Expand Up @@ -758,7 +751,7 @@ module MULTI = struct
| S_SLAVE when state.round = n ->
begin
let d = state.constants.lease_duration in
let l = A_STORE_LEASE src in
let l = A_STORE_LEASE (Some src) in
let t = A_START_TIMER (n, m, d) in
StepSuccess ([l;t], state)
end
Expand Down

0 comments on commit 81472a3

Please sign in to comment.