Permalink
Browse files

Implements alternative protocols for leader_call.

Existing was 'leader_only'. New 'local_sync' protocol ensures that operations will be synced on the originators node before the leader_call function returns.
  • Loading branch information...
1 parent 3938fd5 commit 15d49f52d3f16b8c6315ecfbd8a3836a4b2b6f85 @archaelus archaelus committed Aug 3, 2011
Showing with 75 additions and 12 deletions.
  1. +75 −12 src/gl_async_bully.erl
@@ -43,6 +43,7 @@
%% API
-export([start_link/4
,leader_call/2
+ ,leader_call/3
,leader_cast/2
,call/2
,cast/2
@@ -127,6 +128,8 @@
-define(ALPHA_TIME, timer:seconds(10)).
+-type lc_proto() :: 'leader_only' | 'local_sync'.
+
%%====================================================================
%% API
%%====================================================================
@@ -181,8 +184,13 @@ to_other_followers(ExceptNode, Msg, CI)
-spec leader_call(atom(), term()) -> any().
leader_call(Name, Msg) ->
+ leader_call(Name, Msg, local_sync).
+
+-spec leader_call(atom(), term(), lc_proto()) -> any().
+leader_call(Name, Msg, Proto) when Proto =:= local_sync;
+ Proto =:= leader_only ->
gen_fsm:sync_send_all_state_event(Name,
- {leader_call, Msg}).
+ {leader_call, Proto, Msg}).
-spec leader_cast(atom() | cluster_info(), term()) -> any().
leader_cast(Name, Msg) when is_atom(Name) ->
@@ -262,7 +270,7 @@ init([Name, Mod, Arg, Net]) when is_list(Net) ->
%%--------------------------------------------------------------------
recovery(timeout, State) ->
- start_stage2(State#state{incarn = erlang:now()}).
+ start_stage2(reincarnate(State)).
norm(_Evt, State) -> {next_state, norm, State}.
elec2(_Evt, State) -> {next_state, elec2, State}.
@@ -411,6 +419,20 @@ handle_event({from_leader, Ldr, Event}, StateName, State) ->
{next_state, StateName, State}
end;
+handle_event({from_leader, Ldr, Event, {local_sync_reply, From, Reply}},
+ StateName, State) ->
+ %% Strip local_sync_reply info from message and pass through to
+ %% regular from_leader processing. Return value from regular
+ %% processing.
+ case handle_event({from_leader, Ldr, Event}, StateName, State) of
+ {next_state, _StateName, _NewState} = CBReturn ->
+ gen_fsm:reply(From, Reply),
+ CBReturn;
+ Else ->
+ Else
+ end;
+
+
handle_event(Msg, StateName, State) ->
?INFO("~p: ignored ~p", [StateName, Msg]),
{next_state, StateName, State}.
@@ -466,12 +488,12 @@ handle_event(Msg, StateName, State) ->
handle_sync_event(force_recovery, From, _StateName, State) ->
gen_fsm:reply(From, ok),
- start_stage2(State#state{incarn = erlang:now()});
+ start_stage2(reincarnate(State));
-handle_sync_event({leader_call, Call}, From, StateName, State) ->
+handle_sync_event({leader_call, LCProto, Call}, From, StateName, State) ->
case State#state.leader of
Node when node() =:= Node ->
- case ms_call(handle_leader_call, [Call], From, State) of
+ case ms_call(LCProto, handle_leader_call, [Call], From, State) of
{noreply, NewState} ->
{next_state, StateName, NewState};
{stop, Reason, NewState} ->
@@ -481,12 +503,12 @@ handle_sync_event({leader_call, Call}, From, StateName, State) ->
%% Fake a gen_fsm sync_send_all_state_event.
erlang:send(server_on(Node, State),
{'$gen_sync_all_state_event', From,
- {leader_call, Call}}),
+ {leader_call, LCProto, Call}}),
{next_state, StateName, State}
end;
handle_sync_event({call, Call}, From, StateName, State) ->
- case ms_call(handle_call, [Call], From, State) of
+ case ms_call(call, handle_call, [Call], From, State) of
{noreply, NewState} ->
{next_state, StateName, NewState};
{stop, Reason, NewState} ->
@@ -654,7 +676,7 @@ ms_event(Function, Args, S = #state{ms={Mod,ModS}})
%% You're only allowed to send Sync terms
{ok, SyncBcast, NewModS} when node() =:= S#state.leader ->
NewState = S#state{ms={Mod, NewModS}},
- to_peers(SyncBcast, NewState),
+ sync_bcast(SyncBcast, NewState),
{ok, NewState};
{ok, _, NewModS} when node() =/= S#state.leader ->
{stop, {?MODULE, callback_error,
@@ -664,7 +686,10 @@ ms_event(Function, Args, S = #state{ms={Mod,ModS}})
{stop, Reason, S#state{ms={Mod, NewModS}}}
end.
-ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
+-spec ms_call('call' | lc_proto(),
+ atom(), list(), term(), #state{}) ->
+ term().
+ms_call(LCProto, Function, Args, From, S = #state{ms={Mod,ModS}})
when is_atom(Function), is_list(Args), is_tuple(From) ->
case apply(Mod, Function,
Args ++ [From, cluster_info(S), ModS]) of
@@ -674,11 +699,18 @@ ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
gen_fsm:reply(From, Reply),
{noreply, S#state{ms={Mod, NewModS}}};
{reply, Reply, SyncBcast, NewModS}
- when node() =:= S#state.leader ->
+ when node() =:= S#state.leader,
+ LCProto =:= leader_only ->
NewState = S#state{ms={Mod, NewModS}},
- to_peers(SyncBcast, NewState),
+ sync_bcast(SyncBcast, NewState),
gen_fsm:reply(From, Reply),
{noreply, NewState};
+ {reply, Reply, SyncBcast, NewModS}
+ when node() =:= S#state.leader,
+ LCProto =:= local_sync ->
+ NewState = S#state{ms={Mod, NewModS}},
+ local_sync_bcast(From, Reply, SyncBcast, NewState),
+ {noreply, NewState};
{reply, _Reply, _Broadcast, NewModS}
when node() =/= S#state.leader ->
{stop, {?MODULE, callback_error,
@@ -700,14 +732,40 @@ cluster_info(#state{leader=Node,
%% Fake a gen_fsm:send_all_state_event to avoid a trip through
%% net_kernel:connect.
-to_peers(Msg, S = #state{acks=Acks}) ->
+sync_bcast(Msg, S = #state{acks=Acks}) ->
[ erlang:send(server_on(Node, S),
{'$gen_all_state_event', {from_leader, node(), Msg}},
[noconnect])
|| Node <- ordsets:from_list(Acks),
Node =/= node(),
lists:member(Node, nodes()) ].
+%% @doc
+%% Sync broadcast for the local_sync leader_call protocol. Causes one
+%% peer (on the same machine as the caller) to send the reply locally
+%% after processing the SyncBcast result.
+local_sync_bcast(From = {Pid, _Tag}, Reply, Msg, S = #state{acks=Acks}) ->
+ FromNode = node(Pid),
+ case node() =:= FromNode of
+ true ->
+ %% Can send reply here as our local processing is already
+ %% finished by the time we get here.
+ gen_fsm:reply(From, Reply);
+ false ->
+ erlang:send(server_on(FromNode, S),
+ {'$gen_all_state_event',
+ {from_leader, node(), Msg,
+ {local_sync_reply, From, Reply}}},
+ [noconnect])
+ end,
+ [ erlang:send(server_on(Node, S),
+ {'$gen_all_state_event', {from_leader, node(), Msg}},
+ [noconnect])
+ || Node <- ordsets:from_list(Acks),
+ Node =/= node(),
+ Node =/= FromNode,
+ lists:member(Node, nodes()) ].
+
format_status(Fmt, [_Dict, S = #state{name=Name,
elid=Elid}]) ->
[{name, Name},
@@ -738,3 +796,8 @@ format_mod_status(Fmt, #state{ms={Mod, ModS}}) ->
%% @doc Format a control_message from the local node.
control_message(Msg) ->
{gl_async_bully, node(), Msg}.
+
+%% @doc generate new incarnation id strictly greather than the old
+%% one.
+reincarnate(State = #state{}) ->
+ State#state{incarn = erlang:now()}.

0 comments on commit 15d49f5

Please sign in to comment.