Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Merge /Users/nem/ngmoco/ngproc/deps/gl_async_bully

  • Loading branch information...
commit 747e9d3b08fb6e110c6df7ac31800471203d20e3 2 parents 91c8529 + f89d852
@archaelus archaelus authored
View
4 .gitignore
@@ -0,0 +1,4 @@
+ebin/*.beam
+ebin/*.app
+*~
+erl_crash.dump
View
67 include/gl_async_bully_specs.hrl
@@ -0,0 +1,67 @@
+
+-type event_return(State) :: {'ok', State} |
+ {'ok', Sync::term(), State} |
+ {stop, Reason::term(), State}.
+
+-type call_return(State) :: {'reply', Reply::term(), State} |
+ {'reply', Reply::term(), Broadcast::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), Reply::term(), State}.
+
+-spec init (Args::term()) ->
+ {'ok', State::term()} |
+ 'ignore' |
+ {'stop', Reason::term()}.
+
+%% elected(_,_,node()) - captured a single candidate
+%% reply notifies only the new candidate of the
+%% sync state, ok notifies all candidates
+%% elected(_,_,undefined) - won a general election
+%% ok broadcasts sync to all candidates
+-spec elected(gl_async_bully:cluster_info(),
+ State
+ ) -> event_return(State).
+
+-spec handle_leader_call(Request::term(),
+ From::{pid(), reference()},
+ gl_async_bully:cluster_info(),
+ State
+ ) -> call_return(State).
+
+-spec handle_leader_cast(Msg::term(),
+ gl_async_bully:cluster_info(),
+ State
+ ) -> event_return(State).
+
+-spec from_leader(Msg::term(),
+ gl_async_bully:cluster_info(),
+ State
+ ) -> event_return(State).
+
+-spec handle_call(Request::term(),
+ From::{pid(), reference()},
+ gl_async_bully:cluster_info(),
+ State
+ ) -> call_return(State).
+
+-spec handle_cast(Msg::term(),
+ gl_async_bully:cluster_info(),
+ State
+ ) -> event_return(State).
+
+-spec handle_info(Msg::term(),
+ gl_async_bully:cluster_info(),
+ State
+ ) -> event_return(State).
+
+-spec terminate(Reason::term(),
+ gl_async_bully:cluster_info(),
+ State::term()
+ ) -> any().
+
+-spec code_change(OldVsn::term(),
+ State,
+ Extra::term()) -> {'ok', State}.
+
+-spec format_status('normal' | 'terminate',
+ [proplists:proplist(), State::term()]) -> list().
View
503 src/gl_async_bully.erl
@@ -4,10 +4,34 @@
%% @version {@vsn}, {@date} {@time}
%% @doc Attempts to implement the 'Asynchronous Bully Algorithm' from
%% Scott D. Stoller's 1997 paper 'Leader Election in Distributed
-%% Systems with Crash Failures' [http://www.cs.sunysb.edu/~stoller/leader-election.html]
+%% Systems with Crash Failures'
+%% [http://www.cs.sunysb.edu/~stoller/leader-election.html]
%%
%% Notes:
-%% Our failure detector is built on erlang:monitor_node
+%% Our failure detector is built on erlang:monitor_node to detect
+%% peer nodes as they connect and disconnect from us. On receiving
+%% an up message from a peer we then monitor the leader election
+%% control process on the new node (in the case that we are
+%% interested in setting a FD on that node).
+%%
+%% Protocols:
+%% 1) Leader election.
+%% XXX - fill in rest of section.
+%%
+%% Ideas:
+%% * Calls currently need to pass through the local node in order to
+%% get to the leader. If this serialization of client calls
+%% through the local node causes problems due to the high message
+%% volume swamping the control messages, it may be necessary to
+%% introduce an ets table containing the current identity of the
+%% leader, or a second process that tracks the leader identity and
+%% forwards calls appropriately.
+%% * Another plan would be to split the control functionality into a
+%% separate registered process. leader_call traffic could be split
+%% into a third process if needed. This would give a process model
+%% of control, message routing and API client processing in
+%% separate processes. Would be tricky to implement and require
+%% complex coordination.
%% @end
%%%-------------------------------------------------------------------
-module(gl_async_bully).
@@ -18,8 +42,9 @@
-include_lib("eunit/include/eunit.hrl").
%% API
--export([start_link/3
+-export([start_link/4
,leader_call/2
+ ,leader_call/3
,leader_cast/2
,call/2
,cast/2
@@ -29,7 +54,13 @@
%% For callback modules
-export([role/1
,leader/1
- ,broadcast/2]).
+ ,peers/1
+ ,live_peers/1
+ ,broadcast/2
+ ,to_follower/3
+ ,to_followers/2
+ ,to_other_followers/3
+ ]).
-export([behaviour_info/1]).
@@ -41,8 +72,17 @@
%% gen_fsm callbacks
-export([init/1, handle_event/3,
- handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
-
+ handle_sync_event/4, handle_info/3, terminate/3, code_change/4,
+ format_status/2]).
+
+%% @doc
+%% Modification to Stoller's algorithm. Instead of using a
+%% monotonically increasing integer preserved in stable storage across
+%% incarnations of this process, we use the result of erlang:now()
+%% captured at process recovery time. This will be unique to a node,
+%% will not go backwards and in Erlang can be compared in the same
+%% manner a single integer would be in Stoller's paper. This allows us
+%% to avoid the stable storage requirement.
-type incarnation() :: {non_neg_integer(),
non_neg_integer(),
non_neg_integer()}.
@@ -62,58 +102,114 @@
%% Modifications
peers = ordsets:new() :: ordsets:ordset(node()),
%% Failure detector
- fd = ordsets:new() :: ordsets:ordset(node()),
+ fd :: glab_fd:fd_set(),
%% Client Mod/State
- ms = undefined
+ ms = undefined,
+ name = ?MODULE :: atom()
}).
--type proto_messages() :: {halt, election_id()} |
- {ack, election_id()} |
- {rej, election_id()} |
- {norm_p, election_id()} |
- {leader, election_id()} |
- {not_norm, election_id()}.
+%% @doc Leader election protocol message.
+-type proto_message() :: {halt, election_id()} |
+ {ack, election_id()} |
+ {rej, election_id()} |
+ {norm_p, election_id()} |
+ {not_norm, election_id()} |
+ {leader, election_id()}.
+
+%% @doc generic leader election protocol message. Includes ID of sender.
+-type control_message() :: {'gl_async_bully',
+ Sender::node(),
+ proto_message()}.
-type proto_states() :: norm | wait | elec2.
--opaque cluster_info() :: {Leader::node(), Peers::[node()]}.
+-opaque cluster_info() :: {Leader::node(), Peers::[node()], Name::atom()}.
-export_type([cluster_info/0]).
-define(ALPHA_TIME, timer:seconds(10)).
+-type lc_proto() :: 'leader_only' | 'local_sync'.
+
%%====================================================================
%% API
%%====================================================================
-start_link(Mod, Arg, Net) when is_list(Net) ->
- gen_fsm:start_link({local, ?MODULE}, ?MODULE, [Mod, Arg, Net], []).
+start_link(Name, Mod, Arg, Net) when is_list(Net) ->
+ gen_fsm:start_link({local, Name}, ?MODULE, [Name, Mod, Arg, Net], []).
+
+-spec role(cluster_info()) -> 'leader' | 'follower'.
+role({Leader, _, _}) when Leader =:= node() -> leader;
+role({_,_,_}) -> follower.
+
+-spec leader(cluster_info()) -> node().
+leader({Leader,_,_}) -> Leader.
+-spec peers(cluster_info()) -> [node()].
+peers({_Ldr, Peers, _}) -> Peers.
+-spec live_peers(cluster_info()) -> [node()].
+live_peers(CI) ->
+ [ P || P <- peers(CI),
+ lists:member(P, nodes())].
--spec role(cluster_info()) -> 'leader' | 'candidate'.
-role({Leader, _}) when Leader =:= self() -> leader;
-role({_,_}) -> candidate.
+-spec broadcast(term(), cluster_info()) -> 'ok'.
+broadcast(Msg, CI) ->
+ [erlang:send(server_on(Peer, CI), Msg)
+ || Peer <- live_peers(CI),
+ Peer =/= node()],
+ ok.
-leader({Leader,_}) -> Leader.
+-spec to_follower(node(), term(), cluster_info()) -> 'ok'.
+to_follower(FNode, Msg, CI) ->
+ leader = role(CI),
+ gen_fsm:send_all_state_event(server_on(FNode, CI),
+ {from_leader, node(), Msg}),
+ ok.
--spec broadcast(term(), cluster_info()) -> 'ok'.
-broadcast(Msg, {_Ldr, Peers}) ->
- [server_on(Peer) ! Msg || Peer <- Peers,
- Peer =/= node()],
+-spec to_followers(term(), cluster_info()) -> 'ok'.
+to_followers(Msg, CI) ->
+ leader = role(CI),
+ [to_follower(Node, Msg, CI)
+ || Node <- live_peers(CI),
+ Node =/= node()],
+ ok.
+
+-spec to_other_followers(node(), term(), cluster_info()) -> 'ok'.
+to_other_followers(ExceptNode, Msg, CI)
+ when is_atom(ExceptNode) ->
+ leader = role(CI),
+ [to_follower(Node, Msg, CI)
+ || Node <- live_peers(CI),
+ Node =/= node(),
+ Node =/= ExceptNode],
ok.
-leader_call(Name, Msg) ->
- gen_fsm:sync_send_all_state_event(server_on(Name),
- {leader_call, Msg}).
-leader_cast(Name, Msg) ->
- gen_fsm:send_all_state_event(server_on(Name),
+-spec leader_call(atom(), term()) -> any().
+leader_call(Name, Msg) ->
+ leader_call(Name, Msg, local_sync).
+
+-spec leader_call(atom(), term(), lc_proto()) -> any().
+leader_call(Name, Msg, Proto) when Proto =:= local_sync;
+ Proto =:= leader_only ->
+ gen_fsm:sync_send_all_state_event(Name,
+ {leader_call, Proto, Msg}).
+
+-spec leader_cast(atom() | cluster_info(), term()) -> any().
+leader_cast(Name, Msg) when is_atom(Name) ->
+ gen_fsm:send_all_state_event(Name,
+ {leader_cast, Msg});
+leader_cast({Leader,_,_} = CI, Msg) ->
+ gen_fsm:send_all_state_event(server_on(Leader, CI),
{leader_cast, Msg}).
+
+-spec call(atom(), term()) -> any().
call(Name, Msg) ->
- gen_fsm:sync_send_all_state_event(server_on(Name),
+ gen_fsm:sync_send_all_state_event(Name,
{call, Msg}).
+-spec cast(atom(), term()) -> any().
cast(Name, Msg) ->
- gen_fsm:send_all_state_event(server_on(Name),
+ gen_fsm:send_all_state_event(Name,
{cast, Msg}).
reply(From, Msg) ->
@@ -128,7 +224,8 @@ behaviour_info(callbacks) ->
{handle_leader_cast, 3},
{handle_info, 3},
{elected, 2},
- {code_change, 4},
+ {surrendered, 3},
+ {code_change, 3},
{terminate, 3}
];
behaviour_info(_) ->
@@ -146,13 +243,15 @@ behaviour_info(_) ->
%% gen_fsm:start_link/3,4, this function is called by the new process to
%% initialize.
%%--------------------------------------------------------------------
-init([Mod, Arg, Net]) when is_list(Net) ->
+init([Name, Mod, Arg, Net]) when is_list(Net) ->
case Mod:init(Arg) of
{ok, ModS} ->
timer:send_interval(?ALPHA_TIME, periodically),
net_kernel:monitor_nodes(true, [{node_type, visible}]),
{ok, recovery, #state{peers=ordsets:from_list(Net),
- ms={Mod, ModS}}, 0};
+ ms={Mod, ModS},
+ name=Name,
+ fd=glab_fd:new(Name)}, 0};
Else ->
Else
end.
@@ -172,23 +271,24 @@ init([Mod, Arg, Net]) when is_list(Net) ->
%%--------------------------------------------------------------------
recovery(timeout, State) ->
- start_stage2(State#state{incarn = erlang:now()}).
+ start_stage2(reincarnate(State)).
norm(_Evt, State) -> {next_state, norm, State}.
elec2(_Evt, State) -> {next_state, elec2, State}.
wait(_Evt, State) -> {next_state, wait, State}.
-halting(T, J, State = #state{peers = Peers}) ->
- [play_dead(N) || N <- ordsets:to_list(Peers),
- node() < N ],
- send(J, {ack, T}),
+halting(T, J, State = #state{peers = Peers, name = Name}) ->
+ [play_dead({Name, N}) || N <- ordsets:to_list(Peers),
+ node() < N ],
+ send(J, {ack, T}, State),
{next_state, wait, start_fd(J, State#state{elid=T})}.
start_stage2(State = #state{incarn = Incarn,
nextel = Nextel,
- peers = Peers}) ->
- [play_alive(N) || N <- ordsets:to_list(Peers),
- N > node() ],
+ peers = Peers,
+ name = Name}) ->
+ [play_alive({Name, N}) || N <- ordsets:to_list(Peers),
+ N > node() ],
NewState = State#state{elid = {node(), Incarn, Nextel},
nextel = Nextel + 1,
acks = ordsets:new(),
@@ -201,12 +301,11 @@ contin_stage2(State = #state{pendack = P,
case max_p(Peers) of
N when P < N ->
Pendack = next_p(P, Peers),
- send(Pendack, {halt, Elid}),
+ send(Pendack, {halt, Elid}, State),
{next_state, elec2, start_fd(Pendack,State#state{pendack=Pendack})};
_ ->
%% I'm the leader.
- ?INFO("I ~p became the leader.", [node()]),
- [send(Node, {leader, Elid})
+ [send(Node, {leader, Elid}, State)
|| Node <- ordsets:to_list(State#state.acks)],
NewState = State#state{leader=node()},
case ms_event(elected, [], NewState) of
@@ -219,64 +318,72 @@ contin_stage2(State = #state{pendack = P,
periodically(norm, #state{leader=Self,
peers=Peers,
- elid=Elid}) when Self =:= node() ->
- [ send(P, {norm_p, Elid})
+ elid=Elid} = State)
+ when Self =:= node() ->
+ %% norm_p query also sent from nodeup handle_info
+ [ send(P, {norm_p, Elid}, State)
|| P <- ordsets:to_list(Peers),
P > node() ];
periodically(_StateName, _State) ->
ok.
-handle_event({?MODULE, J, {halt, T}}, StateName,
+handle_event({gl_async_bully, J, {halt, T}}, StateName,
State = #state{leader=Ldr,
elid=Elid})
when StateName =:= norm, Ldr < J;
StateName =:= wait, element(1, Elid) < J ->
- send(J, {rej, T}),
+ send(J, {rej, T}, State),
{next_state, StateName, State};
-handle_event({?MODULE, J, {halt, T}}, _StateName,
+handle_event({gl_async_bully, J, {halt, T}}, _StateName,
State = #state{}) ->
halting(T, J, State);
-handle_event({?MODULE, J, downsig}, StateName,
+handle_event({gl_async_bully, J, downsig}, StateName,
State = #state{elid=Elid,
leader=Ldr})
when StateName =:= norm, J =:= Ldr;
StateName =:= wait, J =:= element(1, Elid) ->
start_stage2(State);
-handle_event({?MODULE, J, downsig}, elec2,
+handle_event({gl_async_bully, J, downsig}, elec2,
State = #state{pendack=J}) ->
contin_stage2(State);
-handle_event({?MODULE, J, {rej, _T}}, elec2,
+handle_event({gl_async_bully, J, {rej, _T}}, elec2,
State = #state{pendack=J}) ->
contin_stage2(State);
-handle_event({?MODULE, J, {ack, T}}, elec2,
+handle_event({gl_async_bully, J, {ack, T}}, elec2,
State = #state{elid=T,
pendack=J,
acks=Acks}) ->
contin_stage2(State#state{acks = ordsets:add_element(J, Acks)});
-handle_event({?MODULE, J, {leader, T}}, wait,
+handle_event({gl_async_bully, J, {leader, T}}, wait,
State = #state{elid=T}) ->
- {next_state, norm, set_fds([J], State#state{leader=J})};
+ NewState = set_fds([J], State#state{leader=J}),
+ case ms_event(surrendered, [J], NewState) of
+ {ok, NewState2} ->
+ {next_state, norm, NewState2};
+ {stop, _Reason, _NewState2} = Stop ->
+ Stop
+ end;
-handle_event({?MODULE, J, {norm_p, T}}, StateName,
+handle_event({gl_async_bully, J, {norm_p, T}}, StateName,
State = #state{elid=Elid,
leader = Leader})
when StateName =/= norm, J < element(1, Elid);
StateName =:= norm, J < Leader ->
- send(J, {not_norm, T}),
+ send(J, {not_norm, T}, State),
{next_state, StateName, State};
-handle_event({?MODULE, J, {norm_p, T}}, StateName,
+handle_event({gl_async_bully, J, {norm_p, T}}, StateName,
State = #state{elid = T,
leader = J}) ->
%% Everything is fine.
{next_state, StateName, State};
-handle_event({?MODULE, _J, {not_norm, T}}, norm,
+handle_event({gl_async_bully, _J, {not_norm, T}}, norm,
State = #state{elid = T,
leader = Self})
when Self =:= node() ->
@@ -288,15 +395,45 @@ handle_event({leader_cast, Cast}, StateName, State) ->
case ms_event(handle_leader_cast, [Cast], State) of
{ok, NewState} ->
{next_state, StateName, NewState};
- {stop, Reason, NewState} ->
- {stop, Reason, NewState}
+ {stop, _Reason, _NewState} = Err -> Err
end;
Node ->
- gen_fsm:send_all_state_event(server_on(Node),
+ gen_fsm:send_all_state_event(server_on(Node, State),
{leader_cast, Cast}),
{next_state, StateName, State}
end;
+handle_event({from_leader, Ldr, Event}, StateName, State) ->
+ case State#state.leader of
+ Ldr when node() =/= Ldr ->
+ case ms_event(from_leader, [Event], State) of
+ {ok, NewState} ->
+ {next_state, StateName, NewState};
+ {stop, _Reason, _NewState} = Err -> Err
+ end;
+ _ ->
+ %% XXX - should we have a callback to allow a client
+ %% module to deal with leader events from a stale leader
+ ?ERR("Received a from_leader event "
+ "- stale, or incorrectly sent to self.~p",
+ [Event]),
+ {next_state, StateName, State}
+ end;
+
+handle_event({from_leader, Ldr, Event, {local_sync_reply, From, Reply}},
+ StateName, State) ->
+ %% Strip local_sync_reply info from message and pass through to
+ %% regular from_leader processing. Return value from regular
+ %% processing.
+ case handle_event({from_leader, Ldr, Event}, StateName, State) of
+ {next_state, _StateName, _NewState} = CBReturn ->
+ gen_fsm:reply(From, Reply),
+ CBReturn;
+ Else ->
+ Else
+ end;
+
+
handle_event(Msg, StateName, State) ->
?INFO("~p: ignored ~p", [StateName, Msg]),
{next_state, StateName, State}.
@@ -350,26 +487,31 @@ handle_event(Msg, StateName, State) ->
%% the event.
%%--------------------------------------------------------------------
-handle_sync_event({leader_call, Call}, From, StateName, State) ->
+handle_sync_event(force_recovery, From, _StateName, State) ->
+ gen_fsm:reply(From, ok),
+ start_stage2(reincarnate(State));
+
+handle_sync_event({leader_call, LCProto, Call}, From, StateName, State) ->
case State#state.leader of
Node when node() =:= Node ->
- case ms_call(handle_leader_call, [Call], From, State) of
+ case ms_call(LCProto, handle_leader_call, [Call], From, State) of
{noreply, NewState} ->
- {noreply, StateName, NewState};
+ {next_state, StateName, NewState};
{stop, Reason, NewState} ->
{stop, Reason, NewState}
end;
Node ->
%% Fake a gen_fsm sync_send_all_state_event.
- server_on(Node) ! {'$gen_sync_all_state_event', From,
- {leader_call, Call}},
+ erlang:send(server_on(Node, State),
+ {'$gen_sync_all_state_event', From,
+ {leader_call, LCProto, Call}}),
{next_state, StateName, State}
end;
handle_sync_event({call, Call}, From, StateName, State) ->
- case ms_call(handle_call, [Call], From, State) of
+ case ms_call(call, handle_call, [Call], From, State) of
{noreply, NewState} ->
- {noreply, StateName, NewState};
+ {next_state, StateName, NewState};
{stop, Reason, NewState} ->
{stop, Reason, NewState}
end;
@@ -388,21 +530,29 @@ handle_sync_event(_Event, _From, StateName, State) ->
%% (or a system message).
%%--------------------------------------------------------------------
+handle_info({'DOWN', _, _, _, _} = Msg, StateName, State = #state{}) ->
+ %% Check if this nodedown is relevant to the
+ %% leader election algorithm
+ case filter_nodedown(Msg, State) of
+ {{down, J}, NewState} ->
+ handle_event({gl_async_bully, J, downsig},
+ StateName, NewState);
+ {ignore, NewState} ->
+ case ms_event(handle_info, [Msg], State) of
+ {ok, NewState} ->
+ {next_state, StateName, NewState};
+ {stop, Reason, NewState} ->
+ {stop, Reason, NewState}
+ end
+ end;
+
handle_info({nodedown, J, _Info} = Msg, StateName, State = #state{}) ->
case is_peer(J, State) of
peer ->
%% Pass event through to callback
case ms_event(handle_info, [Msg], State) of
{ok, NewState} ->
- %% Check if this nodedown is relevant to the
- %% leader election algorithm
- case filter_nodedown(J, State) of
- nodedown ->
- handle_event({?MODULE, J, downsig},
- StateName, NewState);
- ignore ->
- {next_state, StateName, NewState}
- end;
+ {next_state, StateName, NewState};
{stop, Reason, NewState} ->
{stop, Reason, NewState}
end;
@@ -411,6 +561,23 @@ handle_info({nodedown, J, _Info} = Msg, StateName, State = #state{}) ->
{next_state, StateName, State}
end;
+handle_info({nodeup, J, _Info} = Msg, StateName = norm,
+ State = #state{leader = Self,
+ elid = Elid,
+ peers = Peers})
+ when Self =:= node() ->
+ case lists:member(J, Peers) of
+ true ->
+ send(J, {norm_p, Elid}, State);
+ _ -> ok
+ end,
+ case ms_event(handle_info, [Msg], State) of
+ {ok, NewState} ->
+ {next_state, StateName, NewState};
+ {stop, Reason, NewState} ->
+ {stop, Reason, NewState}
+ end;
+
handle_info(periodically, StateName, State) ->
periodically(StateName, State),
{next_state, StateName, State};
@@ -430,8 +597,8 @@ handle_info(Info, StateName, State) ->
%% necessary cleaning up. When it returns, the gen_fsm terminates with
%% Reason. The return value is ignored.
%%--------------------------------------------------------------------
-terminate(Reason, StateName, S = #state{ms={_Mod, _ModS}}) ->
- ms_event(terminate, [Reason], S),
+terminate(Reason, StateName, S = #state{ms={Mod, ModS}}) ->
+ Mod:terminate(Reason, cluster_info(S), ModS),
terminate(Reason, StateName, S#state{ms=undefined});
terminate(_Reason, _StateName, #state{ms=undefined}) ->
ok.
@@ -441,24 +608,37 @@ terminate(_Reason, _StateName, #state{ms=undefined}) ->
%% code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, NewState}
%% Description: Convert process state when code is changed
%%--------------------------------------------------------------------
-code_change(_OldVsn, StateName, State, _Extra) ->
- {ok, StateName, State}.
+code_change(OldVsn, StateName, State = #state{ms={Mod,ModS}}, Extra) ->
+ {ok, NewModS} = Mod:code_change(OldVsn, ModS, Extra),
+ {ok, StateName, State#state{ms={Mod,NewModS}}}.
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------
-send(Node, Msg) ->
- gen_fsm:send_all_state_event(server_on(Node), {?MODULE, node(), Msg}).
+-spec send(atom(), proto_message(), #state{}) -> any().
+send(Node, Msg, #state{name=Name})
+ when is_atom(Node), is_atom(Name) ->
+ gen_fsm:send_all_state_event({Name, Node}, control_message(Msg)).
-play_dead(Node) ->
- server_on(Node) ! {nodedown, node(), [{?MODULE, {playing_dead, node()}}]}.
+-spec play_dead({atom(), atom()}) -> 'ok'.
+play_dead({Name, Node}) ->
+ erlang:send({Name, Node},
+ {nodedown, node(), [{gl_async_bully, {playing_dead, node()}}]}),
+ ok.
+
+-spec play_alive({atom(), atom()}) -> 'ok'.
+play_alive({Name, Node}) ->
+ erlang:send({Name, Node},
+ {nodeup, node(), [{gl_async_bully, {playing_alive, node()}}]}),
+ ok.
+
+server_on(Node, #state{name=Name}) ->
+ {Name, Node};
+server_on(Node, {_,_,Name}) when is_atom(Name) ->
+ {Name, Node}.
-play_alive(Node) ->
- server_on(Node) ! {nodeup, node(), [{?MODULE, {playing_dead, node()}}]}.
-server_on(Node) ->
- {?MODULE, Node}.
max_p(Nodes) -> lists:max(Nodes).
@@ -469,18 +649,18 @@ next_p(N, Nodes) ->
end.
start_fd(Node, State = #state{fd = FDs}) ->
- State#state{fd=ordsets:add_element(Node, FDs)}.
+ State#state{fd=glab_fd:start(Node, FDs)}.
set_fds(Nodes, State = #state{fd = FDs}) ->
- NewFDs = ordsets:union(FDs, ordsets:from_list(Nodes)),
- State#state{fd = NewFDs}.
-
-filter_nodedown(Node, #state{fd = FDs}) ->
- case ordsets:is_element(Node, FDs) of
- true ->
- nodedown;
- false ->
- ignore
+ State#state{fd = glab_fd:set(Nodes, FDs)}.
+
+filter_nodedown(Msg, State = #state{fd = FDs}) ->
+ case glab_fd:filter_DOWN(Msg, FDs) of
+ {ignore, _Reason, NewFDs} ->
+ {ignore, State#state{fd = NewFDs}};
+ {down, {Name, Node, Info}, NewFDs} ->
+ ?INFO("FD for ~p detected a failure: ~p", [{Name, Node}, Info]),
+ {{down, Node}, State#state{fd = NewFDs}}
end.
is_peer(Node, #state{peers=Peers}) ->
@@ -494,15 +674,23 @@ ms_event(Function, Args, S = #state{ms={Mod,ModS}})
case apply(Mod, Function, Args ++ [cluster_info(S), ModS]) of
{ok, NewModS} ->
{ok, S#state{ms={Mod, NewModS}}};
- {ok, Broadcast, NewModS} ->
+ %% You're only allowed to send Sync terms
+ {ok, SyncBcast, NewModS} when node() =:= S#state.leader ->
NewState = S#state{ms={Mod, NewModS}},
- broadcast(Broadcast, cluster_info(NewState)),
+ sync_bcast(SyncBcast, NewState),
{ok, NewState};
+ {ok, _, NewModS} when node() =/= S#state.leader ->
+ {stop, {?MODULE, callback_error,
+ sync_broadcast_when_not_leader},
+ S#state{ms={Mod, NewModS}}};
{stop, Reason, NewModS} ->
- {stop, Reason, S = #state{ms={Mod, NewModS}}}
+ {stop, Reason, S#state{ms={Mod, NewModS}}}
end.
-ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
+-spec ms_call('call' | lc_proto(),
+ atom(), list(), term(), #state{}) ->
+ term().
+ms_call(LCProto, Function, Args, From, S = #state{ms={Mod,ModS}})
when is_atom(Function), is_list(Args), is_tuple(From) ->
case apply(Mod, Function,
Args ++ [From, cluster_info(S), ModS]) of
@@ -511,21 +699,110 @@ ms_call(Function, Args, From, S = #state{ms={Mod,ModS}})
{reply, Reply, NewModS} ->
gen_fsm:reply(From, Reply),
{noreply, S#state{ms={Mod, NewModS}}};
- {reply, Reply, Broadcast, NewModS} ->
+ {reply, Reply, SyncBcast, NewModS}
+ when node() =:= S#state.leader,
+ LCProto =:= leader_only ->
NewState = S#state{ms={Mod, NewModS}},
- %% XXX Broadcast happens before reply - bug?
- broadcast(Broadcast, cluster_info(NewState)),
+ sync_bcast(SyncBcast, NewState),
gen_fsm:reply(From, Reply),
{noreply, NewState};
+ {reply, Reply, SyncBcast, NewModS}
+ when node() =:= S#state.leader,
+ LCProto =:= local_sync ->
+ NewState = S#state{ms={Mod, NewModS}},
+ local_sync_bcast(From, Reply, SyncBcast, NewState),
+ {noreply, NewState};
+ {reply, _Reply, _Broadcast, NewModS}
+ when node() =/= S#state.leader ->
+ {stop, {?MODULE, callback_error,
+ sync_broadcast_when_not_leader},
+ S#state{ms={Mod, NewModS}}};
{stop, Reason, NewModS} ->
- {stop, Reason, S = #state{ms={Mod, NewModS}}};
+ {stop, Reason, S#state{ms={Mod, NewModS}}};
{stop, Reply, Reason, NewModS} ->
gen_fsm:reply(From, Reply),
- {stop, Reason, S = #state{ms={Mod, NewModS}}}
+ {stop, Reason, S#state{ms={Mod, NewModS}}}
end.
-spec cluster_info(#state{}) -> cluster_info().
cluster_info(#state{leader=Node,
- peers=Peers}) ->
- {Node, Peers}.
+ peers=Peers,
+ name=Name}) ->
+ {Node, Peers, Name}.
+
+%% Fake a gen_fsm:send_all_state_event to avoid a trip through
+%% net_kernel:connect.
+sync_bcast(Msg, S = #state{acks=Acks}) ->
+ [ erlang:send(server_on(Node, S),
+ {'$gen_all_state_event', {from_leader, node(), Msg}},
+ [noconnect])
+ || Node <- ordsets:from_list(Acks),
+ Node =/= node(),
+ lists:member(Node, nodes()) ].
+
+%% @doc
+%% Sync broadcast for the local_sync leader_call protocol. Causes one
+%% peer (on the same machine as the caller) to send the reply locally
+%% after processing the SyncBcast result.
+local_sync_bcast(From = {Pid, _Tag}, Reply, Msg, S = #state{acks=Acks}) ->
+ FromNode = node(Pid),
+ case node() =:= FromNode of
+ true ->
+ %% Can send reply here as our local processing is already
+ %% finished by the time we get here.
+ gen_fsm:reply(From, Reply);
+ false ->
+ erlang:send(server_on(FromNode, S),
+ {'$gen_all_state_event',
+ {from_leader, node(), Msg,
+ {local_sync_reply, From, Reply}}},
+ [noconnect])
+ end,
+ [ erlang:send(server_on(Node, S),
+ {'$gen_all_state_event', {from_leader, node(), Msg}},
+ [noconnect])
+ || Node <- ordsets:from_list(Acks),
+ Node =/= node(),
+ Node =/= FromNode,
+ lists:member(Node, nodes()) ].
+
+%% @doc gen_fsm:format_status/2 callback.
+%%
+%% Examines internal state to give easier to read output for
+%% sys:get_status(<Some gl_async_bully process>).
+format_status(Fmt, [_Dict, S = #state{name=Name,
+ elid=Elid}]) ->
+ [{name, Name},
+ {election, Elid, format_election_status(S)},
+ format_mod_status(Fmt, S),
+ {state, S}].
+
+format_election_status(#state{leader=L,
+ acks=Acks,
+ peers=Peers}) when L =:= node() ->
+ [{leader, true},
+ {leader_node, L},
+ {acks, Acks},
+ {peers, Peers}];
+format_election_status(#state{leader=L}) ->
+ [{leader, false},
+ {leader_node, L}].
+
+format_mod_status(Fmt, #state{ms={Mod, ModS}}) ->
+ case erlang:function_exported(Mod, format_status, 2) of
+ true ->
+ {Mod, Mod:format_status(Fmt, ModS)};
+ false ->
+ {Mod, [{state, ModS}]}
+ end.
+
+-spec control_message(proto_message()) -> control_message().
+%% @doc Format a control_message from the local node.
+control_message(Msg) ->
+ {gl_async_bully, node(), Msg}.
+
+%% @doc generate new incarnation id strictly greather than the old
+%% one.
+reincarnate(State = #state{}) ->
+ State#state{incarn = erlang:now()}.
View
82 src/glab_fd.erl
@@ -0,0 +1,82 @@
+%%%-------------------------------------------------------------------
+%% @copyright Geoff Cant
+%% @author Geoff Cant <nem@erlang.geek.nz>
+%% @version {@vsn}, {@date} {@time}
+%% @doc Implements (slightly?) unreliable failure detectors for the
+%% async bully algorithm used in gl_async_bully.
+%% @end
+%%%-------------------------------------------------------------------
+-module(glab_fd).
+
+%% API
+-export([new/1
+ ,start/2
+ ,stop/2
+ ,filter_DOWN/2
+ ,set/2
+ ]).
+
+-record(fd, {name :: atom(),
+ nodes = orddict:new() :: orddict:orddict()
+ }).
+
+-opaque fd_set() :: #fd{}.
+
+-export_type([ fd_set/0 ]).
+
+%%====================================================================
+%% API
+%%====================================================================
+
+-spec new(atom()) -> fd_set().
+new(Name) when is_atom(Name) ->
+ #fd{name=Name}.
+
+-spec start(node(), fd_set()) -> fd_set().
+start(Node, FD = #fd{name = Name, nodes = Nodes}) when is_atom(Node)->
+ case orddict:is_key(Node, Nodes) of
+ true ->
+ FD;
+ false ->
+ Ref = erlang:monitor(process, {Name, Node}),
+ FD#fd{nodes=orddict:store(Node, Ref, Nodes)}
+ end.
+
+-spec filter_DOWN({'DOWN', reference(), _, _, Info}, fd_set()) ->
+ {'down', {atom(), node(), Info}, fd_set()} |
+ {'ignore', Reason::atom(), fd_set()}.
+filter_DOWN({'DOWN', Ref, process, {Name, Node}, Info},
+ FD = #fd{name = Name, nodes = Nodes}) ->
+ case orddict:find(Node, Nodes) of
+ {ok, Ref} ->
+ {down, {Name, Node, Info},
+ FD#fd{nodes=orddict:erase(Node, Nodes)}};
+ {ok, WrongRef} when is_reference(WrongRef) ->
+ {ignore, stale, FD};
+ error ->
+ {ignore, not_monitored, FD}
+ end;
+filter_DOWN({'DOWN', _Ref, _Type, _Obj, _Info}, FD) ->
+ {ignore, other_montior, FD}.
+
+
+-spec stop(node(), fd_set()) -> fd_set().
+stop(Node, FD = #fd{nodes = Nodes}) when is_atom(Node) ->
+ case orddict:find(Node, Nodes) of
+ {ok, Ref} ->
+ erlang:demonitor(Ref, [flush]),
+ FD#fd{nodes=orddict:erase(Node, Nodes)};
+ error ->
+ FD
+ end.
+
+-spec set([node()], fd_set()) -> fd_set().
+set(NewNodes, FD = #fd{nodes = Nodes}) ->
+ lists:foldl(fun start/2,
+ lists:foldl(fun stop/2, FD,
+ orddict:fetch_keys(Nodes)),
+ NewNodes).
+
+%%====================================================================
+%% Internal functions
+%%====================================================================
Please sign in to comment.
Something went wrong with that request. Please try again.