Skip to content

Commit

Permalink
Pushing naive voting implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
auser committed Jul 7, 2010
1 parent db5db03 commit eeed01a
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
49 changes: 35 additions & 14 deletions src/gen_cluster.erl
Expand Up @@ -38,7 +38,8 @@
-export([
plist/1,
mod_plist/2,
publish/2
publish/2,
call_vote/2
]).

behaviour_info(callbacks) ->
Expand Down Expand Up @@ -111,6 +112,9 @@ mod_plist(Type, PidRef) ->
publish(Mod, Msg) when is_atom(Mod) ->
do_publish(Mod, Msg).

call_vote(Mod, Msg) ->
do_call_vote(Mod, Msg).

%%--------------------------------------------------------------------
%% Function: init(Args) -> {ok, State} |
%% {ok, State, Timeout} |
Expand Down Expand Up @@ -185,9 +189,16 @@ handle_call({'$gen_cluster', plist}, _From, State) ->
{reply, Reply, State};

handle_call({'$gen_cluster', mod_plist, Mod}, _From, State) ->
Pids = gproc:lookup_pids({p,g,{gen_cluster, Mod}}),
Pids = gproc:lookup_pids({p,g,cluster_key(Mod)}),
{reply, {ok, Pids}, State};

handle_call({'$gen_cluster', handle_vote_called, Msg}, From, #state{module = Mod, state = ExtState} = State) ->
Reply = case erlang:function_exported(Mod, handle_vote, 2) of
false -> {reply, 0, State};
true -> Mod:handle_vote(Msg, ExtState)
end,
handle_call_reply(Reply, From, State);

handle_call(Request, From, State) ->
Mod = State#state.module,
ExtState = State#state.state,
Expand Down Expand Up @@ -260,7 +271,7 @@ handle_info({'$gen_cluster', handle_publish, Msg}, #state{module = Mod, state =
Reply = Mod:handle_publish(Msg, ExtState),
handle_publish_reply(Reply, State)
end;

handle_info(Info, State) ->
?TRACE("got other INFO", Info),
Mod = State#state.module,
Expand Down Expand Up @@ -331,18 +342,18 @@ handle_pid_leaving(Pid, Info, State) ->
true ->
% XXX: There's a bug in gproc that prevents it from removing
% keys when a remote node goes away. This should do the trick for now.
gproc_dist:leader_call({unreg, {p,g,cluster_key(State)}, Pid}),
gproc_dist:leader_call({unreg, {p,g,cluster_key(State#state.module)}, Pid}),
{ok, NewExtState} = Mod:handle_leave(Pid, Info, ExtState),
{true, State#state{state=NewExtState}};
false -> {false, State}
end.


cluster_key(#state{module = Mod} = _State) ->
cluster_key(Mod) ->
{gen_cluster, Mod}.

cluster_pids(State) ->
gproc:lookup_pids({p,g,cluster_key(State)}).
gproc:lookup_pids({p,g,cluster_key(State#state.module)}).

monitor_pid(Pid) when is_pid(Pid) ->
case Pid =/= self() of
Expand All @@ -354,7 +365,7 @@ is_pid_part_of_the_cluster(Pid, State) when is_pid(Pid) ->
lists:member(Pid, cluster_pids(State)).

join_cluster(State) ->
gproc:reg({p,g,cluster_key(State)}),
gproc:reg({p,g,cluster_key(State#state.module)}),
lists:foreach(
fun(Pid) ->
case Pid =/= self() of
Expand All @@ -370,10 +381,20 @@ join_cluster(State) ->
% publish through gproc
do_publish(Mod, Msg) ->
lists:foreach(fun(Pid) ->
case catch erlang:send(Pid, {'$gen_cluster', handle_publish, Msg}, [noconnect]) of
noconnect ->
spawn(erlang, send, [Pid,Msg]);
Other ->
Other
end
end, gproc:lookup_pids({p,g,{gen_cluster, Mod}})).
do_send(Pid, {'$gen_cluster', handle_publish, Msg})
end, gproc:lookup_pids({p,g,cluster_key(Mod)})).

% Call vote
do_call_vote(Mod, Msg) ->
Votes = lists:map(fun(Pid) ->
Vote = gen_server:call(Pid, {'$gen_cluster', handle_vote_called, Msg}),
{Pid, Vote}
end, gproc:lookup_pids({p,g,cluster_key(Mod)})),
[{WinnerPid, _WinnerVote}|_Rest] = lists:sort(fun({_Pid1, Vote1},{_Pid2, Vote2}) -> Vote1 > Vote2 end, Votes),
WinnerPid.

do_send(Pid, Msg) ->
case catch erlang:send(Pid, Msg, [noconnect]) of
noconnect -> spawn(erlang, send, [Pid,Msg]);
Other -> Other
end.
11 changes: 10 additions & 1 deletion test/src/cluster_srv_tests.erl
Expand Up @@ -19,7 +19,8 @@ all_test_() ->
fun do_some_more/0,
fun different_type_of_node/0,
fun node_global_takeover/0,
fun publish_tests/0
fun publish_tests/0,
fun vote_tests/0
]
}
]
Expand Down Expand Up @@ -47,6 +48,14 @@ publish_tests() ->

passed.

vote_tests() ->
O = gen_cluster:call_vote(example_cluster_srv, {run, server}),
{ok, Plist} = gen_cluster:mod_plist(example_cluster_srv, node1),
% In this test, the winner will always be the last element
Winner = hd(lists:reverse(Plist)),
?assert(O == Winner),
passed.

node_global_takeover() ->
Node1Pid = whereis(node0),
{ok, Name1} = gen_cluster:call(Node1Pid, {'$gen_cluster', globally_registered_name}),
Expand Down
23 changes: 20 additions & 3 deletions test/src/example_cluster_srv.erl
Expand Up @@ -16,7 +16,12 @@
code_change/3]).

% gen_cluster callback
-export([handle_join/2, handle_leave/3, handle_publish/2]).
-export([handle_join/2, handle_leave/3]).
% Optional gen_cluster callbacks
-export ([
handle_publish/2,
handle_vote/2
]).

-include ("debugger.hrl").

Expand Down Expand Up @@ -59,7 +64,7 @@ get_msg() ->

init(Args) ->
?TRACE("called init", Args),
InitialState = #state{name=example_cluster_srv, pid=self(), timestamp=0},
InitialState = #state{name=example_cluster_srv, pid=self(), timestamp=calendar:datetime_to_gregorian_seconds(calendar:now_to_universal_time(now()))},
{ok, InitialState}.

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -163,4 +168,16 @@ handle_leave(LeavingPid, Info, State) ->
{ok, State}.

handle_publish(Msg, State) ->
{noreply, State#state{last_msg = Msg}}.
{noreply, State#state{last_msg = Msg}}.

handle_vote({run, server}, State) ->
VoteValue = index_of(self(), erlang:processes()),
{reply, VoteValue, State};
handle_vote(_Msg, State) ->
{reply, 0, State}.

index_of(Item, List) -> index_of(Item, List, 1).

index_of(_, [], _) -> not_found;
index_of(Item, [Item|_], Index) -> Index;
index_of(Item, [_|Tl], Index) -> index_of(Item, Tl, Index+1).

0 comments on commit eeed01a

Please sign in to comment.