Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
tree: fd21fea013
Fetching contributors…

Cannot retrieve contributors at this time

500 lines (444 sloc) 19.392 kb
%% -------------------------------------------------------------------
%%
%% riak_core: Core Riak Application
%%
%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
%% @doc riak_core_gossip takes care of the mechanics of shuttling a ring from
%% one node to another upon request by other Riak processes.
%%
%% Additionally, each time its gossip-token bucket is refilled it sends a copy
%% of the ring to some other random node, ensuring that all nodes eventually
%% synchronize on the same understanding of the Riak cluster. The refill
%% interval is the second element of the `gossip_limit' application setting
%% and defaults to 10 seconds; ring sends requested via send_ring are limited
%% to the first element of that setting (45 by default) per interval.
-module(riak_core_gossip).
-behaviour(gen_server).
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/2,
remove_from_cluster/3, random_gossip/1,
recursive_gossip/1, random_recursive_gossip/1, rejoin/2,
gossip_version/0, legacy_gossip/0, legacy_gossip/1,
any_legacy_gossip/2]).
-include("riak_core_ring.hrl").
%% Default gossip rate: allow at most 45 gossip messages every 10 seconds.
%% Shape is {Tokens, ResetIntervalMs}; used as the fallback when the
%% `gossip_limit' application environment variable is not set.
-define(DEFAULT_LIMIT, {45, 10000}).
%% Gossip server state:
%%   gossip_versions - orddict mapping a node to the gossip version it is
%%                     known to speak (filled by update_known_versions/2).
%%   gossip_tokens   - number of `send_ring_to' requests still allowed before
%%                     the next `reset_tokens' refill.
-record(state, {gossip_versions,
gossip_tokens}).
%% ===================================================================
%% Public API
%% ===================================================================
%% @doc Asynchronously ask the local gossip server to push `Ring' to the
%% members of that ring.
distribute_ring(Ring) ->
    LocalServer = {?MODULE, node()},
    gen_server:cast(LocalServer, {distribute_ring, Ring}).

%% @doc Ask the local gossip server to send its ring to `ToNode'.
send_ring(ToNode) ->
    send_ring(node(), ToNode).

%% @doc Ask the gossip server on `FromNode' to send its ring to `ToNode'.
%% Sending from a node to itself does nothing and returns `ok'.
send_ring(FromNode, ToNode) when FromNode =:= ToNode ->
    ok;
send_ring(FromNode, ToNode) ->
    gen_server:cast({?MODULE, FromNode}, {send_ring_to, ToNode}).
%% @doc Start the gossip server, registered locally under `?MODULE'.
start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% @doc Send a `stop' request to the locally registered gossip server.
stop() ->
    gen_server:cast(?MODULE, stop).

%% @doc Ask the gossip server on `Node' to rejoin using `Ring'.
rejoin(Node, Ring) ->
    Request = {rejoin, Ring},
    gen_server:cast({?MODULE, Node}, Request).

%% @doc Synchronously ask the local gossip server whether legacy gossip is
%% in use for the current ring.
legacy_gossip() ->
    gen_server:call(?MODULE, legacy_gossip).

%% @doc Synchronously ask the local gossip server whether `Node' is known
%% (or assumed) to speak legacy gossip.
legacy_gossip(Node) ->
    Request = {legacy_gossip, Node},
    gen_server:call(?MODULE, Request).
%% @doc Determine whether any of `Nodes' uses legacy gossip. Each node's
%% version comes from its `gossip_vsn' member metadata in `Ring' or, when that
%% is missing, from an RPC to the node itself (an RPC failure counts as
%% legacy). The proper way to check for legacy gossip is
%% {@link legacy_gossip/1}; this function is used to support staged
%% clustering in `riak_core_claimant'. Nodes are checked in order and the
%% search stops at the first legacy node; an empty list yields `false'.
any_legacy_gossip(Ring, Nodes) ->
    IsLegacy = fun(Node) ->
                       rpc_gossip_version(Ring, Node) =:= ?LEGACY_RING_VSN
               end,
    lists:any(IsLegacy, Nodes).
