Skip to content

Commit

Permalink
Merge remote-tracking branch 'remotes/archaelus/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
Vagabond committed Aug 18, 2011
2 parents 66a9d56 + 0382b0e commit 4b6cf1e
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -13,3 +13,6 @@ profile
# Vim
*.swp
*.swo

# Emacs
*~
70 changes: 70 additions & 0 deletions include/gen_leader_specs.hrl
@@ -0,0 +1,70 @@

-spec init (Args::term()) ->
{'ok', State::term()} |
'ignore' |
{'stop', Reason::term()}.

%% elected(_,_,node()) - captured a single candidate
%% reply notifies only the new candidate of the
%% sync state, ok notifies all candidates
%% elected(_,_,undefined) - won a general election
%% ok broadcasts sync to all candidates
-spec elected(State, gen_leader:election(), node() | 'undefined') ->
{'ok', Sync::term(), State} |
{'reply', Sync::term(), State}.

-spec surrendered(State, Sync::term(), gen_leader:election()) ->
{'ok', State}.

-spec handle_leader_call(Request::term(),
From::{pid(), reference()},
State,
gen_leader:election()) ->
{'reply', Reply::term(), State} |
{'reply', Reply::term(), Broadcast::term(), State} |
{'noreply', State} |
{'stop', Reason::term(), Reply::term(), State}.

-spec handle_leader_cast(Msg::term(),
State,
gen_leader:election()) ->
{'ok', Broadcast::term(), State} |
{'noreply', State} |
{'stop', Reason::term(), State}.

-spec from_leader(Msg::term(),
State,
gen_leader:election()) ->
{'noreply', State} |
{'ok', State} |
{'stop', Reason::term(), State}.

-spec handle_call(Request::term(),
From::{pid(), reference()},
State,
gen_leader:election()) ->
{'reply', Reply::term(), State} |
{'noreply', State} |
{'stop', Reason::term(), State} |
{'stop', Reason::term(), Reply::term(), State}.

-spec handle_cast(Msg::term(),
State,
gen_leader:election()) ->
{'noreply', State} |
{'stop', Reason::term(), State}.

-spec handle_DOWN(node(), State, gen_leader:election()) ->
{'ok', State} |
{'ok', Sync::term(), State}.

-spec handle_info(Msg::term(), State) ->
{'noreply', State} |
{'stop', Reason::term(), State} |
{'ok', State}.

-spec terminate(Reason::term(), State::term()) -> any().

-spec code_change(OldVsn::term(), State, gen_leader:election(), Extra::term()) ->
{'ok', State} |
{'ok', State, gen_leader:election()}.
71 changes: 43 additions & 28 deletions src/gen_leader.erl
Expand Up @@ -92,18 +92,18 @@
]).


%% Notification control of candidate membership changes. `all'
%% means that returns from the handle_DOWN/3 and elected/3 leader's events
%% will be broadcast to all candidates.
-type bcast_type() :: 'all' | 'sender'.

-type option() :: {'workers', Workers::[node()]}
| {'vardir', Dir::string()}
| {'bcast_type', Type::bcast_type()}
| {'heartbeat', Seconds::integer()}.

-type options() :: [option()].

%% Notification control of candidate membership changes. `all'
%% means that returns from the handle_DOWN/3 and elected/3 leader's events
%% will be broadcast to all candidates.
-type bcast_type() :: 'all' | 'sender'.

-type status() :: 'elec1' | 'elec2' | 'wait' | 'joining' | 'worker' |
'waiting_worker' | 'norm'.

Expand Down Expand Up @@ -156,6 +156,10 @@
bcast_type :: bcast_type()
}).

-opaque election() :: #election{}.

-export_type([election/0]).

