Permalink
Browse files

Merge remote-tracking branch 'remotes/archaelus/master'

  • Loading branch information...
2 parents 66a9d56 + 0382b0e commit 4b6cf1eaac51d10455796ba87708896bcc2e2d5a @Vagabond committed Aug 18, 2011
Showing with 116 additions and 28 deletions.
  1. +3 −0 .gitignore
  2. +70 −0 include/gen_leader_specs.hrl
  3. +43 −28 src/gen_leader.erl
View
3 .gitignore
@@ -13,3 +13,6 @@ profile
# Vim
*.swp
*.swo
+
+# Emacs
+*~
View
70 include/gen_leader_specs.hrl
@@ -0,0 +1,70 @@
+
+-spec init (Args::term()) ->
+ {'ok', State::term()} |
+ 'ignore' |
+ {'stop', Reason::term()}.
+
+%% elected(_,_,node()) - captured a single candidate
+%% reply notifies only the new candidate of the
+%% sync state, ok notifies all candidates
+%% elected(_,_,undefined) - won a general election
+%% ok broadcasts sync to all candidates
+-spec elected(State, gen_leader:election(), node() | 'undefined') ->
+ {'ok', Sync::term(), State} |
+ {'reply', Sync::term(), State}.
+
+-spec surrendered(State, Sync::term(), gen_leader:election()) ->
+ {'ok', State}.
+
+-spec handle_leader_call(Request::term(),
+ From::{pid(), reference()},
+ State,
+ gen_leader:election()) ->
+ {'reply', Reply::term(), State} |
+ {'reply', Reply::term(), Broadcast::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), Reply::term(), State}.
+
+-spec handle_leader_cast(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'ok', Broadcast::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), State}.
+
+-spec from_leader(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'noreply', State} |
+ {'ok', State} |
+ {'stop', Reason::term(), State}.
+
+-spec handle_call(Request::term(),
+ From::{pid(), reference()},
+ State,
+ gen_leader:election()) ->
+ {'reply', Reply::term(), State} |
+ {'noreply', State} |
+ {'stop', Reason::term(), State} |
+ {'stop', Reason::term(), Reply::term(), State}.
+
+-spec handle_cast(Msg::term(),
+ State,
+ gen_leader:election()) ->
+ {'noreply', State} |
+ {'stop', Reason::term(), State}.
+
+-spec handle_DOWN(node(), State, gen_leader:election()) ->
+ {'ok', State} |
+ {'ok', Sync::term(), State}.
+
+-spec handle_info(Msg::term(), State) ->
+ {'noreply', State} |
+ {'stop', Reason::term(), State} |
+ {'ok', State}.
+
+-spec terminate(Reason::term(), State::term()) -> any().
+
+-spec code_change(OldVsn::term(), State, gen_leader:election(), Extra::term()) ->
+ {'ok', State} |
+ {'ok', State, gen_leader:election()}.
View
71 src/gen_leader.erl
@@ -92,18 +92,18 @@
]).
+%% Notification control of candidate membership changes. `all'
+%% means that returns from the handle_DOWN/3 and elected/3 leader's events
+%% will be broadcast to all candidates.
+-type bcast_type() :: 'all' | 'sender'.
+
-type option() :: {'workers', Workers::[node()]}
| {'vardir', Dir::string()}
| {'bcast_type', Type::bcast_type()}
| {'heartbeat', Seconds::integer()}.
-type options() :: [option()].
-%% Notification control of candidate membership changes. `all'
-%% means that returns from the handle_DOWN/3 and elected/3 leader's events
-%% will be broadcast to all candidates.
--type bcast_type() :: 'all' | 'sender'.
-
-type status() :: 'elec1' | 'elec2' | 'wait' | 'joining' | 'worker' |
'waiting_worker' | 'norm'.
@@ -156,6 +156,10 @@
bcast_type :: bcast_type()
}).
+-opaque election() :: #election{}.
+
+-export_type([election/0]).
+
-record(server, {
parent,
mod,
@@ -242,27 +246,27 @@ start_link(Name, CandidateNodes, OptArgs, Mod, Arg, Options)
%% Query functions to be used from the callback module
%% @doc Returns list of alive nodes.
--spec alive(#election{}) -> [node()].
+-spec alive(election()) -> [node()].
alive(E) ->
candidates(E) -- down(E).
%% @doc Returns list of down nodes.
--spec down(#election{}) -> [node()].
+-spec down(election()) -> [node()].
down(#election{down = Down}) ->
Down.
%% @doc Returns the current leader node.
--spec leader_node(#election{}) -> node() | 'none'.
+-spec leader_node(election()) -> node() | 'none'.
leader_node(#election{leadernode=Leader}) ->
Leader.
%% @doc Returns a list of known candidates.
--spec candidates(#election{}) -> [node()].
+-spec candidates(election()) -> [node()].
candidates(#election{candidate_nodes = Cands}) ->
Cands.
%% @doc Returns a list of known workers.
--spec workers(#election{}) -> [node()].
+-spec workers(election()) -> [node()].
workers(#election{worker_nodes = Workers}) ->
Workers.
@@ -344,23 +348,23 @@ leader_call(Name, Request, Timeout) ->
%% @equiv gen_server:cast/2
--spec cast(Name::name()|pid(), Request::term()) -> 'ok'.
+-spec cast(Name::server_ref(), Request::term()) -> 'ok'.
cast(Name, Request) ->
catch do_cast('$gen_cast', Name, Request),
ok.
%% @doc Similar to <code>gen_server:cast/2</code> but will be forwarded to
%% the leader via the local gen_leader instance.
--spec leader_cast(Name::name()|pid(), Request::term()) -> 'ok'.
+-spec leader_cast(Name::server_ref(), Request::term()) -> 'ok'.
leader_cast(Name, Request) ->
catch do_cast('$leader_cast', Name, Request),
ok.
-do_cast(Tag, Name, Request) when is_atom(Name) ->
- Name ! {Tag, Request};
-do_cast(Tag, Pid, Request) when is_pid(Pid) ->
- Pid ! {Tag, Request}.
+do_cast(Tag, {global, Name}, Request) ->
+ global:send(Name, {Tag, Request});
+do_cast(Tag, ServerRef, Request) ->
+ ServerRef ! {Tag, Request}.
%% @equiv gen_server:reply/2
@@ -446,9 +450,7 @@ init_it(Starter,Parent,Name,Mod,{CandidateNodes,OptArgs,Arg},Options) ->
hasBecomeLeader(NewE,Server,{init});
false ->
%% more than one candidate worker, continue as normal
- safe_loop(#server{parent = Parent,mod = Mod,
- state = State,debug = Debug},
- candidate, NewE,{init})
+ safe_loop(Server, candidate, NewE,{init})
end;
{{ok, State}, true, true} ->
Server = #server{parent = Parent,mod = Mod,
@@ -1014,10 +1016,11 @@ handle_msg({'$leader_call', From, Request} = Msg, Server, Role,
handle_msg({Ref, {leader,reply,Reply}} = Msg, Server, Role,
#election{buffered = Buffered} = E) ->
{value, {_,From}} = lists:keysearch(Ref,1,Buffered),
- NewServer = reply(From, {leader,reply,Reply}, Server, Role,
- E#election{buffered =
- lists:keydelete(Ref,1,Buffered)}),
- loop(NewServer, Role, E, Msg);
+ El = E#election{buffered = lists:keydelete(Ref,1,Buffered)},
+
+ NewServer = reply(From, {leader,reply,Reply}, Server, Role, El),
+
+ loop(NewServer, Role, El, Msg);
handle_msg({'$gen_call', From, get_candidates} = Msg, Server, Role, E) ->
NewServer = reply(From, {ok, candidates(E)}, Server, Role, E),
loop(NewServer, Role, E, Msg);
@@ -1405,11 +1408,23 @@ mon_nodes(E,Nodes,Server) ->
mon_node(El, Pid, Server)
end,E1,Nodes -- [node()]).
-%% Star monitoring one Process
-mon_node(E,Proc,Server) ->
- {Ref,Node} = do_monitor(Proc, Server),
- E#election{monitored = [{Ref,Node} | E#election.monitored]}.
+%% Start monitoring one Process
+mon_node(E,{_RegName, NodeName} = Proc,Server) ->
+ case lists:keymember(NodeName, 2, E#election.monitored) of
+ true -> E;
+ false ->
+ {Ref,Node} = do_monitor(Proc, Server),
+ E#election{monitored = [{Ref,Node} | E#election.monitored]}
+ end;
+mon_node(E,Proc,Server) when is_pid(Proc) ->
+ case lists:keymember(node(Proc), 2, E#election.monitored) of
+ true -> E;
+ false ->
+ {Ref,Node} = do_monitor(Proc, Server),
+ E#election{monitored = [{Ref,Node} | E#election.monitored]}
+ end
+ .
spawn_monitor_proc() ->
Parent = self(),
@@ -1444,7 +1459,7 @@ mon_handle_req({monitor, P}, From, Refs) ->
Pid when is_pid(Pid) -> node(Pid)
end,
case lists:keyfind(Node, 2, Refs) of
- {_, Ref} ->
+ {Ref, _} ->
mon_reply(From, {Ref,Node}),
Refs;
false ->

0 comments on commit 4b6cf1e

Please sign in to comment.