Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
reworking gossip, BugzID 10069
  • Loading branch information
Brad Anderson committed May 10, 2010
1 parent 0e88f2b commit eb593c0557710c29f8c476f43d72fb54172b8e4e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 120 deletions.
@@ -43,7 +43,7 @@
%% version 3 of membership state
-record(mem, {header=3,
node,
nodes,
clock,
nodes=[],
clock=[],
args
}).
@@ -165,11 +165,9 @@ init(Args) ->


%% new node(s) joining to this node
handle_call({join, JoinType, ExtNodes}, _From,
#mem{args=Args} = State) ->
Config = get_config(Args),
handle_call({join, JoinType, ExtNodes}, _From, State) ->
try
NewState = handle_join(JoinType, ExtNodes, State, Config),
NewState = handle_join(JoinType, ExtNodes, State),
{reply, ok, NewState}
catch _:Error ->
showroom_log:message(error, "~p", [Error]),
@@ -189,17 +187,20 @@ handle_call(reset, _From, #mem{args=Args} = State) ->
Test = proplists:get_value(test, Args),
case Test of
undefined -> {reply, not_reset, State};
_ ->
mochiglobal:delete(pmap),
mochiglobal:delete(fullmap),
{reply, ok, int_reset(Test, State)}
_ -> {reply, ok, int_reset(Test, State)}
end;

%% nodes
handle_call(nodes, _From, #mem{nodes=Nodes} = State) ->
{_,NodeList,_} = lists:unzip3(Nodes),
{reply, {ok, NodeList}, State};

%% gossip
handle_call({gossip, #mem{node=RemoteNode} = RemoteState}, From, LocalState) ->
showroom_log:message(info, "membership: received gossip from ~p",
[RemoteNode]),
handle_gossip(From, RemoteState, LocalState);

%% ignored call
handle_call(Msg, _From, State) ->
showroom_log:message(info, "membership: ignored call: ~p", [Msg]),
@@ -210,12 +211,6 @@ handle_call(Msg, _From, State) ->
handle_cast(stop, State) ->
{stop, normal, State};

%% gossip
handle_cast({gossip, #mem{node=RemoteNode} = RemoteState}, LocalState) ->
showroom_log:message(info, "membership: received gossip from ~p",
[RemoteNode]),
{noreply, handle_gossip(RemoteState, LocalState)};

%% ignored cast
handle_cast(Msg, State) ->
showroom_log:message(info, "membership: ignored cast: ~p", [Msg]),
@@ -294,52 +289,60 @@ handle_init(_Test, #mem{nodes=Nodes, args=Args} = OldState) ->


%% handle join activities, return NewState
handle_join(first, ExtNodes, State, Config) ->
handle_join(first, ExtNodes, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
int_join(first, ExtNodes, State, Config);
int_join(ExtNodes, State);

handle_join(new, ExtNodes, State, Config) ->
int_join(new, ExtNodes, State, Config);
handle_join(new, ExtNodes, State) ->
{_,Nodes,_} = lists:unzip3(ExtNodes),
ping_all_yall(Nodes),
int_join(ExtNodes, State);

handle_join(replace, [_OldNode | _], _State, _Config) ->
handle_join(replace, [_OldNode | _], _State) ->
% TODO implement me
ok;

handle_join(JoinType, _, _, _) ->
handle_join(JoinType, _, _) ->
showroom_log:message(info, "membership: unknown join type: ~p", [JoinType]),
{error, {unknown_join_type, JoinType}}.


int_join(JoinType, ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State,
Config) ->
int_join(ExtNodes, #mem{node=Node, nodes=Nodes, clock=Clock} = State) ->
NewNodes = lists:foldl(fun({Pos, N, _Options}=New, AccIn) ->
check_pos(Pos, N, Nodes),
[New|AccIn]
end, Nodes, ExtNodes),
NewNodes1 = lists:sort(NewNodes),
NewClock = vector_clock:increment(Node, Clock),
NewState = State#mem{nodes=ExtNodes, clock=NewClock},
{Pmap, Fullmap} = create_maps(Config, JoinType, ExtNodes, Nodes),
new_state(NewState, Pmap, Fullmap, Config).
NewState = State#mem{nodes=NewNodes1, clock=NewClock},
install_new_state(NewState),
NewState.


%% @doc handle the gossip messages
%% We're not using vector_clock:resolve b/c we need custom merge strategy
handle_gossip(RemoteState=#mem{clock=RemoteClock, node=RemoteNode},
handle_gossip(From, RemoteState=#mem{clock=RemoteClock},
LocalState=#mem{clock=LocalClock}) ->
case vector_clock:compare(RemoteClock, LocalClock) of
equal -> LocalState;
equal ->
{reply, ok, LocalState};
less ->
% remote node needs updating
gen_server:cast({?SERVER, RemoteNode}, {gossip, LocalState}),
LocalState;
{reply, {new_state, LocalState}, LocalState};
greater ->
% local node needs updating
new_state(RemoteState);
gen_server:reply(From, ok), % reply to sender first
install_new_state(RemoteState);
concurrent ->
% ick, so let's resolve and merge states
showroom_log:message(info,
"~nmembership: Concurrent Clocks~n"
"~nRemoteState : ~p~nLocalState : ~p~n"
"membership: Concurrent Clocks~n"
"RemoteState : ~p~nLocalState : ~p~n"
, [RemoteState, LocalState]),
MergedState = merge_states(RemoteState, LocalState),
new_state(MergedState)
gen_server:reply(From, {new_state, MergedState}), % reply to sender
install_new_state(MergedState)
end.


@@ -363,29 +366,39 @@ merge_nodes(Remote, Local) ->
end.


% notify(Type, Nodes) ->
% lists:foreach(fun(Node) ->
% gen_event:notify(membership_events, {Type, Node})
% end, Nodes).

%% @doc Gossip State to a peer. The `test' entry of the state's args is looked
%% up and handed to gossip/2, which decides whether gossiping actually happens.
gossip(State = #mem{args=Args}) ->
    gossip(proplists:get_value(test, Args), State).


gossip(undefined, #mem{node=Node, nodes=StateNodes} = NewState) ->
gossip(undefined, #mem{node=Node, nodes=StateNodes} = State) ->
{_, Nodes, _} = lists:unzip3(StateNodes),
PartnersPlus = replication:partners_plus(Node, Nodes),
lists:foreach(fun(TargetNode) ->
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
TargetNode = next_up_node(Node, Nodes),
showroom_log:message(info, "membership: firing gossip from ~p to ~p",
[Node, TargetNode]),
gen_server:cast({?SERVER, TargetNode}, {gossip, NewState})
end, PartnersPlus);
case gen_server:call({?SERVER, TargetNode}, {gossip, State}) of
ok -> ok;
{new_state, _NewState} -> ?debugHere,ok;
Error -> throw({unknown_gossip_response, Error})
end;

gossip(_,_) ->
% testing, so don't gossip
ok.


%% @doc Pick the gossip target for Node: the first node reported by up_nodes/0
%% that follows Node in Nodes, wrapping around to the start of the list.
%%
%% Nodes is rotated so that the successors of Node come first and Node itself
%% comes last; every candidate not present in up_nodes/0 is then dropped.
%% Crashes with badmatch if Node is not a member of Nodes, and with badarg
%% (from hd/1) if no candidate is up.
next_up_node(Node, Nodes) ->
    {Before, [Node | After]} = lists:splitwith(fun(N) -> N /= Node end, Nodes),
    Candidates = lists:append([After, Before, [Node]]),
    % Fetch the up nodes once instead of once per candidate.
    Up = up_nodes(),
    % lists:filter/2, not lists:delete/2: delete/2 removes the first element
    % *equal to* its first argument, so passing a fun removed nothing and
    % down nodes were never skipped.
    UpNodes = lists:filter(fun(N) -> lists:member(N, Up) end, Candidates),
    hd(UpNodes). % TODO: empty list?


%% @doc Return the nodes treated as "up" when choosing a gossip target.
%% Currently a direct pass-through to erlang:nodes() with no caching.
%% NOTE(review): erlang:nodes() presumably excludes the local node - confirm
%% that callers do not expect the local node to appear in this list.
up_nodes() ->
% TODO: implement cache (fb 9704 & 9449)
erlang:nodes().


%% @doc find the latest state file on disk
find_latest_state_filename(Config) ->
Dir = Config#config.directory,
@@ -424,6 +437,12 @@ read_latest_state_file(_, _) ->
nil.


%% @doc Make State the current membership state: write it to a state file
%% using the config derived from the state's args, then gossip it.
%% The return value is whatever gossip/1 returns, not the state itself.
install_new_state(State = #mem{args=Args}) ->
    save_state_file(State, get_config(Args)),
    gossip(State).


%% @doc save the state file to disk, with current timestamp.
%% thx to riak_ring_manager:do_write_ringfile/1
save_state_file(State, Config) ->
@@ -438,58 +457,6 @@ save_state_file(State, Config) ->
file:close(File).


%% @doc given Config and a list of ExtNodes, construct a {Pmap,Fullmap}
%% This is basically replaying all the mem events that have happened.
%% @doc Build {Pmap, Fullmap} by replaying the joins described by ExtNodes.
%% For a `first' join the starting pmap is created from Q and the first node
%% listed in ExtNodes; for a `new' join it is the pmap cached in mochiglobal.
%% Each {Pos, Node, Options} entry is then joined in ascending Pos order,
%% after check_pos/3 has been run against Nodes.
create_maps(#config{q=Q} = Config, JoinType, ExtNodes, Nodes) ->
    [{_, SeedNode, _} | _] = ExtNodes,
    InitialPmap =
        case JoinType of
            first -> partitions:create_partitions(Q, SeedNode);
            new -> mochiglobal:get(pmap)
        end,
    JoinOne =
        fun({Pos, Node, Options}, PmapAcc) ->
            check_pos(Pos, Node, Nodes),
            {ok, Joined} = partitions:join(Node, PmapAcc,
                                           proplists:get_value(hints, Options)),
            Joined
        end,
    Pmap = lists:foldl(JoinOne, InitialPmap, lists:keysort(1, ExtNodes)),
    {Pmap, make_fullmap(Pmap, Config)}.


%% @doc construct a table with all partitions, with the primary node and all
%% replication partner nodes as well.
%% @doc Expand a partition map into a flat list of {Node, Part} pairs.
%% Every {Node, Part} entry of PMap is kept and is immediately followed by one
%% {Partner, Part} pair for each partner that replication:partners/3 reports
%% for Node among the distinct nodes of PMap.
make_fullmap(PMap, Config) ->
    {AllNodes, _Parts} = lists:unzip(PMap),
    UniqueNodes = lists:usort(AllNodes),
    lists:append(
        [[{Node, Part} |
          [{Partner, Part}
           || Partner <- replication:partners(Node, UniqueNodes, Config)]]
         || {Node, Part} <- PMap]).


%% @doc tasks associated with a new state
%% @doc Install State, rebuilding both maps from scratch: the state's node
%% list is replayed as a `first' join against an empty existing node list, and
%% the result is passed on to new_state/4.
new_state(State = #mem{nodes=StateNodes, args=StateArgs}) ->
    Cfg = get_config(StateArgs),
    {PartMap, FullMap} = create_maps(Cfg, first, StateNodes, []),
    new_state(State, PartMap, FullMap, Cfg).


%% @doc tasks associated with a new state
%% @doc Carry out the side effects that go with a new state, in order:
%% refresh the cached maps, write the state file, gossip the state.
%% Returns State unchanged.
new_state(NewState, PartMap, FullMap, Cfg) ->
    update_cache(PartMap, FullMap),
    save_state_file(NewState, Cfg),
    gossip(NewState),
    NewState.


%% cache table helper function
%% @doc Store both maps in mochiglobal, under the keys `pmap' and `fullmap'.
%% Returns the result of the second mochiglobal:put/2 call.
update_cache(PartMap, FullMap) ->
    mochiglobal:put(pmap, PartMap),
    mochiglobal:put(fullmap, FullMap).


check_pos(Pos, Node, Nodes) ->
Found = lists:keyfind(Pos, 1, Nodes),
case Found of
@@ -514,9 +481,7 @@ int_reset(Test, State) ->
undefined -> node();
_ -> Test
end,
Nodes = [{0, Node, []}],
Clock = vector_clock:create(Node),
State#mem{node=Node, nodes=Nodes, clock=Clock}.
State#mem{node=Node, nodes=[], clock=[]}.


ping_all_yall(Nodes) ->
@@ -61,16 +61,14 @@ init(_Pid) ->

clock(_Pid) ->
{ok, Clock} = mem3:clock(),
?assertMatch([{?TEST_NODE_NAME, _}], Clock).
?assertMatch([], Clock).


join_first(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}]),
Fullmap = mem3:fullmap(),
?assertEqual(16, length(Fullmap)),
Pmap = mem3:partitions(),
?assertEqual(8, length(Pmap)),
{ok, Nodes} = mem3:nodes(),
?assertEqual(2, length(Nodes)),
ok.


@@ -81,43 +79,43 @@ join_first_with_hints(_Pid) ->
{3, c, [{hints, [?HINT_C1, ?HINT_C2]}]},
{4, d, []},
{5, e, []}]),
Fullmap = mem3:fullmap(),
?assertEqual(24, length(Fullmap)),
Pmap = mem3:partitions(),
?assertEqual(8, length(Pmap)),
{ok, Nodes} = mem3:nodes(),
?assertEqual(5, length(Nodes)),
%?debugFmt("~nFullmap: ~p~n", [Fullmap]),
?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)),
?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)),
% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C1)),
% ?assertEqual([c,d,e], mem3:nodes_for_part(?HINT_C2)),
ok.


join_new_node(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
?assertEqual(24, length(mem3:fullmap())),
?assertEqual([], mem3:parts_for_node(d)),
{ok, Nodes1} = mem3:nodes(),
?assertEqual(3, length(Nodes1)),
mem3:join(new, [{4, d, []}]),
?assertEqual(?PARTS_FOR_D1, mem3:parts_for_node(d)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
{ok, Nodes2} = mem3:nodes(),
?assertEqual(4, length(Nodes2)),
?debugFmt("~nNodes: ~p~n", [Nodes2]),
ok.


join_two_new_nodes(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
?assertEqual([], mem3:parts_for_node(d)),
{ok, Nodes1} = mem3:nodes(),
?assertEqual(3, length(Nodes1)),
Res = mem3:join(new, [{4, d, []}, {5, e, []}]),
?assertEqual(ok, Res),
?assertEqual([a,d,e], mem3:nodes_for_part(?x40)),
?assertEqual([c,d,e], mem3:nodes_for_part(?x60)),
{ok, Nodes2} = mem3:nodes(),
?assertEqual(5, length(Nodes2)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
ok.


join_with_wrong_order(_Pid) ->
mem3:reset(),
mem3:join(first, [{1, a, []}, {2, b, []}, {3, c, []}]),
?assertEqual([], mem3:parts_for_node(d)),
% ?assertEqual([], mem3:parts_for_node(d)),
%?debugFmt("~nFullmap: ~p~n", [mem3:fullmap()]),
Res = mem3:join(new, [{3, d, []}]),
?assertEqual({error,{position_exists,3,c}}, Res),

0 comments on commit eb593c0

Please sign in to comment.