Skip to content
This repository has been archived by the owner on May 25, 2021. It is now read-only.

Commit

Permalink
dang, large commit.
Browse files Browse the repository at this point in the history
 * node removed from #mem{}
 * start_gossip api call added
 * some dialyzer specs
 * 'new' join accepts PingNode, calls into cluster itself
 * get_test convenience method
 * don't save state when testing
  • Loading branch information
Brad Anderson committed May 10, 2010
1 parent eb593c0 commit 454b9af
Show file tree
Hide file tree
Showing 3 changed files with 114 additions and 62 deletions.
1 change: 0 additions & 1 deletion include/common.hrl
Expand Up @@ -42,7 +42,6 @@

%% version 3 of membership state
-record(mem, {header=3,
node,
nodes=[],
clock=[],
args
Expand Down
142 changes: 90 additions & 52 deletions src/mem3.erl
Expand Up @@ -20,13 +20,13 @@

%% API
-export([start_link/0, start_link/1, stop/0, stop/1, reset/0]).
-export([join/2, clock/0, state/0]).
-export([join/3, clock/0, state/0, start_gossip/0]).
-export([partitions/0, fullmap/0]).
-export([nodes/0, nodes_for_part/1, nodes_for_part/2, all_nodes_parts/1]).
-export([parts_for_node/1]).

%% for testing more than anything else
-export([merge_nodes/2]).
-export([merge_nodes/2, next_up_node/1, next_up_node/3]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
Expand All @@ -46,9 +46,11 @@
-type options() :: list().
-type mem_node() :: {join_order(), node(), options()}.
-type mem_node_list() :: [mem_node()].
-type arg_options() :: {test, boolean()} | {config, #config{}}.
-type config() :: #config{}.
-type arg_options() :: {test, boolean()} | {config, config()}.
-type args() :: [] | [arg_options()].
-type mem_state() :: #mem{}.
-type test() :: undefined | node().
-type epoch() :: float().
-type clock() :: {node(), epoch()}.
-type vector_clock() :: [clock()].
Expand Down Expand Up @@ -77,9 +79,9 @@ stop(Server) ->
gen_server:cast(Server, stop).


-spec join(join_type(), mem_node_list()) -> ok.
join(JoinType, Nodes) ->
gen_server:call(?SERVER, {join, JoinType, Nodes}).
-spec join(join_type(), mem_node_list(), node() | nil) -> ok.
join(JoinType, Nodes, PingNode) ->
gen_server:call(?SERVER, {join, JoinType, Nodes, PingNode}).


-spec clock() -> vector_clock().
Expand All @@ -92,6 +94,11 @@ state() ->
gen_server:call(?SERVER, state).


-spec start_gossip() -> ok.
start_gossip() ->
gen_server:call(?SERVER, start_gossip).


-spec reset() -> ok | not_reset.
reset() ->
gen_server:call(?SERVER, reset).
Expand Down Expand Up @@ -156,7 +163,7 @@ all_nodes_parts(true) ->
init(Args) ->
process_flag(trap_exit,true),
Config = get_config(Args),
Test = proplists:get_value(test, Args),
Test = get_test(Args),
OldState = read_latest_state_file(Test, Config),
showroom_log:message(info, "membership: membership server starting...", []),
net_kernel:monitor_nodes(true),
Expand All @@ -165,9 +172,11 @@ init(Args) ->


%% new node(s) joining to this node
handle_call({join, JoinType, ExtNodes}, _From, State) ->
handle_call({join, JoinType, ExtNodes, PingNode}, _From, State) ->
% NewState = handle_join(JoinType, ExtNodes, PingNode, State),
% {reply, ok, NewState};
try
NewState = handle_join(JoinType, ExtNodes, State),
NewState = handle_join(JoinType, ExtNodes, PingNode, State),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
Expand All @@ -184,7 +193,7 @@ handle_call(state, _From, State) ->

%% reset - but only if we're in test mode
handle_call(reset, _From, #mem{args=Args} = State) ->
Test = proplists:get_value(test, Args),
Test = get_test(Args),
case Test of
undefined -> {reply, not_reset, State};
_ -> {reply, ok, int_reset(Test, State)}
Expand All @@ -196,11 +205,16 @@ handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
{reply, {ok, NodeList}, State};

%% gossip
handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) ->
handle_call({gossip, RemoteState}, {Pid,_Tag} = From, LocalState) ->
showroom_log:message(info, "membership: received gossip from ~p",
[RemoteNode]),
[erlang:node(Pid)]),
handle_gossip(From, RemoteState, LocalState);

% start_gossip
handle_call(start_gossip, _From, State) ->
NewState = gossip(State),
{reply, ok, NewState};

%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
Expand Down Expand Up @@ -259,6 +273,10 @@ get_config(Args) ->
end.


get_test(Args) ->
proplists:get_value(test, Args).


% we could be automatically:
% 1. rejoining a cluster after some downtime
%
Expand All @@ -277,7 +295,7 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->
{_, NodeList, _} = lists:unzip3(Nodes),
ping_all_yall(NodeList),
{RemoteStates, _BadNodes} = get_remote_states(NodeList),
Test = proplists:get_value(test, Args),
Test = get_test(Args),
case compare_state_with_rest(OldState, RemoteStates) of
match ->
showroom_log:message(info, "membership: rejoined successfully", []),
Expand All @@ -289,32 +307,43 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->


%% handle join activities, return NewState
handle_join(first, ExtNodes, State) ->
handle_join(first, ExtNodes, nil, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
int_join(ExtNodes, State);

handle_join(new, ExtNodes, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
int_join(ExtNodes, State);
handle_join(new, ExtNodes, PingNode, #mem{args=Args} = State) ->
NewState = case get_test(Args) of
undefined ->
% ping the PingNode and get its state
pong = net_adm:ping(PingNode),
timer:sleep(1000), % let dist. erl get set up... sigh.
{ok, RemoteState} = rpc:call(PingNode, mem3, state, []),
RemoteState;
_ ->
% testing, so meh
State
end,
% now use this info to join the ring
int_join(ExtNodes, NewState);

handle_join(replace, [_OldNode | _], _State) ->
handle_join(replace, [_OldNode | _], _PingNode, _State) ->
% TODO implement me
ok;

handle_join(JoinType, _, _) ->
showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
handle_join(JoinType, _, PingNode, _) ->
showroom_log:message(info, "membership: unknown join type: ~p "
"for ping node: ~p", [JoinType, PingNode]),
{error, {unknown_join_type, JoinType}}.


int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) ->
int_join(ExtNodes, #mem{nodes=Nodes, clock=Clock} = State) ->
NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
check_pos(Pos, N, Nodes),
[New|AccIn]
end, Nodes, ExtNodes),
NewNodes1 = lists:sort(NewNodes),
NewClock = vector_clock:increment(Node, Clock),
NewClock = vector_clock:increment(node(), Clock),
NewState = State#mem{nodes=NewNodes1, clock=NewClock},
install_new_state(NewState),
NewState.
Expand All @@ -333,7 +362,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
greater ->
% local node needs updating
gen_server:reply(From, ok), % reply to sender first
install_new_state(RemoteState);
{noreply, install_new_state(RemoteState)};
concurrent ->
% ick, so let's resolve and merge states
showroom_log:message(info,
Expand All @@ -342,7 +371,7 @@ handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
gen_server:reply(From, {new_state, MergedState}), % reply to sender
install_new_state(MergedState)
{noreply, install_new_state(MergedState)}
end.


Expand All @@ -367,18 +396,19 @@ merge_nodes(Remote, Local) ->


gossip(#mem{args=Args} = NewState) ->
Test = proplists:get_value(test, Args),
Test = get_test(Args),
gossip(Test, NewState).


gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) ->
-spec gossip(test(), mem_state()) -> mem_state().
gossip(undefined, #mem{nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
TargetNode = next_up_node(Node, Nodes),
TargetNode = next_up_node(Nodes),
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[Node, TargetNode]),
[node(), TargetNode]),
case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
ok -> ok;
{new_state, _NewState} -> ?debugHere,ok;
ok -> State;
{new_state, NewState} -> NewState;
Error -> throw({unknown_gossip_response, Error})
end;

Expand All @@ -387,11 +417,19 @@ gossip(_,_) ->
ok.


next_up_node(Node, Nodes) ->
next_up_node(Nodes) ->
Node = node(),
next_up_node(Node, Nodes, up_nodes()).


next_up_node(Node, Nodes, UpNodes) ->
{A, [Node|B]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
List = lists:append([B, A, [Node]]),
UpNodes = lists:delete(fun(N) -> lists:member(N, up_nodes()) end, List),
hd(UpNodes). % TODO: empty list?
List = lists:append(B, A), % be sure to eliminate Node
DownNodes = Nodes -- UpNodes,
case List -- DownNodes of
[Target|_] -> Target;
[] -> throw({error, no_gossip_targets_available})
end.


up_nodes() ->
Expand Down Expand Up @@ -425,8 +463,7 @@ read_latest_state_file(undefined, Config) ->
{ok, File} = find_latest_state_filename(Config),
case file:consult(File) of
{ok, [#mem{}=State]} -> State;
Else ->
?debugFmt("~nElse: ~p~n", [Else]),
_Else ->
throw({error, bad_mem_state_file})
end
catch _:Error ->
Expand All @@ -439,13 +476,15 @@ read_latest_state_file(_, _) ->

install_new_state(#mem{args=Args} = State) ->
Config = get_config(Args),
save_state_file(State, Config),
Test = get_test(Args),
save_state_file(Test, State, Config),
gossip(State).


%% @doc save the state file to disk, with current timestamp.
%% thx to riak_ring_manager:do_write_ringfile/1
save_state_file(State, Config) ->
-spec save_state_file(test(), mem_state(), config()) -> ok.
save_state_file(undefined, State, Config) ->
Dir = Config#config.directory,
{{Year, Month, Day},{Hour, Minute, Second}} = calendar:universal_time(),
TS = io_lib:format("~B~2.10.0B~2.10.0B~2.10.0B~2.10.0B~2.10.0B",
Expand All @@ -454,7 +493,9 @@ save_state_file(State, Config) ->
ok = filelib:ensure_dir(FN),
{ok, File} = file:open(FN, [binary, write]),
io:format(File, "~w.~n", [State]),
file:close(File).
file:close(File);

save_state_file(_,_,_) -> ok. % don't save if testing


check_pos(Pos, Node, Nodes) ->
Expand All @@ -476,12 +517,8 @@ int_reset(Test) ->
int_reset(Test, #mem{}).


int_reset(Test, State) ->
Node = case Test of
undefined -> node();
_ -> Test
end,
State#mem{node=Node, nodes=[], clock=[]}.
int_reset(_Test, State) ->
State#mem{nodes=[], clock=[]}.


ping_all_yall(Nodes) ->
Expand All @@ -492,18 +529,19 @@ get_remote_states(NodeList) ->
NodeList1 = lists:delete(node(), NodeList),
{States1, BadNodes} = rpc:multicall(NodeList1, mem3, state, [], 5000),
{_Status, States2} = lists:unzip(States1),
{States2, BadNodes}.
{lists:zip(NodeList1,States2), BadNodes}.


%% @doc compare state with states based on vector clock
%% return match | {bad_state_match, Node, NodesThatDontMatch}
compare_state_with_rest(#mem{node=Node, clock=Clock} = _State, States) ->
Results = lists:map(fun(#mem{node=Node1, clock=Clock1}) ->
{vector_clock:equals(Clock, Clock1), Node1}
compare_state_with_rest(#mem{clock=Clock} = _State, States) ->
Results = lists:map(fun({Node, #mem{clock=Clock1}}) ->
{vector_clock:equals(Clock, Clock1), Node}
end, States),
BadResults = lists:foldl(fun({true, _N}, AccIn) -> AccIn;
({false, N}, AccIn) -> [N | AccIn] end, [], Results),
({false, N}, AccIn) -> [N | AccIn]
end, [], Results),
if
length(BadResults) == 0 -> match;
true -> {bad_state_match, Node, BadResults}
true -> {bad_state_match, node(), BadResults}
end.

0 comments on commit 454b9af

Please sign in to comment.