%% @doc Gossip this node's ring to one randomly chosen other active node.
%% When there is no other active node (single-node cluster) nothing is sent
%% and `ok' is returned.
random_gossip(Ring) ->
    Target = riak_core_ring:random_other_active_node(Ring),
    if
        Target =:= no_node ->
            ok;
        true ->
            send_ring(node(), Target)
    end.
%% @doc Gossip the ring to the children of `Node' in a tree decomposition of
%% the ring's active members. The tree comes from
%% riak_core_util:build_tree/3 with arity 2 and the `cycles' option
%% (presumably a binary tree whose leaves wrap around; confirm in
%% riak_core_util). Each receiving node may call recursive_gossip in turn,
%% so gossip can spread down the tree to the whole cluster.
%% Crashes if `Node' is not in the tree (orddict:fetch/2 raises on a missing
%% key). Returns `ok'.
recursive_gossip(Ring, Node) ->
    Tree = riak_core_util:build_tree(2, riak_core_ring:active_members(Ring),
                                     [cycles]),
    lists:foreach(fun(Child) -> send_ring(node(), Child) end,
                  orddict:fetch(Node, Tree)).
%% @doc Recursive gossip starting from this node. A node that is not an
%% active member does not appear in the tree decomposition, so in that case
%% we fall back to random_recursive_gossip/1.
recursive_gossip(Ring) ->
    IsActive = lists:member(node(), riak_core_ring:active_members(Ring)),
    if
        IsActive ->
            recursive_gossip(Ring, node());
        true ->
            random_recursive_gossip(Ring)
    end.
%% @doc Gossip to the children of a randomly chosen active member in the
%% tree decomposition (see recursive_gossip/2). Returns `ok' without
%% gossiping when the ring has no active members.
random_recursive_gossip(Ring) ->
    case riak_core_ring:active_members(Ring) of
        [] ->
            %% random:uniform/1 requires N >= 1, so an empty member list
            %% would otherwise crash the caller.
            ok;
        Active ->
            RNode = lists:nth(random:uniform(length(Active)), Active),
            recursive_gossip(Ring, RNode)
    end.