-record(server, {
parent,
mod,
Expand Down Expand Up @@ -242,27 +246,27 @@ start_link(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
%% Query functions to be used from the callback module

%% @doc Returns list of alive nodes.
-spec alive(#election{}) -> [node()].
-spec alive(election()) -> [node()].
alive(E) ->
candidates(E) -- down(E).

%% @doc Returns list of down nodes.
-spec down(#election{}) -> [node()].
-spec down(election()) -> [node()].
down(#election{down = Down}) ->
Down.

%% @doc Returns the current leader node.
-spec leader_node(#election{}) -> node() | 'none'.
-spec leader_node(election()) -> node() | 'none'.
leader_node(#election{leadernode=Leader}) ->
Leader.

%% @doc Returns a list of known candidates.
-spec candidates(#election{}) -> [node()].
-spec candidates(election()) -> [node()].
candidates(#election{candidate_nodes = Cands}) ->
Cands.

%% @doc Returns a list of known workers.
-spec workers(#election{}) -> [node()].
-spec workers(election()) -> [node()].
workers(#election{worker_nodes = Workers}) ->
Workers.

Expand Down Expand Up @@ -344,23 +348,23 @@ leader_call(Name, Request, Timeout) ->


%% @equiv gen_server:cast/2
-spec cast(Name::name()|pid(), Request::term()) -> 'ok'.
-spec cast(Name::server_ref(), Request::term()) -> 'ok'.
cast(Name, Request) ->
catch do_cast('$gen_cast', Name, Request),
ok.

%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
%% the leader via the local gen_leader instance.
-spec leader_cast(Name::name()|pid(), Request::term()) -> 'ok'.
-spec leader_cast(Name::server_ref(), Request::term()) -> 'ok'.
leader_cast(Name, Request) ->
catch do_cast('$leader_cast', Name, Request),
ok.


do_cast(Tag, Name, Request) when is_atom(Name) ->
Name ! {Tag, Request};
do_cast(Tag, Pid, Request) when is_pid(Pid) ->
Pid ! {Tag, Request}.
do_cast(Tag, {global, Name}, Request) ->
global:send(Name, {Tag, Request});
do_cast(Tag, ServerRef, Request) ->
ServerRef ! {Tag, Request}.


%% @equiv gen_server:reply/2
Expand Down Expand Up @@ -446,9 +450,7 @@ init_it(Starter,Parent,Name,Mod,{CandidateNodes,OptArgs,Arg},Options) ->
hasBecomeLeader(NewE,Server,{init});
false ->
%% more than one candidate worker, continue as normal
safe_loop(#server{parent = Parent,mod = Mod,
state = State,debug = Debug},
candidate, NewE,{init})
safe_loop(Server, candidate, NewE,{init})
end;
{{ok, State}, true, true} ->
Server = #server{parent = Parent,mod = Mod,
Expand Down Expand Up @@ -1014,10 +1016,11 @@ handle_msg({'$leader_call', From, Request} = Msg, Server, Role,
handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,
#election{buffered = Buffered} = E) ->
{value, {_,From}} = lists:keysearch(Ref,1,Buffered),
NewServer = reply(From, {leader,reply,Reply}, Server, Role,
E#election{buffered =
lists:keydelete(Ref,1,Buffered)}),
loop(NewServer, Role, E, Msg);
El = E#election{buffered = lists:keydelete(Ref,1,Buffered)},

NewServer = reply(From, {leader,reply,Reply}, Server, Role, El),

loop(NewServer, Role, El, Msg);
handle_msg({'$gen_call', From, get_candidates} = Msg, Server, Role, E) ->
NewServer = reply(From, {ok, candidates(E)}, Server, Role, E),
loop(NewServer, Role, E, Msg);
Expand Down Expand Up @@ -1405,11 +1408,23 @@ mon_nodes(E,Nodes,Server) ->
mon_node(El, Pid, Server)
end,E1,Nodes -- [node()]).

%% Star monitoring one Process
mon_node(E,Proc,Server) ->
{Ref,Node} = do_monitor(Proc, Server),
E#election{monitored = [{Ref,Node} | E#election.monitored]}.
%% Start monitoring one Process
mon_node(E,{_RegName, NodeName} = Proc,Server) ->
case lists:keymember(NodeName, 2, E#election.monitored) of
true -> E;
false ->
{Ref,Node} = do_monitor(Proc, Server),
E#election{monitored = [{Ref,Node} | E#election.monitored]}
end;

mon_node(E,Proc,Server) when is_pid(Proc) ->
case lists:keymember(node(Proc), 2, E#election.monitored) of
true -> E;
false ->
{Ref,Node} = do_monitor(Proc, Server),
E#election{monitored = [{Ref,Node} | E#election.monitored]}
end
.

spawn_monitor_proc() ->
Parent = self(),
Expand Down Expand Up @@ -1444,7 +1459,7 @@ mon_handle_req({monitor, P}, From, Refs) ->
Pid when is_pid(Pid) -> node(Pid)
end,
case lists:keyfind(Node, 2, Refs) of
{_, Ref} ->
{Ref, _} ->
mon_reply(From, {Ref,Node}),
Refs;
false ->
Expand Down

0 comments on commit 4b6cf1e

Please sign in to comment.