%% ===================================================================
%% gen_server behaviour
%% ===================================================================
%% @private
%% Arm the first token-reset timer, start with a full bucket of gossip
%% tokens (first element of `gossip_limit') and seed the known gossip
%% versions from the current raw ring.
init(_Args) ->
    schedule_next_reset(),
    {ok, RawRing} = riak_core_ring_manager:get_raw_ring(),
    {InitialTokens, _ResetMs} =
        app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    Empty = #state{gossip_versions=orddict:new(),
                   gossip_tokens=InitialTokens},
    {ok, update_known_versions(RawRing, Empty)}.
%% @private
%% legacy_gossip         -> whether the current raw ring needs legacy gossip.
%% {legacy_gossip, Node} -> whether Node is known/assumed to be legacy; the
%%                          refreshed known-version table is kept.
%% anything else         -> replies `ok'.
handle_call(legacy_gossip, _From, State) ->
    {ok, RawRing} = riak_core_ring_manager:get_raw_ring(),
    {reply, check_legacy_gossip(RawRing, State), State};
handle_call({legacy_gossip, Node}, _From, State0) ->
    {ok, RawRing} = riak_core_ring_manager:get_raw_ring(),
    State = update_known_versions(RawRing, State0),
    {reply, known_legacy_gossip(Node, State), State};
handle_call(_Request, _From, State) ->
    {reply, ok, State}.
%% Return `Ring' with this node's `gossip_vsn' member metadata set to
%% gossip_version(). The ring is returned unchanged when the metadata already
%% matches. Nothing is written back to the ring manager here.
update_gossip_version(Ring) ->
    Me = node(),
    Current = riak_core_ring:get_member_meta(Ring, Me, gossip_vsn),
    Wanted = gossip_version(),
    if
        Current =:= Wanted ->
            Ring;
        true ->
            riak_core_ring:update_member_meta(Me, Ring, Me, gossip_vsn, Wanted)
    end.
%% `true' when `Node' must be treated as a legacy-gossip node: either its
%% gossip version is unknown, or it is known to be ?LEGACY_RING_VSN.
known_legacy_gossip(Node, State) ->
    Known = State#state.gossip_versions,
    case orddict:find(Node, Known) of
        {ok, Vsn} ->
            Vsn =:= ?LEGACY_RING_VSN;
        error ->
            true
    end.
%% `true' when gossip about `Ring' must use the legacy format: `Ring' is a
%% legacy ring, or at least one of its members is known (or assumed) to use
%% legacy gossip. Known versions are first refreshed from the local raw
%% ring; the refreshed state is used only for this check and not returned.
check_legacy_gossip(Ring, State) ->
    {ok, LocalRing} = riak_core_ring_manager:get_raw_ring(),
    Refreshed = update_known_versions(LocalRing, State),
    riak_core_ring:legacy_ring(Ring) orelse
        lists:any(fun(Member) -> known_legacy_gossip(Member, Refreshed) end,
                  riak_core_ring:all_members(Ring)).
%% Fold step for update_known_versions/2: record the gossip version that
%% `OtherRing' advertises for `Node'. A member without `gossip_vsn' metadata
%% is recorded as ?LEGACY_RING_VSN only when it owns `OtherRing' (the ring
%% owner defaults to legacy gossip); otherwise the accumulator is unchanged.
update_known_version(Node, {OtherRing, GVsns} = Acc) ->
    case riak_core_ring:get_member_meta(OtherRing, Node, gossip_vsn) of
        undefined ->
            Owner = riak_core_ring:owner_node(OtherRing),
            if
                Owner =:= Node ->
                    {OtherRing, orddict:store(Node, ?LEGACY_RING_VSN, GVsns)};
                true ->
                    Acc
            end;
        Vsn ->
            {OtherRing, orddict:store(Node, Vsn, GVsns)}
    end.
%% Merge the gossip versions advertised for every member of `OtherRing'
%% into the state's `gossip_versions' orddict (see update_known_version/2).
update_known_versions(OtherRing, State=#state{gossip_versions=GVsns}) ->
    Step = fun(Node, Acc) ->
                   {_, Acc2} = update_known_version(Node, {OtherRing, Acc}),
                   Acc2
           end,
    Merged = lists:foldl(Step, GVsns, riak_core_ring:all_members(OtherRing)),
    State#state{gossip_versions=Merged}.
%% @doc Gossip version this node wants to speak: ?LEGACY_RING_VSN when the
%% `riak_core' application environment sets `legacy_gossip' to `true',
%% otherwise ?CURRENT_RING_VSN.
gossip_version() ->
    ForceLegacy = (app_helper:get_env(riak_core, legacy_gossip) =:= true),
    if
        ForceLegacy -> ?LEGACY_RING_VSN;
        true -> ?CURRENT_RING_VSN
    end.
%% Gossip version of `Node': its `gossip_vsn' member metadata in `Ring' when
%% present, otherwise the result of calling gossip_version/0 on `Node' via
%% rpc:call/5 with a 1000 ms timeout. Any `badrpc' result is treated as
%% ?LEGACY_RING_VSN. The RPC blocks the calling process.
rpc_gossip_version(Ring, Node) ->
    Meta = riak_core_ring:get_member_meta(Ring, Node, gossip_vsn),
    if
        Meta =/= undefined ->
            Meta;
        true ->
            case rpc:call(Node, riak_core_gossip, gossip_version, [], 1000) of
                {badrpc, _Reason} -> ?LEGACY_RING_VSN;
                RemoteVsn -> RemoteVsn
            end
    end.
%% @private
%% Asynchronous requests:
%%   {send_ring_to, Node} - send our ring (downgraded to Node's gossip
%%                          version) to Node, consuming one gossip token;
%%                          dropped when no tokens remain.
%%   {distribute_ring, R} - send R to every active member of R.
%%   {reconcile_ring, R}  - merge an incoming ring into ours (through the
%%                          legacy handler when legacy gossip is in use).
%%   reset_tokens         - refill the token bucket, re-arm the timer and
%%                          trigger one random gossip.
%%   gossip_ring          - gossip our ring to a random other node.
%%   {rejoin, R}          - join the owner of R when R has our cluster name.
%%   stop                 - stop the server (sent by stop/0).
handle_cast({send_ring_to, _Node}, State=#state{gossip_tokens=0}) ->
    %% Out of gossip tokens, ignore the send request
    {noreply, State};
handle_cast({send_ring_to, Node}, State) ->
    {ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
    MyRing = update_gossip_version(MyRing0),
    %% Only ask the peer for its version when this node is not itself
    %% configured for legacy gossip.
    GossipVsn = case gossip_version() of
                    ?LEGACY_RING_VSN ->
                        ?LEGACY_RING_VSN;
                    _ ->
                        rpc_gossip_version(MyRing, Node)
                end,
    RingOut = riak_core_ring:downgrade(GossipVsn, MyRing),
    riak_core_ring:check_tainted(RingOut,
                                 "Error: riak_core_gossip/send_ring_to :: "
                                 "Sending tainted ring over gossip"),
    gen_server:cast({?MODULE, Node}, {reconcile_ring, RingOut}),
    Tokens = State#state.gossip_tokens - 1,
    {noreply, State#state{gossip_tokens=Tokens}};
handle_cast({distribute_ring, Ring}, State) ->
    RingOut = case check_legacy_gossip(Ring, State) of
                  true ->
                      riak_core_ring:downgrade(?LEGACY_RING_VSN, Ring);
                  false ->
                      Ring
              end,
    Nodes = riak_core_ring:active_members(Ring),
    riak_core_ring:check_tainted(RingOut,
                                 "Error: riak_core_gossip/distribute_ring :: "
                                 "Sending tainted ring over gossip"),
    gen_server:abcast(Nodes, ?MODULE, {reconcile_ring, RingOut}),
    {noreply, State};
handle_cast({reconcile_ring, RingIn}, State) ->
    OtherRing = riak_core_ring:upgrade(RingIn),
    State2 = update_known_versions(OtherRing, State),
    case check_legacy_gossip(RingIn, State2) of
        true ->
            LegacyRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, OtherRing),
            riak_core_gossip_legacy:handle_cast({reconcile_ring, LegacyRing},
                                                State2),
            {noreply, State2};
        false ->
            %% Compare the two rings, see if there is anything that
            %% must be done to make them equal...
            riak_core_stat:update(gossip_received),
            riak_core_ring_manager:ring_trans(fun reconcile/2, [OtherRing]),
            {noreply, State2}
    end;
handle_cast(reset_tokens, State) ->
    schedule_next_reset(),
    gen_server:cast(?MODULE, gossip_ring),
    {Tokens, _} = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    {noreply, State#state{gossip_tokens=Tokens}};
handle_cast(gossip_ring, State) ->
    % Gossip the ring to some random other node...
    {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
    random_gossip(MyRing),
    {noreply, State};
handle_cast({rejoin, RingIn}, State) ->
    OtherRing = riak_core_ring:upgrade(RingIn),
    {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
    SameCluster = (riak_core_ring:cluster_name(Ring) =:=
                       riak_core_ring:cluster_name(OtherRing)),
    case SameCluster of
        true ->
            Legacy = check_legacy_gossip(Ring, State),
            OtherNode = riak_core_ring:owner_node(OtherRing),
            riak_core:join(Legacy, node(), OtherNode, true, true),
            {noreply, State};
        false ->
            {noreply, State}
    end;
handle_cast(stop, State) ->
    %% Sent by stop/0. Without this clause the request fell through to the
    %% catch-all below and stop/0 never stopped the server.
    {stop, normal, State};
handle_cast(_, State) ->
    {noreply, State}.
%% @private
%% Raw (non gen_server) messages are ignored.
handle_info(_Msg, State) ->
    {noreply, State}.

%% @private
%% Nothing to clean up on shutdown.
terminate(_Why, _State) ->
    ok.

%% @private
%% The state is carried over unchanged across code upgrades.
code_change(_PrevVsn, State, _Extra) ->
    {ok, State}.
%% ===================================================================
%% Internal functions
%% ===================================================================
%% Arm a one-shot timer that casts `reset_tokens' to this server after the
%% reset interval (second element of `gossip_limit', in milliseconds).
%% Returns whatever timer:apply_after/4 returns.
schedule_next_reset() ->
    Limit = app_helper:get_env(riak_core, gossip_limit, ?DEFAULT_LIMIT),
    {_Tokens, IntervalMs} = Limit,
    timer:apply_after(IntervalMs, gen_server, cast, [?MODULE, reset_tokens]).
%% ring_trans callback for incoming gossip: merge `OtherRing0' into the local
%% ring `Ring0'. Returns `{reconciled_ring, NewRing}' when the merge produced
%% a new ring, otherwise `ignore'. Gossip from another cluster, or from a
%% sender whose pre-merge status is `invalid' or `down', is not merged.
%% Depending on the sender's post-merge status, the sender may also be told
%% to rejoin (`down') or be re-sent its shutdown (`invalid').
reconcile(Ring0, [OtherRing0]) ->
    %% Due to rolling upgrades and legacy gossip, a ring's cluster name
    %% may be temporarily undefined. This is eventually fixed by the claimant.
    {Ring, OtherRing} = riak_core_ring:reconcile_names(Ring0, OtherRing0),
    OtherNode = riak_core_ring:owner_node(OtherRing),
    Members = riak_core_ring:reconcile_members(Ring, OtherRing),
    WrongCluster = (riak_core_ring:cluster_name(Ring) /=
                        riak_core_ring:cluster_name(OtherRing)),
    PreStatus = riak_core_ring:member_status(Members, OtherNode),
    IgnoreGossip = WrongCluster orelse
        (PreStatus =:= invalid) orelse
        (PreStatus =:= down),
    {Changed, Ring2} =
        case IgnoreGossip of
            true -> {false, Ring};
            false -> riak_core_ring:reconcile(OtherRing, Ring)
        end,
    OtherStatus = riak_core_ring:member_status(Ring2, OtherNode),
    case {WrongCluster, OtherStatus, Changed} of
        {true, _, _} ->
            %% TODO: Tell other node to stop gossiping to this node.
            riak_core_stat:update(ignored_gossip),
            ignore;
        {_, down, _} ->
            %% Tell other node to rejoin the cluster.
            riak_core_gossip:rejoin(OtherNode, Ring2),
            ignore;
        {_, invalid, _} ->
            %% Exiting/Removed node never saw shutdown cast, re-send.
            ClusterName = riak_core_ring:cluster_name(Ring),
            riak_core_ring_manager:refresh_ring(OtherNode, ClusterName),
            ignore;
        {_, _, new_ring} ->
            Ring3 = riak_core_ring:ring_changed(node(), Ring2),
            riak_core_stat:update(rings_reconciled),
            log_membership_changes(Ring, Ring3),
            {reconciled_ring, Ring3};
        {_, _, _} ->
            ignore
    end.
%% Log via lager:info every membership difference between `OldRing' and
%% `NewRing': nodes that joined, nodes that were removed, and nodes whose
%% status changed. Always returns `ok'.
log_membership_changes(OldRing, NewRing) ->
OldStatus = orddict:from_list(riak_core_ring:all_member_status(OldRing)),
NewStatus = orddict:from_list(riak_core_ring:all_member_status(NewRing)),
%% Pad both old and new status to the same length
%% (a node missing on one side gets status `undefined' there; the merge fun
%% keeps the real status when the node is present on both sides).
OldDummyStatus = [{Node, undefined} || {Node, _} <- NewStatus],
OldStatus2 = orddict:merge(fun(_, Status, _) ->
Status
end, OldStatus, OldDummyStatus),
NewDummyStatus = [{Node, undefined} || {Node, _} <- OldStatus],
NewStatus2 = orddict:merge(fun(_, Status, _) ->
Status
end, NewStatus, NewDummyStatus),
%% Merge again to determine changed status
%% Both orddicts now share the same keys, so the fun sees every node with
%% its {Old, New} pair. The merged result is discarded; the merge is used
%% only to drive the logging side effects.
orddict:merge(fun(_, Same, Same) ->
Same;
(Node, undefined, New) ->
lager:info("'~s' joined cluster with status '~s'~n",
[Node, New]);
(Node, Old, undefined) ->
lager:info("'~s' removed from cluster (previously: "
"'~s')~n", [Node, Old]);
(Node, Old, New) ->
lager:info("'~s' changed from '~s' to '~s'~n",
[Node, Old, New])
end, OldStatus2, NewStatus2),
ok.
%% @doc Reassign the partitions of `ExitingNode' in `Ring', seeding the
%% random choices with the current time (erlang:now/0).
%% See remove_from_cluster/3.
remove_from_cluster(Ring, ExitingNode) ->
    Seed = erlang:now(),
    remove_from_cluster(Ring, ExitingNode, Seed).
%% @doc Reassign the partitions owned by `ExitingNode' in `Ring'.
%% First tries attempt_simple_transfer/4, which hands each partition to a
%% randomly chosen (via `Seed') claiming member without violating
%% `target_n_val'. If that fails, every partition of `ExitingNode' is handed
%% to one other claiming member and the ring is re-diagonalized with
%% riak_core_claim:claim_rebalance_n/2.
%% NOTE(review): in the fallback path hd/1 raises when there is no claiming
%% member other than `ExitingNode'.
remove_from_cluster(Ring, ExitingNode, Seed) ->
    AllOwners = riak_core_ring:all_owners(Ring),
    case attempt_simple_transfer(Seed, Ring, AllOwners, ExitingNode) of
        {ok, NewRing} ->
            NewRing;
        target_n_fail ->
            %% Hand all claims to *anyone* else first, just so the
            %% rebalance doesn't include the exiting node.
            Others = lists:delete(ExitingNode,
                                  riak_core_ring:claiming_members(Ring)),
            Other = hd(Others),
            ExitingIdxs = [Idx || {Idx, Owner} <- AllOwners,
                                  Owner == ExitingNode],
            TempRing = lists:foldl(
                         fun(Idx, R) ->
                                 riak_core_ring:transfer_node(Idx, Other, R)
                         end,
                         Ring,
                         ExitingIdxs),
            riak_core_claim:claim_rebalance_n(TempRing, Other)
    end.
%% Try to hand every partition of `ExitingNode' in `Owners' to another
%% claiming member while respecting `target_n_val'. Returns `{ok, NewRing}'
%% or `target_n_fail'. Every candidate starts "last seen" at index -TargetN,
%% which makes it eligible from the very first partition (index 0).
attempt_simple_transfer(Seed, Ring, Owners, ExitingNode) ->
    TargetN = app_helper:get_env(riak_core, target_n_val),
    Claimants = riak_core_ring:claiming_members(Ring),
    LastSeen = [{Member, -TargetN} || Member <- Claimants,
                                      Member /= ExitingNode],
    attempt_simple_transfer(Seed, Ring, Owners, TargetN, ExitingNode, 0,
                            LastSeen).
%% Walk the owner list in partition order. `Idx' is the position of the
%% list head; `Last' maps each candidate node to the index at which it was
%% last seen owning (or being given) a partition; `Seed' is threaded through
%% random:uniform_s/2. Returns `{ok, Ring}' once the list is exhausted, or
%% `target_n_fail' as soon as one of `Exit''s partitions cannot be placed.
%% NOTE(review): the backward check uses only `Last' and the forward check
%% only `Rest'; neither appears to consider wrap-around from the last
%% partition to the first. Confirm whether that is intended.
attempt_simple_transfer(Seed, Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
%% handoff
%% Candidates: nodes last seen at least TargetN positions before Idx.
case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
[] ->
target_n_fail;
Candidates ->
%% these nodes don't violate target_n in the reverse direction
%% StepsToNext(Node) counts the partitions in Rest before the first one
%% owned by Node (length(Rest) if Node owns none of them).
StepsToNext = fun(Node) ->
length(lists:takewhile(
fun({_, Owner}) -> Node /= Owner end,
Rest))
end,
%% Keep candidates whose next partition is at least TargetN positions
%% after P, or who own nothing further along the list.
case lists:filter(fun(N) ->
Next = StepsToNext(N),
(Next+1 >= TargetN)
orelse (Next == length(Rest))
end,
Candidates) of
[] ->
target_n_fail;
Qualifiers ->
%% these nodes don't violate target_n forward
{Rand, Seed2} = random:uniform_s(length(Qualifiers), Seed),
Chosen = lists:nth(Rand, Qualifiers),
%% choose one, and do the rest of the ring
%% (the chosen node is now "last seen" at Idx)
attempt_simple_transfer(
Seed2,
riak_core_ring:transfer_node(P, Chosen, Ring),
Rest, TargetN, Exit, Idx+1,
lists:keyreplace(Chosen, 1, Last, {Chosen, Idx}))
end
end;
attempt_simple_transfer(Seed, Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
%% just keep track of seeing this node
%% (keyreplace leaves Last unchanged when N is not a candidate)
attempt_simple_transfer(Seed, Ring, Rest, TargetN, Exit, Idx+1,
lists:keyreplace(N, 1, Last, {N, Idx}));
attempt_simple_transfer(_, Ring, [], _, _, _, _) ->
{ok, Ring}.
Jump to Line
Something went wrong with that request. Please try again.