
Overhaul cluster membership, ring format, and gossip protocol

Tickets: AZ462 AZ533 AZ630 AZ642
Fixes: BZ1024 BZ878 BZ688 BZ931 BZ869

Change cluster membership/gossip/ring in the following manner:
-- Partition ownership and cluster membership are decoupled concepts.

-- A single node in the cluster makes all partition claim decisions
   in a deterministic manner, avoiding the endless churn possible with
   random merging. Any node in the cluster can assume this role (the
   claimant), so it does not introduce a single point of failure.

-- All data from all vnode modules is guaranteed to be handed off to a new
   owner before that owner takes over ownership of a partition, thus
   avoiding requests being sent prematurely to a node that does not yet
   have the data.

-- Nodes always hand off their data before cleanly exiting the cluster,
   even if a node is restarted before handoff finishes.

-- Waiting for ring convergence is now part of the gossip/membership protocol,
   and is used to ensure consistent ring transitions across the cluster.

-- Joining/leaving a cluster is handled more gracefully, and there is no
   need to wait for ring convergence (riak-admin ring-ready) as was
   previously suggested. Adding 20 nodes to a cluster at once should
   "just work" (a usage sketch follows this list).

-- Handoff related to partition ownership changes can now occur under load,
   therefore allowing a cluster to scale up/down while handling normal
   requests.
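
A minimal sketch of the join workflow this enables, run from an Erlang shell
on each joining node (riak_core:join/1 is part of this commit; the node name
is hypothetical):

    %% Run on each of the 20 new nodes; no ring-ready polling in between.
    riak_core:join('riak@seed1').
    %% The claimant then schedules the ownership changes, and handoff
    %% proceeds under load until the ring converges.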

Other changes:
-- Support for new commands: member_status, ring_status (see sketch below).
-- Tracking of various new riak_core stats.
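
The new commands are backed by the riak_core_console module added below; a
hedged invocation sketch from an attached Erlang shell:

    riak_core_console:member_status().
    riak_core_console:ring_status().
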
1 parent d7dd499 · commit c4b80137998359f0db6eec769c0295db70e61739 · jtuple committed Sep 2, 2011
4 ebin/riak_core.app
@@ -14,13 +14,15 @@
merkerl,
priority_queue,
process_proxy,
+ riak_core_gossip_legacy,
riak_core,
riak_core_apl,
riak_core_app,
riak_core_bucket,
riak_core_cinfo_core,
riak_core_claim,
riak_core_config,
+ riak_core_console,
riak_core_coverage_fsm,
riak_core_coverage_plan,
riak_core_eventhandler_guard,
@@ -38,6 +40,7 @@
riak_core_ring_handler,
riak_core_ring_manager,
riak_core_ring_util,
+ riak_core_stat,
riak_core_status,
riak_core_sup,
riak_core_sysmon_handler,
@@ -46,6 +49,7 @@
riak_core_test_util,
riak_core_util,
riak_core_vnode,
+ riak_core_vnode_manager,
riak_core_vnode_master,
riak_core_vnode_sup,
riak_core_vnode_worker,
2 include/riak_core_ring.hrl
@@ -0,0 +1,2 @@
+-define(LEGACY_RING_VSN, 1).
+-define(CURRENT_RING_VSN, 2).
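
These two macros drive the riak_core_ring:upgrade/1 and downgrade/2 calls
threaded through the rest of this diff. A minimal round-trip sketch, assuming
a node running this commit:

    {ok, Ring0} = riak_core_ring_manager:get_my_ring(),
    Ring = riak_core_ring:upgrade(Ring0),        %% no-op if already a v2 ring
    Legacy = riak_core_ring:downgrade(1, Ring).  %% 1 = ?LEGACY_RING_VSN
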
168 src/riak_core.erl
@@ -20,7 +20,8 @@
%%
%% -------------------------------------------------------------------
-module(riak_core).
--export([stop/0, stop/1, join/1, remove_from_cluster/1]).
+-export([stop/0, stop/1, join/1, join/3, remove/1, down/1, leave/0,
+ remove_from_cluster/1]).
-export([register_vnode_module/1, vnode_modules/0]).
-export([register/1, bucket_fixups/0]).
-export([add_guarded_event_handler/3, add_guarded_event_handler/4]).
@@ -50,6 +51,75 @@ stop(Reason) ->
join(NodeStr) when is_list(NodeStr) ->
join(riak_core_util:str_to_node(NodeStr));
join(Node) when is_atom(Node) ->
+ join(node(), Node).
+
+join(Node, Node) ->
+ {error, self_join};
+join(_, Node) ->
+ join(riak_core_gossip:legacy_gossip(), node(), Node).
+
+join(true, _, Node) ->
+ legacy_join(Node);
+join(false, _, Node) ->
+ case net_adm:ping(Node) of
+ pang ->
+ {error, not_reachable};
+ pong ->
+ case rpc:call(Node, riak_core_gossip, legacy_gossip, []) of
+ true ->
+ legacy_join(Node);
+ _ ->
+ %% Failure due to trying to join older node that
+ %% doesn't define legacy_gossip will be handled
+ %% in standard_join based on seeing a legacy ring.
+ standard_join(Node)
+ end
+ end.
+
+standard_join(Node) when is_atom(Node) ->
+ case net_adm:ping(Node) of
+ pong ->
+ case rpc:call(Node, riak_core_ring_manager, get_my_ring, []) of
+ {ok, Ring} ->
+ case riak_core_ring:legacy_ring(Ring) of
+ true ->
+ legacy_join(Node);
+ false ->
+ standard_join(Node, Ring)
+ end;
+ _ ->
+ {error, unable_to_get_join_ring}
+ end;
+ pang ->
+ {error, not_reachable}
+ end.
+
+standard_join(Node, Ring) ->
+ {ok, MyRing} = riak_core_ring_manager:get_my_ring(),
+ SameSize = (riak_core_ring:num_partitions(MyRing) =:=
+ riak_core_ring:num_partitions(Ring)),
+ Singleton = ([node()] =:= riak_core_ring:all_members(MyRing)),
+ case {Singleton, SameSize} of
+ {false, _} ->
+ {error, not_single_node};
+ {_, false} ->
+ {error, different_ring_sizes};
+ _ ->
+ GossipVsn = riak_core_gossip:gossip_version(),
+ Ring2 = riak_core_ring:add_member(node(), Ring,
+ node()),
+ Ring3 = riak_core_ring:set_owner(Ring2, node()),
+ Ring4 =
+ riak_core_ring:update_member_meta(node(),
+ Ring3,
+ node(),
+ gossip_vsn,
+ GossipVsn),
+ riak_core_ring_manager:set_my_ring(Ring4),
+ riak_core_gossip:send_ring(Node, node())
+ end.
+
+legacy_join(Node) when is_atom(Node) ->
{ok, OurRingSize} = application:get_env(riak_core, ring_creation_size),
case net_adm:ping(Node) of
pong ->
@@ -66,11 +136,105 @@ join(Node) when is_atom(Node) ->
{error, not_reachable}
end.
+remove(Node) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ case {riak_core_ring:all_members(Ring),
+ riak_core_ring:member_status(Ring, Node)} of
+ {_, invalid} ->
+ {error, not_member};
+ {[Node], _} ->
+ {error, only_member};
+ _ ->
+ case riak_core_gossip:legacy_gossip() of
+ true ->
+ legacy_remove(Node);
+ false ->
+ standard_remove(Node)
+ end
+ end.
+
+standard_remove(Node) ->
+ riak_core_ring_manager:ring_trans(
+ fun(Ring2, _) ->
+ Ring3 = riak_core_ring:remove_member(node(), Ring2, Node),
+ Ring4 = riak_core_ring:ring_changed(node(), Ring3),
+ {new_ring, Ring4}
+ end, []),
+ ok.
+
+down(Node) ->
+ down(riak_core_gossip:legacy_gossip(), Node).
+down(true, _) ->
+ {error, legacy_mode};
+down(false, Node) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ case net_adm:ping(Node) of
+ pong ->
+ {error, is_up};
+ pang ->
+ case {riak_core_ring:all_members(Ring),
+ riak_core_ring:member_status(Ring, Node)} of
+ {_, invalid} ->
+ {error, not_member};
+ {[Node], _} ->
+ {error, only_member};
+ _ ->
+ riak_core_ring_manager:ring_trans(
+ fun(Ring2, _) ->
+ Ring3 = riak_core_ring:down_member(node(), Ring2, Node),
+ Ring4 = riak_core_ring:ring_changed(node(), Ring3),
+ {new_ring, Ring4}
+ end, []),
+ ok
+ end
+ end.
+
+leave() ->
+ Node = node(),
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ case {riak_core_ring:all_members(Ring),
+ riak_core_ring:member_status(Ring, Node)} of
+ {_, invalid} ->
+ {error, not_member};
+ {[Node], _} ->
+ {error, only_member};
+ {_, valid} ->
+ case riak_core_gossip:legacy_gossip() of
+ true ->
+ legacy_remove(Node);
+ false ->
+ standard_leave(Node)
+ end;
+ {_, _} ->
+ {error, already_leaving}
+ end.
+
+standard_leave(Node) ->
+ riak_core_ring_manager:ring_trans(
+ fun(Ring2, _) ->
+ Ring3 = riak_core_ring:leave_member(Node, Ring2, Node),
+ {new_ring, Ring3}
+ end, []),
+ ok.
+
%% @spec remove_from_cluster(ExitingNode :: atom()) -> term()
%% @doc Cause all partitions owned by ExitingNode to be taken over
%% by other nodes.
remove_from_cluster(ExitingNode) when is_atom(ExitingNode) ->
- riak_core_gossip:remove_from_cluster(ExitingNode).
+ remove(ExitingNode).
+
+legacy_remove(Node) when is_atom(Node) ->
+ case catch(riak_core_gossip_legacy:remove_from_cluster(Node)) of
+ {'EXIT', {badarg, [{erlang, hd, [[]]}|_]}} ->
+ %% This is a workaround because
+ %% riak_core_gossip:remove_from_cluster doesn't check if
+ %% the result of subtracting the current node from the
+ %% cluster member list results in the empty list. When
+ %% that code gets refactored this can probably go away.
+ {error, only_member};
+ ok ->
+ ok
+ end.
vnode_modules() ->
case application:get_env(riak_core, vnode_modules) of
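
Taken together, the membership API added above reads as follows; a hedged
sketch with hypothetical node names, not a verbatim transcript:

    riak_core:join('riak@host2').    %% join host2's cluster (standard or legacy)
    riak_core:leave().               %% leave gracefully; data is handed off first
    riak_core:down('riak@host3').    %% mark an unreachable member as down
    riak_core:remove('riak@host3').  %% forcibly remove a member without handoff
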
3 src/riak_core_apl.erl
@@ -172,7 +172,8 @@ last_in_ring() ->
six_node_test() ->
%% it's non-trivial to create a real 6 node ring, so here's one we made
%% earlier
- {ok, [Ring]} = file:consult("../test/my_ring"),
+ {ok, [Ring0]} = file:consult("../test/my_ring"),
+ Ring = riak_core_ring:upgrade(Ring0),
%DocIdx = riak_core_util:chash_key({<<"foo">>, <<"bar">>}),
DocIdx = <<73,212,27,234,104,13,150,207,0,82,86,183,125,225,172,
154,135,46,6,112>>,
4 src/riak_core_app.erl
@@ -77,7 +77,9 @@ start(_StartType, _StartArgs) ->
lager:critical("Failed to read ring file: ~p",
[lager:posix_error(Reason)]),
throw({error, Reason});
- Ring ->
+ Ring0 ->
+ %% Upgrade the ring data structure if necessary.
+ Ring = riak_core_ring:upgrade(Ring0),
riak_core_ring_manager:set_my_ring(Ring)
end;
{error, not_found} ->
25 src/riak_core_claim.erl
@@ -55,10 +55,12 @@
-export([default_wants_claim/1, default_choose_claim/1,
never_wants_claim/1, random_choose_claim/1]).
-export([default_choose_claim/2,
+ default_wants_claim/2,
claim_rebalance_n/2]).
-ifdef(TEST).
-ifdef(EQC).
+-export([prop_claim_ensures_unique_nodes/0]).
-include_lib("eqc/include/eqc.hrl").
-endif.
-include_lib("eunit/include/eunit.hrl").
@@ -69,10 +71,11 @@
default_wants_claim(Ring) ->
default_wants_claim(Ring, node()).
-default_wants_claim(Ring, Node) ->
+default_wants_claim(Ring0, Node) ->
+ Ring = riak_core_ring:upgrade(Ring0),
%% Determine how many nodes are involved with the ring; if the requested
%% node is not yet part of the ring, include it in the count.
- AllMembers = riak_core_ring:all_members(Ring),
+ AllMembers = riak_core_ring:claiming_members(Ring),
case lists:member(Node, AllMembers) of
true ->
Mval = length(AllMembers);
@@ -101,7 +104,8 @@ default_wants_claim(Ring, Node) ->
default_choose_claim(Ring) ->
default_choose_claim(Ring, node()).
-default_choose_claim(Ring, Node) ->
+default_choose_claim(Ring0, Node) ->
+ Ring = riak_core_ring:upgrade(Ring0),
TargetN = app_helper:get_env(riak_core, target_n_val),
case meets_target_n(Ring, TargetN) of
{true, TailViolations} ->
@@ -148,7 +152,7 @@ meets_target_n([], TargetN, Index, First, Last) ->
claim_with_n_met(Ring, TailViolations, Node) ->
CurrentOwners = lists:keysort(1, riak_core_ring:all_owners(Ring)),
- Nodes = lists:usort([Node|riak_core_ring:all_members(Ring)]),
+ Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
case lists:sort([ I || {I, N} <- CurrentOwners, N == Node ]) of
[] ->
%% node hasn't claimed anything yet - just claim stuff
@@ -229,9 +233,10 @@ find_biggest_hole(Mine) ->
none,
lists:zip(Mine, tl(Mine)++[hd(Mine)])).
-claim_rebalance_n(Ring, Node) ->
+claim_rebalance_n(Ring0, Node) ->
+ Ring = riak_core_ring:upgrade(Ring0),
%% diagonal stripes guarantee most disperse data
- Nodes = lists:usort([Node|riak_core_ring:all_members(Ring)]),
+ Nodes = lists:usort([Node|riak_core_ring:claiming_members(Ring)]),
Partitions = lists:sort([ I || {I, _} <- riak_core_ring:all_owners(Ring) ]),
Zipped = lists:zip(Partitions,
lists:sublist(
@@ -246,9 +251,10 @@ claim_rebalance_n(Ring, Node) ->
Ring,
Zipped).
-random_choose_claim(Ring) ->
+random_choose_claim(Ring0) ->
+ Ring = riak_core_ring:upgrade(Ring0),
riak_core_ring:transfer_node(riak_core_ring:random_other_index(Ring),
- node(), Ring).
+ node(), Ring).
%% @spec never_wants_claim(riak_core_ring()) -> no
%% @doc For use by nodes that should not claim any partitions.
@@ -319,7 +325,8 @@ prop_claim_ensures_unique_nodes() ->
R0 = riak_core_ring:fresh(Partitions, Node0),
Rfinal = lists:foldl(fun(Node, Racc) ->
- default_choose_claim(Racc, Node)
+ Racc0 = riak_core_ring:add_member(Node0, Racc, Node),
+ default_choose_claim(Racc0, Node)
end, R0, RestNodes),
Preflists = riak_core_ring:all_preflists(Rfinal, Nval),
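
Claim now operates over claiming_members/1 rather than all_members/1, so
leaving/exiting nodes no longer take part in claiming. A minimal sketch of
driving the diagonalized rebalance by hand (node atoms are hypothetical):

    Ring0 = riak_core_ring:fresh(8, 'n1@host'),
    Ring1 = riak_core_ring:add_member('n1@host', Ring0, 'n2@host'),
    Ring2 = riak_core_ring:add_member('n1@host', Ring1, 'n3@host'),
    Ring3 = riak_core_claim:claim_rebalance_n(Ring2, 'n2@host').
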
170 src/riak_core_console.erl
@@ -0,0 +1,170 @@
+%% -------------------------------------------------------------------
+%%
+%% Copyright (c) 2011 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+-module(riak_core_console).
+-export([member_status/0, ring_status/0]).
+
+member_status() ->
+ io:format("~33..=s Membership ~34..=s~n", ["", ""]),
+ io:format("Status Ring Pending Node~n"),
+ io:format("~79..-s~n", [""]),
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ AllStatus = lists:keysort(2, riak_core_ring:all_member_status(Ring)),
+ RingSize = riak_core_ring:num_partitions(Ring),
+ IsPending = ([] /= riak_core_ring:pending_changes(Ring)),
+
+ {Joining, Valid, Down, Leaving, Exiting} =
+ lists:foldl(fun({Node, Status},
+ {Joining0, Valid0, Down0, Leaving0, Exiting0}) ->
+ Indices = riak_core_ring:indices(Ring, Node),
+ NextIndices =
+ riak_core_ring:future_indices(Ring, Node),
+ RingPercent = length(Indices) * 100 / RingSize,
+ NextPercent = length(NextIndices) * 100 / RingSize,
+
+ StatusOut =
+ case riak_core_gossip:legacy_gossip(Node) of
+ true -> "(legacy)";
+ false -> Status
+ end,
+
+ case IsPending of
+ true ->
+ io:format("~-8s ~5.1f% ~5.1f% ~p~n",
+ [StatusOut, RingPercent,
+ NextPercent, Node]);
+ false ->
+ io:format("~-8s ~5.1f% -- ~p~n",
+ [StatusOut, RingPercent, Node])
+ end,
+ case Status of
+ joining ->
+ {Joining0 + 1, Valid0, Down0, Leaving0, Exiting0};
+ valid ->
+ {Joining0, Valid0 + 1, Down0, Leaving0, Exiting0};
+ down ->
+ {Joining0, Valid0, Down0 + 1, Leaving0, Exiting0};
+ leaving ->
+ {Joining0, Valid0, Down0, Leaving0 + 1, Exiting0};
+ exiting ->
+ {Joining0, Valid0, Down0, Leaving0, Exiting0 + 1}
+ end
+ end, {0,0,0,0,0}, AllStatus),
+ io:format("~79..-s~n", [""]),
+ io:format("Valid:~b / Leaving:~b / Exiting:~b / Joining:~b / Down:~b~n",
+ [Valid, Leaving, Exiting, Joining, Down]),
+ ok.
+
+ring_status() ->
+ case riak_core_gossip:legacy_gossip() of
+ true ->
+ io:format("Currently in legacy gossip mode.~n"),
+ ok;
+ false ->
+ {Claimant, RingReady, Down, MarkedDown, Changes} =
+ riak_core_status:ring_status(),
+ claimant_status(Claimant, RingReady),
+ ownership_status(Down, Changes),
+ unreachable_status(Down -- MarkedDown),
+ ok
+ end.
+
+claimant_status(Claimant, RingReady) ->
+ io:format("~34..=s Claimant ~35..=s~n", ["", ""]),
+ io:format("Claimant: ~p~n", [Claimant]),
+ case RingReady of
+ undefined ->
+ io:format("Status: down~n"
+ "Ring Ready: unknown~n", []);
+ _ ->
+ io:format("Status: up~n"
+ "Ring Ready: ~p~n", [RingReady])
+ end,
+ io:format("~n", []).
+
+ownership_status(Down, Changes) ->
+ io:format("~30..=s Ownership Handoff ~30..=s~n", ["", ""]),
+ case Changes of
+ [] ->
+ io:format("No pending changes.~n");
+ _ ->
+ orddict:fold(fun print_ownership_status/3, Down, Changes)
+ end,
+ io:format("~n", []).
+
+print_ownership_status({Owner, NextOwner}, Transfers, Down) ->
+ io:format("Owner: ~s~n"
+ "Next Owner: ~s~n", [Owner, NextOwner]),
+ case {lists:member(Owner, Down),
+ lists:member(NextOwner, Down)} of
+ {true, true} ->
+ io:format("~n"),
+ io:format("!!! ~s is DOWN !!!~n", [Owner]),
+ io:format("!!! ~s is DOWN !!!~n~n", [NextOwner]),
+ lists:foreach(fun print_index/1, Transfers);
+ {true, _} ->
+ io:format("~n"),
+ io:format("!!! ~s is DOWN !!!~n~n", [Owner]),
+ lists:foreach(fun print_index/1, Transfers);
+ {_, true} ->
+ io:format("~n"),
+ io:format("!!! ~s is DOWN !!!~n~n", [NextOwner]),
+ lists:foreach(fun print_index/1, Transfers);
+ _ ->
+ lists:foreach(fun print_transfer_status/1, Transfers)
+ end,
+ io:format("~n"),
+ io:format("~79..-s~n", [""]),
+ Down.
+
+print_index({Idx, _Waiting, _Complete, _Status}) ->
+ io:format("Index: ~b~n", [Idx]).
+
+print_transfer_status({Idx, Waiting, Complete, Status}) ->
+ io:format("~nIndex: ~b~n", [Idx]),
+ case Status of
+ complete ->
+ io:format(" All transfers complete. Waiting for "
+ "claimant to change ownership.~n");
+ awaiting ->
+ io:format(" Waiting on: ~p~n", [Waiting]),
+ case Complete of
+ [] ->
+ ok;
+ _ ->
+ io:format(" Complete: ~p~n", [Complete])
+ end
+ end.
+
+unreachable_status([]) ->
+ io:format("~30..=s Unreachable Nodes ~30..=s~n", ["", ""]),
+ io:format("All nodes are up and reachable~n", []),
+ io:format("~n", []);
+unreachable_status(Down) ->
+ io:format("~30..=s Unreachable Nodes ~30..=s~n", ["", ""]),
+ io:format("The following nodes are unreachable: ~p~n", [Down]),
+ io:format("~n", []),
+ io:format("WARNING: The cluster state will not converge until all nodes~n"
+ "are up. Once the above nodes come back online, convergence~n"
+ "will continue. If the outages are long-term or permanent, you~n"
+ "can either mark the nodes as down (riak-admin down NODE) or~n"
+ "forcibly remove the nodes from the cluster (riak-admin~n"
+ "force-remove NODE) to allow the remaining nodes to settle.~n"),
+ ok.
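
The banner rules above rely on Erlang's padded-string format: "~33..=s"
prints its argument in a 33-character field using '=' as the fill character.
For example:

    io:format("~33..=s Membership ~34..=s~n", ["", ""]).
    %% prints a 79-column banner of '=' with " Membership " in the middle
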
378 src/riak_core_gossip.erl
@@ -36,12 +36,19 @@
-export([start_link/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/1]).
+-export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/2,
+ finish_handoff/4, claim_until_balanced/2, random_gossip/1,
+ recursive_gossip/1, random_recursive_gossip/1, rejoin/2,
+ gossip_version/0, legacy_gossip/0, legacy_gossip/1]).
+
+-include_lib("riak_core_ring.hrl").
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
+-record(state, {gossip_versions}).
+
%% ===================================================================
%% Public API
%% ===================================================================
@@ -69,6 +76,56 @@ start_link() ->
stop() ->
gen_server:cast(?MODULE, stop).
+finish_handoff(Idx, Prev, New, Mod) ->
+ gen_server:call(?MODULE, {finish_handoff, Idx, Prev, New, Mod}).
+
+rejoin(Node, Ring) ->
+ gen_server:cast({?MODULE, Node}, {rejoin, Ring}).
+
+legacy_gossip() ->
+ gen_server:call(?MODULE, legacy_gossip).
+
+legacy_gossip(Node) ->
+ gen_server:call(?MODULE, {legacy_gossip, Node}).
+
+%% @doc Gossip state to a random node in the ring.
+random_gossip(Ring) ->
+ case riak_core_ring:random_other_active_node(Ring) of
+ no_node -> % must be single node cluster
+ ok;
+ RandomNode ->
+ send_ring(node(), RandomNode)
+ end.
+
+%% @doc Gossip state to a fixed set of nodes determined from a binary
+%% tree decomposition of the membership state. Recursive gossip
+%% converts the list of node members into a binary tree and
+%% gossips to the given node's right/left children. The gossip
+%% is considered recursive because each receiving node may also
+%% call recursive_gossip, thereby gossiping to its own children.
+%% The fan-out therefore expands logarithmically to cover the
+%% entire cluster.
+recursive_gossip(Ring, Node) ->
+ Nodes = riak_core_ring:active_members(Ring),
+ Tree = riak_core_util:build_tree(2, Nodes, [cycles]),
+ Children = orddict:fetch(Node, Tree),
+ [send_ring(node(), OtherNode) || OtherNode <- Children],
+ ok.
+recursive_gossip(Ring) ->
+ %% A non-active member will not show up in the tree decomposition,
+ %% and therefore we fall back to random_recursive_gossip as necessary.
+ Active = riak_core_ring:active_members(Ring),
+ case lists:member(node(), Active) of
+ true ->
+ recursive_gossip(Ring, node());
+ false ->
+ random_recursive_gossip(Ring)
+ end.
+
+random_recursive_gossip(Ring) ->
+ Active = riak_core_ring:active_members(Ring),
+ RNode = lists:nth(random:uniform(length(Active)), Active),
+ recursive_gossip(Ring, RNode).
%% ===================================================================
%% gen_server behaviour
@@ -77,57 +134,207 @@ stop() ->
%% @private
init(_State) ->
schedule_next_gossip(),
- {ok, true}.
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ State = update_known_versions(Ring,
+ #state{gossip_versions=orddict:new()}),
+ {ok, State}.
%% @private
+handle_call({finish_handoff, Idx, Prev, New, Mod}, _From, State) ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ Owner = riak_core_ring:index_owner(Ring, Idx),
+ {_, NextOwner, Status} = riak_core_ring:next_owner(Ring, Idx, Mod),
+ NewStatus = riak_core_ring:member_status(Ring, New),
+
+ case {Owner, NextOwner, NewStatus, Status} of
+ {_, _, invalid, _} ->
+ %% Handing off to an invalid node; don't give up data.
+ {reply, continue, State};
+ {Prev, New, _, awaiting} ->
+ riak_core_ring_manager:ring_trans(
+ fun(Ring2, _) ->
+ Ring3 = riak_core_ring:handoff_complete(Ring2, Idx, Mod),
+ {new_ring, Ring3}
+ end, []),
+ {reply, forward, State};
+ {Prev, New, _, complete} ->
+ %% Do nothing
+ {reply, continue, State};
+ {Prev, _, _, _} ->
+ %% Handoff wasn't to node that is scheduled in next, so no change.
+ {reply, continue, State};
+ {_, _, _, _} ->
+ {reply, shutdown, State}
+ end;
+
+handle_call(legacy_gossip, _From, State) ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ Reply = check_legacy_gossip(Ring, State),
+ {reply, Reply, State};
+
+handle_call({legacy_gossip, Node}, _From, State) ->
+ {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
+ State2 = update_known_versions(MyRing, State),
+ Reply = known_legacy_gossip(Node, State2),
+ {reply, Reply, State2};
+
handle_call(_, _From, State) ->
{reply, ok, State}.
+update_gossip_version(Ring) ->
+ CurrentVsn = riak_core_ring:get_member_meta(Ring, node(), gossip_vsn),
+ DesiredVsn = gossip_version(),
+ case CurrentVsn of
+ DesiredVsn ->
+ Ring;
+ _ ->
+ Ring2 = riak_core_ring:update_member_meta(node(), Ring, node(),
+ gossip_vsn, DesiredVsn),
+ Ring2
+ end.
-%% @private
-handle_cast({send_ring_to, Node}, RingChanged) ->
+
+known_legacy_gossip(Node, State) ->
+ case orddict:find(Node, State#state.gossip_versions) of
+ error ->
+ true;
+ {ok, ?LEGACY_RING_VSN} ->
+ true;
+ _ ->
+ false
+ end.
+
+check_legacy_gossip(Ring, State) ->
{ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
- gen_server:cast({?MODULE, Node}, {reconcile_ring, MyRing}),
- {noreply, RingChanged};
+ State2 = update_known_versions(MyRing, State),
+ case riak_core_ring:legacy_ring(Ring) of
+ true ->
+ true;
+ false ->
+ %% If any member is using legacy gossip, then we use legacy gossip.
+ Members = riak_core_ring:all_members(Ring),
+ Legacy = [known_legacy_gossip(Node, State2) || Node <- Members],
+ Result = lists:any(fun(E) -> E =:= true end, Legacy),
+ Result
+ end.
-handle_cast({distribute_ring, Ring}, RingChanged) ->
- Nodes = riak_core_ring:all_members(Ring),
- gen_server:abcast(Nodes, ?MODULE, {reconcile_ring, Ring}),
- {noreply, RingChanged};
+update_known_version(Node, {OtherRing, GVsns}) ->
+ case riak_core_ring:get_member_meta(OtherRing, Node, gossip_vsn) of
+ undefined ->
+ case riak_core_ring:owner_node(OtherRing) of
+ Node ->
+ %% Ring owner defaults to legacy gossip if unspecified.
+ {OtherRing, orddict:store(Node, ?LEGACY_RING_VSN, GVsns)};
+ _ ->
+ {OtherRing, GVsns}
+ end;
+ GossipVsn ->
+ {OtherRing, orddict:store(Node, GossipVsn, GVsns)}
+ end.
-handle_cast({reconcile_ring, OtherRing}, RingChanged) ->
- % Compare the two rings, see if there is anything that
- % must be done to make them equal...
- {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
- case riak_core_ring:reconcile(OtherRing, MyRing) of
- {no_change, _} ->
- {noreply, RingChanged};
-
- {new_ring, ReconciledRing} ->
- % Rebalance the new ring and save it
- BalancedRing = claim_until_balanced(ReconciledRing),
- riak_core_ring_manager:set_my_ring(BalancedRing),
-
- % Finally, push it out to another node - expect at least two nodes now
- RandomNode = riak_core_ring:random_other_node(BalancedRing),
- send_ring(node(), RandomNode),
- {noreply, true}
+update_known_versions(OtherRing, State=#state{gossip_versions=GVsns}) ->
+ {_, GVsns2} = lists:foldl(fun update_known_version/2,
+ {OtherRing, GVsns},
+ riak_core_ring:all_members(OtherRing)),
+ State#state{gossip_versions=GVsns2}.
+
+gossip_version() ->
+ case app_helper:get_env(riak_core, legacy_gossip) of
+ true ->
+ ?LEGACY_RING_VSN;
+ _ ->
+ ?CURRENT_RING_VSN
+ end.
+
+rpc_gossip_version(Ring, Node) ->
+ GossipVsn = riak_core_ring:get_member_meta(Ring, Node, gossip_vsn),
+ case GossipVsn of
+ undefined ->
+ case rpc:call(Node, riak_core_gossip, gossip_version, [], 1000) of
+ {badrpc, _} ->
+ ?LEGACY_RING_VSN;
+ Vsn ->
+ Vsn
+ end;
+ _ ->
+ GossipVsn
+ end.
+
+%% @private
+handle_cast({send_ring_to, Node}, State) ->
+ {ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
+ MyRing = update_gossip_version(MyRing0),
+ GossipVsn = case gossip_version() of
+ ?LEGACY_RING_VSN ->
+ ?LEGACY_RING_VSN;
+ _ ->
+ rpc_gossip_version(MyRing, Node)
+ end,
+ RingOut = riak_core_ring:downgrade(GossipVsn, MyRing),
+ gen_server:cast({?MODULE, Node}, {reconcile_ring, RingOut}),
+ {noreply, State};
+
+handle_cast({distribute_ring, Ring}, State) ->
+ RingOut = case check_legacy_gossip(Ring, State) of
+ true ->
+ riak_core_ring:downgrade(?LEGACY_RING_VSN, Ring);
+ false ->
+ Ring
+ end,
+ Nodes = riak_core_ring:active_members(Ring),
+ gen_server:abcast(Nodes, ?MODULE, {reconcile_ring, RingOut}),
+ {noreply, State};
+
+handle_cast({reconcile_ring, RingIn}, State) ->
+ OtherRing = riak_core_ring:upgrade(RingIn),
+ State2 = update_known_versions(OtherRing, State),
+ case check_legacy_gossip(RingIn, State2) of
+ true ->
+ LegacyRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, OtherRing),
+ riak_core_gossip_legacy:handle_cast({reconcile_ring, LegacyRing},
+ State2),
+ {noreply, State2};
+ false ->
+ %% Compare the two rings, see if there is anything that
+ %% must be done to make them equal...
+ riak_core_stat:update(gossip_received),
+ riak_core_ring_manager:ring_trans(fun reconcile/2, [OtherRing]),
+ {noreply, State2}
end;
-handle_cast(gossip_ring, _RingChanged) ->
+handle_cast(gossip_ring, State) ->
% First, schedule the next round of gossip...
schedule_next_gossip(),
% Gossip the ring to some random other node...
{ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
- case riak_core_ring:random_other_node(MyRing) of
- no_node -> % must be single node cluster
+
+ %% Ensure vnodes necessary for ownership change are running
+ case riak_core_ring:disowning_indices(MyRing, node()) of
+ [] ->
ok;
- RandomNode ->
- send_ring(node(), RandomNode)
+ _ ->
+ riak_core_ring_events:force_update()
end,
- {noreply, false};
+
+ random_gossip(MyRing),
+ {noreply, State};
+
+handle_cast({rejoin, RingIn}, State) ->
+ OtherRing = riak_core_ring:upgrade(RingIn),
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ SameCluster = (riak_core_ring:cluster_name(Ring) =:=
+ riak_core_ring:cluster_name(OtherRing)),
+ case SameCluster of
+ true ->
+ Legacy = check_legacy_gossip(Ring, State),
+ OtherNode = riak_core_ring:owner_node(OtherRing),
+ riak_core:join(Legacy, node(), OtherNode),
+ {noreply, State};
+ false ->
+ {noreply, State}
+ end;
handle_cast(_, State) ->
{noreply, State}.
@@ -153,27 +360,95 @@ schedule_next_gossip() ->
Interval = random:uniform(MaxInterval),
timer:apply_after(Interval, gen_server, cast, [?MODULE, gossip_ring]).
-claim_until_balanced(Ring) ->
+reconcile(Ring0, [OtherRing0]) ->
+ %% Due to rolling upgrades and legacy gossip, a ring's cluster name
+ %% may be temporarily undefined. This is eventually fixed by the claimant.
+ {Ring, OtherRing} = riak_core_ring:reconcile_names(Ring0, OtherRing0),
+ Node = node(),
+ OtherNode = riak_core_ring:owner_node(OtherRing),
+ Members = riak_core_ring:reconcile_members(Ring, OtherRing),
+ WrongCluster = (riak_core_ring:cluster_name(Ring) /=
+ riak_core_ring:cluster_name(OtherRing)),
+ PreStatus = riak_core_ring:member_status(Members, OtherNode),
+ IgnoreGossip = (WrongCluster or
+ (PreStatus =:= invalid) or
+ (PreStatus =:= down)),
+ case IgnoreGossip of
+ true ->
+ Ring2 = Ring,
+ Changed = false;
+ false ->
+ {Changed, Ring2} =
+ riak_core_ring:reconcile(OtherRing, Ring)
+ end,
+ OtherStatus = riak_core_ring:member_status(Ring2, OtherNode),
+ case {WrongCluster, OtherStatus, Changed} of
+ {true, _, _} ->
+ %% TODO: Tell other node to stop gossiping to this node.
+ riak_core_stat:update(ignored_gossip),
+ ignore;
+ {_, down, _} ->
+ %% Tell other node to rejoin the cluster.
+ riak_core_gossip:rejoin(OtherNode, Ring2),
+ ignore;
+ {_, invalid, _} ->
+ %% Exiting/Removed node never saw shutdown cast, re-send.
+ ClusterName = riak_core_ring:cluster_name(Ring),
+ riak_core_ring_manager:refresh_ring(OtherNode, ClusterName),
+ ignore;
+ {_, _, new_ring} ->
+ Ring3 = riak_core_ring:ring_changed(Node, Ring2),
+ riak_core_stat:update(rings_reconciled),
+ log_membership_changes(Ring, Ring3),
+ {reconciled_ring, Ring3};
+ {_, _, _} ->
+ ignore
+ end.
+
+log_membership_changes(OldRing, NewRing) ->
+ OldStatus = orddict:from_list(riak_core_ring:all_member_status(OldRing)),
+ NewStatus = orddict:from_list(riak_core_ring:all_member_status(NewRing)),
+
+ %% Pad both old and new status to the same length
+ OldDummyStatus = [{Node, undefined} || {Node, _} <- NewStatus],
+ OldStatus2 = orddict:merge(fun(_, Status, _) ->
+ Status
+ end, OldStatus, OldDummyStatus),
+
+ NewDummyStatus = [{Node, undefined} || {Node, _} <- OldStatus],
+ NewStatus2 = orddict:merge(fun(_, Status, _) ->
+ Status
+ end, NewStatus, NewDummyStatus),
+
+ %% Merge again to determine changed status
+ orddict:merge(fun(_, Same, Same) ->
+ Same;
+ (Node, undefined, New) ->
+ lager:info("'~s' joined cluster with status '~s'~n",
+ [Node, New]);
+ (Node, Old, undefined) ->
+ lager:info("'~s' removed from cluster (previously: "
+ "'~s')~n", [Node, Old]);
+ (Node, Old, New) ->
+ lager:info("'~s' changed from '~s' to '~s'~n",
+ [Node, Old, New])
+ end, OldStatus2, NewStatus2),
+ ok.
+
+claim_until_balanced(Ring, Node) ->
{WMod, WFun} = app_helper:get_env(riak_core, wants_claim_fun),
- NeedsIndexes = apply(WMod, WFun, [Ring]),
+ NeedsIndexes = apply(WMod, WFun, [Ring, Node]),
case NeedsIndexes of
no ->
Ring;
{yes, _NumToClaim} ->
{CMod, CFun} = app_helper:get_env(riak_core, choose_claim_fun),
- NewRing = CMod:CFun(Ring),
- claim_until_balanced(NewRing)
+ NewRing = CMod:CFun(Ring, Node),
+ claim_until_balanced(NewRing, Node)
end.
-
-remove_from_cluster(ExitingNode) ->
- % Set the remote node to stop claiming.
- % Ignore return of rpc as this should succeed even if node is offline
- rpc:call(ExitingNode, application, set_env,
- [riak_core, wants_claim_fun, {riak_core_claim, never_wants_claim}]),
-
+remove_from_cluster(Ring, ExitingNode) ->
% Get a list of indices owned by the ExitingNode...
- {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
AllOwners = riak_core_ring:all_owners(Ring),
% Transfer indexes to other nodes...
@@ -185,7 +460,7 @@ remove_from_cluster(ExitingNode) ->
%% re-diagonalize
%% first hand off all claims to *any* one else,
%% just so rebalance doesn't include exiting node
- Members = riak_core_ring:all_members(Ring),
+ Members = riak_core_ring:claiming_members(Ring),
Other = hd(lists:delete(ExitingNode, Members)),
TempRing = lists:foldl(
fun({I,N}, R) when N == ExitingNode ->
@@ -196,21 +471,14 @@ remove_from_cluster(ExitingNode) ->
AllOwners),
riak_core_claim:claim_rebalance_n(TempRing, Other)
end,
-
- % Send the new ring to all nodes except the exiting node
- distribute_ring(ExitRing),
-
- % Set the new ring on the exiting node. This will trigger
- % it to begin handoff and cleanly leave the cluster.
- rpc:call(ExitingNode, riak_core_ring_manager, set_my_ring, [ExitRing]).
-
+ ExitRing.
attempt_simple_transfer(Ring, Owners, ExitingNode) ->
TargetN = app_helper:get_env(riak_core, target_n_val),
attempt_simple_transfer(Ring, Owners,
TargetN,
ExitingNode, 0,
- [{O,-TargetN} || O <- riak_core_ring:all_members(Ring),
+ [{O,-TargetN} || O <- riak_core_ring:claiming_members(Ring),
O /= ExitingNode]).
attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
%% handoff
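
The recursive gossip documented above fans out along a binary tree of the
active members. A small sketch of the decomposition it relies on, using the
same riak_core_util:build_tree/3 helper (node atoms are hypothetical):

    Nodes = ['n1@h', 'n2@h', 'n3@h', 'n4@h', 'n5@h', 'n6@h', 'n7@h'],
    Tree = riak_core_util:build_tree(2, Nodes, [cycles]),
    Children = orddict:fetch('n1@h', Tree).
    %% Gossiping only to each node's children covers all N members in
    %% O(log N) rounds, rather than relying solely on random gossip.
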
261 src/riak_core_gossip_legacy.erl
@@ -0,0 +1,261 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core: Core Riak Application
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc riak_core_gossip takes care of the mechanics of shuttling a ring from
+%% one node to another upon request by other Riak processes.
+%%
+%% Additionally, it occasionally checks to make sure the current node has its
+%% fair share of partitions, and also sends a copy of the ring to some other
+%% random node, ensuring that all nodes eventually synchronize on the same
+%% understanding of the Riak cluster. This interval is configurable, but
+%% defaults to once per minute.
+
+-module(riak_core_gossip_legacy).
+
+-behaviour(gen_server).
+
+-export([start_link/0, stop/0]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+-export ([distribute_ring/1, send_ring/1, send_ring/2, remove_from_cluster/1]).
+
+-include_lib("riak_core_ring.hrl").
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
+%% ===================================================================
+%% Public API
+%% ===================================================================
+
+%% distribute_ring/1 -
+%% Distribute a ring to all members of that ring.
+distribute_ring(Ring) ->
+ gen_server:cast({?MODULE, node()}, {distribute_ring, Ring}).
+
+%% send_ring/1 -
+%% Send the current node's ring to some other node.
+send_ring(ToNode) -> send_ring(node(), ToNode).
+
+%% send_ring/2 -
+%% Send the ring from one node to another node.
+%% Does nothing if the two nodes are the same.
+send_ring(Node, Node) ->
+ ok;
+send_ring(FromNode, ToNode) ->
+ gen_server:cast({?MODULE, FromNode}, {send_ring_to, ToNode}).
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+stop() ->
+ gen_server:cast(?MODULE, stop).
+
+
+%% ===================================================================
+%% gen_server behaviour
+%% ===================================================================
+
+%% @private
+init(_State) ->
+ schedule_next_gossip(),
+ {ok, true}.
+
+
+%% @private
+handle_call(_, _From, State) ->
+ {reply, ok, State}.
+
+
+%% @private
+handle_cast({send_ring_to, Node}, RingChanged) ->
+ {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
+ gen_server:cast({?MODULE, Node}, {reconcile_ring, MyRing}),
+ {noreply, RingChanged};
+
+handle_cast({distribute_ring, Ring}, RingChanged) ->
+ Nodes = riak_core_ring:all_members(Ring),
+ gen_server:abcast(Nodes, ?MODULE, {reconcile_ring, Ring}),
+ {noreply, RingChanged};
+
+handle_cast({reconcile_ring, OtherRing}, RingChanged) ->
+ % Compare the two rings, see if there is anything that
+ % must be done to make them equal...
+ {ok, MyRing0} = riak_core_ring_manager:get_raw_ring(),
+ MyRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, MyRing0),
+ case riak_core_ring:legacy_reconcile(OtherRing, MyRing) of
+ {no_change, _} ->
+ {noreply, RingChanged};
+
+ {new_ring, ReconciledRing} ->
+ % Rebalance the new ring and save it
+ BalancedRing = claim_until_balanced(ReconciledRing),
+ riak_core_ring_manager:set_my_ring(BalancedRing),
+
+ BalancedRing2 = riak_core_ring:upgrade(BalancedRing),
+ riak_core_gossip:random_gossip(BalancedRing2),
+ {noreply, true}
+ end;
+
+handle_cast(gossip_ring, _RingChanged) ->
+ % First, schedule the next round of gossip...
+ schedule_next_gossip(),
+
+ % Gossip the ring to some random other node...
+ {ok, MyRing} = riak_core_ring_manager:get_raw_ring(),
+ case riak_core_ring:random_other_node(MyRing) of
+ no_node -> % must be single node cluster
+ ok;
+ RandomNode ->
+ send_ring(node(), RandomNode)
+ end,
+ {noreply, false};
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% @private
+handle_info(_Info, State) -> {noreply, State}.
+
+%% @private
+terminate(_Reason, _State) ->
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%% ===================================================================
+%% Internal functions
+%% ===================================================================
+
+schedule_next_gossip() ->
+ MaxInterval = app_helper:get_env(riak_core, gossip_interval),
+ Interval = random:uniform(MaxInterval),
+ timer:apply_after(Interval, gen_server, cast, [?MODULE, gossip_ring]).
+
+claim_until_balanced(Ring) ->
+ {WMod, WFun} = app_helper:get_env(riak_core, wants_claim_fun),
+ NeedsIndexes = apply(WMod, WFun, [Ring]),
+ case NeedsIndexes of
+ no ->
+ Ring;
+ {yes, _NumToClaim} ->
+ {CMod, CFun} = app_helper:get_env(riak_core, choose_claim_fun),
+ NewRing0 = CMod:CFun(Ring),
+ NewRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, NewRing0),
+ claim_until_balanced(NewRing)
+ end.
+
+
+remove_from_cluster(ExitingNode) ->
+ % Set the remote node to stop claiming.
+ % Ignore return of rpc as this should succeed even if node is offline
+ rpc:call(ExitingNode, application, set_env,
+ [riak_core, wants_claim_fun, {riak_core_claim, never_wants_claim}]),
+
+ % Get a list of indices owned by the ExitingNode...
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ AllOwners = riak_core_ring:all_owners(Ring),
+
+ % Transfer indexes to other nodes...
+ ExitRing0 =
+ case attempt_simple_transfer(Ring, AllOwners, ExitingNode) of
+ {ok, NR} ->
+ NR;
+ target_n_fail ->
+ %% re-diagonalize
+ %% first hand off all claims to *any* one else,
+ %% just so rebalance doesn't include exiting node
+ Members = riak_core_ring:all_members(Ring),
+ Other = hd(lists:delete(ExitingNode, Members)),
+ TempRing = lists:foldl(
+ fun({I,N}, R) when N == ExitingNode ->
+ riak_core_ring:transfer_node(I, Other, R);
+ (_, R) -> R
+ end,
+ Ring,
+ AllOwners),
+ TempRing2 = riak_core_ring:downgrade(?LEGACY_RING_VSN,
+ TempRing),
+ TempRing3 = riak_core_claim:claim_rebalance_n(TempRing2, Other),
+ riak_core_ring:upgrade(TempRing3)
+ end,
+
+ ExitRing = riak_core_ring:downgrade(?LEGACY_RING_VSN, ExitRing0),
+
+ % Send the new ring to all nodes except the exiting node
+ riak_core_gossip:distribute_ring(ExitRing0),
+
+ % Set the new ring on the exiting node. This will trigger
+ % it to begin handoff and cleanly leave the cluster.
+ rpc:call(ExitingNode, riak_core_ring_manager, set_my_ring, [ExitRing]),
+
+ ok.
+
+
+attempt_simple_transfer(Ring, Owners, ExitingNode) ->
+ TargetN = app_helper:get_env(riak_core, target_n_val),
+ attempt_simple_transfer(Ring, Owners,
+ TargetN,
+ ExitingNode, 0,
+ [{O,-TargetN} || O <- riak_core_ring:all_members(Ring),
+ O /= ExitingNode]).
+attempt_simple_transfer(Ring, [{P, Exit}|Rest], TargetN, Exit, Idx, Last) ->
+ %% handoff
+ case [ N || {N, I} <- Last, Idx-I >= TargetN ] of
+ [] ->
+ target_n_fail;
+ Candidates ->
+ %% these nodes don't violate target_n in the reverse direction
+ StepsToNext = fun(Node) ->
+ length(lists:takewhile(
+ fun({_, Owner}) -> Node /= Owner end,
+ Rest))
+ end,
+ case lists:filter(fun(N) ->
+ Next = StepsToNext(N),
+ (Next+1 >= TargetN)
+ orelse (Next == length(Rest))
+ end,
+ Candidates) of
+ [] ->
+ target_n_fail;
+ Qualifiers ->
+ %% these nodes don't violate target_n forward
+ Chosen = lists:nth(random:uniform(length(Qualifiers)),
+ Qualifiers),
+ %% choose one, and do the rest of the ring
+ attempt_simple_transfer(
+ riak_core_ring:transfer_node(P, Chosen, Ring),
+ Rest, TargetN, Exit, Idx+1,
+ lists:keyreplace(Chosen, 1, Last, {Chosen, Idx}))
+ end
+ end;
+attempt_simple_transfer(Ring, [{_, N}|Rest], TargetN, Exit, Idx, Last) ->
+ %% just keep track of seeing this node
+ attempt_simple_transfer(Ring, Rest, TargetN, Exit, Idx+1,
+ lists:keyreplace(N, 1, Last, {N, Idx}));
+attempt_simple_transfer(Ring, [], _, _, _, _) ->
+ {ok, Ring}.
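
This module preserves the pre-overhaul wire behaviour for mixed-version
clusters; its public API is unchanged. A hedged invocation sketch
(hypothetical node name, gossip server assumed running):

    %% Push this node's ring to a legacy-gossip peer.
    riak_core_gossip_legacy:send_ring('old@host1').
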
2 src/riak_core_handoff_manager.erl
@@ -62,7 +62,7 @@ handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
{noreply, State#state{excl=ordsets:del_element({Mod, Idx}, Excl)}};
handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
riak_core_ring_events:ring_update(Ring),
{noreply, State#state{excl=ordsets:add_element({Mod, Idx}, Excl)}}.
1,383 src/riak_core_ring.erl
@@ -30,7 +30,7 @@
-export([all_members/1,
all_owners/1,
- all_preflists/2,
+ all_preflists/2,
diff_nodes/2,
equal_rings/2,
fresh/0,
@@ -46,26 +46,86 @@
random_node/1,
random_other_index/1,
random_other_index/2,
- random_other_node/1,
- reconcile/2,
- rename_node/3,
+ random_other_node/1,
+ reconcile/2,
+ rename_node/3,
responsible_index/2,
- transfer_node/3,
- update_meta/3]).
+ transfer_node/3,
+ update_meta/3]).
+
+-export([cluster_name/1,
+ legacy_ring/1,
+ legacy_reconcile/2,
+ upgrade/1,
+ downgrade/2,
+ claimant/1,
+ member_status/2,
+ all_member_status/1,
+ update_member_meta/5,
+ get_member_meta/3,
+ add_member/3,
+ remove_member/3,
+ leave_member/3,
+ exit_member/3,
+ down_member/3,
+ active_members/1,
+ claiming_members/1,
+ ready_members/1,
+ random_other_active_node/1,
+ down_members/1,
+ set_owner/2,
+ indices/2,
+ future_indices/2,
+ disowning_indices/2,
+ pending_changes/1,
+ next_owner/2,
+ next_owner/3,
+ handoff_complete/3,
+ ring_ready/0,
+ ring_ready/1,
+ ring_ready_info/1,
+ ring_changed/2,
+ set_cluster_name/2,
+ reconcile_names/2,
+ reconcile_members/2]).
-export_type([riak_core_ring/0]).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
+-define(ROUT(S,A),ok).
+%%-define(ROUT(S,A),?debugFmt(S,A)).
+%%-define(ROUT(S,A),io:format(S,A)).
+
+-define(CHSTATE, #chstate_v2).
+-record(chstate_v2, {
+ nodename :: node(), % the Node responsible for this chstate
+ vclock :: vclock:vclock(), % for this chstate object, entries are
+ % {Node, Ctr}
+ chring :: chash:chash(), % chash ring of {IndexAsInt, Node} mappings
+ meta :: dict(), % dict of cluster-wide other data (primarily
+ % bucket N-value, etc)
+
+ clustername :: {node(), term()},
+ next :: [{integer(), node(), node(), [module()], awaiting | complete}],
+ members :: [{node(), {member_status(), vclock:vclock(), []}}],
+ claimant :: node(),
+ seen :: [{node(), vclock:vclock()}],
+ rvsn :: vclock:vclock()
+}).
+
+%% Legacy chstate
-record(chstate, {
nodename :: node(), % the Node responsible for this chstate
vclock, % for this chstate object, entries are {Node, Ctr}
chring :: chash:chash(), % chash ring of {IndexAsInt, Node} mappings
meta % dict of cluster-wide other data (primarily bucket N-value, etc)
}).
+-type member_status() :: joining | valid | invalid | leaving | exiting | down.
+
%% type meta_entry(). Record for each entry in #chstate.meta
-record(meta_entry, {
value, % The value stored under this entry
@@ -75,22 +135,84 @@
}).
%% riak_core_ring() is the opaque data type used for partition ownership
--type riak_core_ring() :: #chstate{}.
+-type riak_core_ring() :: ?CHSTATE{}.
-type chstate() :: riak_core_ring().
+-type pending_change() :: {Owner :: node(),
+ NextOwner :: node(),
+ awaiting | complete}
+ | {undefined, undefined, undefined}.
+
%% ===================================================================
%% Public API
%% ===================================================================
-%% @doc Produce a list of all nodes that own any partitions.
+%% @doc Returns true if the given ring is a legacy ring.
+legacy_ring(#chstate{}) ->
+ true;
+legacy_ring(_) ->
+ false.
+
+%% @doc Upgrade old ring structures to the latest format.
+upgrade(Old=?CHSTATE{}) ->
+ Old;
+upgrade(Old=#chstate{}) ->
+ #chstate{nodename=Node,
+ vclock=VC,
+ chring=Ring,
+ meta=Meta} = Old,
+ New1 = ?CHSTATE{nodename=Node,
+ vclock=VC,
+ chring=Ring,
+ meta=Meta,
+ clustername=undefined,
+ next=[],
+ members=[],
+ claimant=undefined,
+ seen=[],
+ rvsn=VC},
+ MemberVC = vclock:increment(Node, vclock:fresh()),
+ Members = [{Member, {valid, MemberVC, []}}
+ || Member <- chash:members(Ring)],
+ New2 = New1?CHSTATE{members=Members},
+ case node() of
+ Node ->
+ GVsn = riak_core_gossip:gossip_version(),
+ update_member_meta(Node, New2, Node,
+ gossip_vsn, GVsn, same_vclock);
+ _ ->
+ New2
+ end.
+
+%% @doc Downgrade the latest ring structure to a specified version.
+downgrade(1,?CHSTATE{nodename=Node,
+ vclock=VC,
+ chring=Ring,
+ meta=Meta}) ->
+ #chstate{nodename=Node,
+ vclock=VC,
+ chring=Ring,
+ meta=Meta};
+downgrade(2,State=?CHSTATE{}) ->
+ State.
+
+%% @doc Produce a list of all nodes that are members of the cluster
-spec all_members(State :: chstate()) -> [Node :: term()].
-all_members(State) ->
- chash:members(State#chstate.chring).
+all_members(?CHSTATE{members=Members}) ->
+ get_members(Members).
+
+%% @doc Produce a list of all active (not marked as down) cluster members
+active_members(?CHSTATE{members=Members}) ->
+ get_members(Members, [joining, valid, leaving, exiting]).
+
+%% @doc Returns a list of members guaranteed safe for requests
+ready_members(?CHSTATE{members=Members}) ->
+ get_members(Members, [valid, leaving]).
%% @doc Provide all ownership information in the form of {Index,Node} pairs.
-spec all_owners(State :: chstate()) -> [{Index :: integer(), Node :: term()}].
all_owners(State) ->
- chash:nodes(State#chstate.chring).
+ chash:nodes(State?CHSTATE.chring).
%% @doc Provide every preflist in the ring, truncated at N.
-spec all_preflists(State :: chstate(), N :: integer()) ->
@@ -108,7 +230,7 @@ diff_nodes(State1,State2) ->
lists:usort(lists:flatten(AllDiff)).
-spec equal_rings(chstate(), chstate()) -> boolean().
-equal_rings(_A=#chstate{chring=RA,meta=MA},_B=#chstate{chring=RB,meta=MB}) ->
+equal_rings(_A=?CHSTATE{chring=RA,meta=MA},_B=?CHSTATE{chring=RB,meta=MB}) ->
MDA = lists:sort(dict:to_list(MA)),
MDB = lists:sort(dict:to_list(MB)),
case MDA =:= MDB of
@@ -132,24 +254,32 @@ fresh(NodeName) ->
%% Called by fresh/1, and otherwise only intended for testing purposes.
-spec fresh(RingSize :: integer(), NodeName :: term()) -> chstate().
fresh(RingSize, NodeName) ->
- #chstate{nodename=NodeName,
- vclock=vclock:fresh(),
- chring=chash:fresh(RingSize, NodeName),
- meta=dict:new()}.
+ VClock=vclock:increment(NodeName, vclock:fresh()),
+ GossipVsn = riak_core_gossip:gossip_version(),
+ ?CHSTATE{nodename=NodeName,
+ clustername={NodeName, erlang:now()},
+ members=[{NodeName, {valid, VClock, [{gossip_vsn, GossipVsn}]}}],
+ chring=chash:fresh(RingSize, NodeName),
+ next=[],
+ claimant=NodeName,
+ seen=[{NodeName, VClock}],
+ rvsn=VClock,
+ vclock=VClock,
+ meta=dict:new()}.
% @doc Return a value from the cluster metadata dict
-spec get_meta(Key :: term(), State :: chstate()) ->
{ok, term()} | undefined.
get_meta(Key, State) ->
- case dict:find(Key, State#chstate.meta) of
+ case dict:find(Key, State?CHSTATE.meta) of
error -> undefined;
{ok, M} -> {ok, M#meta_entry.value}
end.
%% @doc return the names of all the custom buckets stored in the ring.
-spec get_buckets(State :: chstate()) -> [term()].
get_buckets(State) ->
- Keys = dict:fetch_keys(State#chstate.meta),
+ Keys = dict:fetch_keys(State?CHSTATE.meta),
lists:foldl(
fun({bucket, Bucket}, Acc) ->
[Bucket|Acc];
@@ -170,18 +300,18 @@ my_indices(State) ->
%% @doc Return the number of partitions in this Riak ring.
-spec num_partitions(State :: chstate()) -> integer().
num_partitions(State) ->
- chash:size(State#chstate.chring).
+ chash:size(State?CHSTATE.chring).
%% @doc Return the node that is responsible for a given chstate.
-spec owner_node(State :: chstate()) -> Node :: term().
owner_node(State) ->
- State#chstate.nodename.
+ State?CHSTATE.nodename.
%% @doc For a given object key, produce the ordered list of
%% {partition,node} pairs that could be responsible for that object.
-spec preflist(Key :: binary(), State :: chstate()) ->
[{Index :: integer(), Node :: term()}].
-preflist(Key, State) -> chash:successors(Key, State#chstate.chring).
+preflist(Key, State) -> chash:successors(Key, State?CHSTATE.chring).
%% @doc Return a randomly-chosen node from amongst the owners.
-spec random_node(State :: chstate()) -> Node :: term().
@@ -219,44 +349,34 @@ random_other_node(State) ->
lists:nth(random:uniform(length(L)), L)
end.
-% @doc Incorporate another node's state into our view of the Riak world.
+%% @doc Return a randomly-chosen active node other than this one.
+-spec random_other_active_node(State :: chstate()) -> Node :: term() | no_node.
+random_other_active_node(State) ->
+ case lists:delete(node(), active_members(State)) of
+ [] ->
+ no_node;
+ L ->
+ lists:nth(random:uniform(length(L)), L)
+ end.
+
+%% @doc Incorporate another node's state into our view of the Riak world.
-spec reconcile(ExternState :: chstate(), MyState :: chstate()) ->
{no_change, chstate()} | {new_ring, chstate()}.
reconcile(ExternState, MyState) ->
- case vclock:equal(MyState#chstate.vclock, vclock:fresh()) of
- true ->
- {new_ring, #chstate{nodename=MyState#chstate.nodename,
- vclock=ExternState#chstate.vclock,
- chring=ExternState#chstate.chring,
- meta=ExternState#chstate.meta}};
- false ->
- case ancestors([ExternState, MyState]) of
- [OlderState] ->
- case vclock:equal(OlderState#chstate.vclock,
- MyState#chstate.vclock) of
- true ->
- {new_ring,
- #chstate{nodename=MyState#chstate.nodename,
- vclock=ExternState#chstate.vclock,
- chring=ExternState#chstate.chring,
- meta=ExternState#chstate.meta}};
- false -> {no_change, MyState}
- end;
- [] ->
- case equal_rings(ExternState,MyState) of
- true -> {no_change, MyState};
- false -> {new_ring, reconcile(MyState#chstate.nodename,
- ExternState, MyState)}
- end
- end
+ case internal_reconcile(MyState, ExternState) of
+ {false, State} ->
+ {no_change, State};
+ {true, State} ->
+ {new_ring, State}
end.
%% @doc Rename OldNode to NewNode in a Riak ring.
-spec rename_node(State :: chstate(), OldNode :: atom(), NewNode :: atom()) ->
chstate().
-rename_node(State=#chstate{chring=Ring, nodename=ThisNode}, OldNode, NewNode)
+rename_node(State=?CHSTATE{chring=Ring, nodename=ThisNode, members=Members,
+ seen=Seen}, OldNode, NewNode)
when is_atom(OldNode), is_atom(NewNode) ->
- State#chstate{
+ State?CHSTATE{
chring=lists:foldl(
fun({Idx, Owner}, AccIn) ->
case Owner of
@@ -265,34 +385,35 @@ rename_node(State=#chstate{chring=Ring, nodename=ThisNode}, OldNode, NewNode)
_ -> AccIn
end
end, Ring, riak_core_ring:all_owners(State)),
+ members=proplists:substitute_aliases([{OldNode, NewNode}], Members),
+ seen=proplists:substitute_aliases([{OldNode, NewNode}], Seen),
nodename=case ThisNode of OldNode -> NewNode; _ -> ThisNode end,
- vclock=vclock:increment(NewNode, State#chstate.vclock)}.
+ vclock=vclock:increment(NewNode, State?CHSTATE.vclock)}.
%% @doc Determine the integer ring index responsible
%% for a chash key.
-spec responsible_index(chash:index(), chstate()) -> integer().
-responsible_index(ChashKey, #chstate{chring=Ring}) ->
+responsible_index(ChashKey, ?CHSTATE{chring=Ring}) ->
<<IndexAsInt:160/integer>> = ChashKey,
chash:next_index(IndexAsInt, Ring).
-spec transfer_node(Idx :: integer(), Node :: term(), MyState :: chstate()) ->
chstate().
transfer_node(Idx, Node, MyState) ->
- case chash:lookup(Idx, MyState#chstate.chring) of
+ case chash:lookup(Idx, MyState?CHSTATE.chring) of
Node ->
MyState;
_ ->
- Me = MyState#chstate.nodename,
- VClock = vclock:increment(Me, MyState#chstate.vclock),
- CHRing = chash:update(Idx, Node, MyState#chstate.chring),
- #chstate{nodename=Me,vclock=VClock,chring=CHRing,
- meta=MyState#chstate.meta}
+ Me = MyState?CHSTATE.nodename,
+ VClock = vclock:increment(Me, MyState?CHSTATE.vclock),
+ CHRing = chash:update(Idx, Node, MyState?CHSTATE.chring),
+ MyState?CHSTATE{vclock=VClock,chring=CHRing}
end.
% @doc Set a key in the cluster metadata dict
-spec update_meta(Key :: term(), Val :: term(), State :: chstate()) -> chstate().
update_meta(Key, Val, State) ->
- Change = case dict:find(Key, State#chstate.meta) of
+ Change = case dict:find(Key, State?CHSTATE.meta) of
{ok, OldM} ->
Val /= OldM#meta_entry.value;
error ->
@@ -304,17 +425,258 @@ update_meta(Key, Val, State) ->
calendar:universal_time()),
value = Val
},
- VClock = vclock:increment(State#chstate.nodename,
- State#chstate.vclock),
- State#chstate{vclock=VClock,
- meta=dict:store(Key, M, State#chstate.meta)};
+ VClock = vclock:increment(State?CHSTATE.nodename,
+ State?CHSTATE.vclock),
+ State?CHSTATE{vclock=VClock,
+ meta=dict:store(Key, M, State?CHSTATE.meta)};
true ->
State
end.
-%% ====================================================================
-%% Internal functions
-%% ====================================================================
+%% @doc Return the current claimant.
+-spec claimant(State :: chstate()) -> node().
+claimant(?CHSTATE{claimant=Claimant}) ->
+ Claimant.
+
+%% @doc Returns the unique identifier for this cluster.
+-spec cluster_name(State :: chstate()) -> term().
+cluster_name(State) ->
+ State?CHSTATE.clustername.
+
+%% @doc Sets the unique identifier for this cluster.
+set_cluster_name(State, Name) ->
+ State?CHSTATE{clustername=Name}.
+
+reconcile_names(RingA=?CHSTATE{clustername=NameA},
+ RingB=?CHSTATE{clustername=NameB}) ->
+ case (NameA =:= undefined) or (NameB =:= undefined) of
+ true ->
+ {RingA?CHSTATE{clustername=undefined},
+ RingB?CHSTATE{clustername=undefined}};
+ false ->
+ {RingA, RingB}
+ end.
+
+%% @doc Returns the current membership status for a node in the cluster.
+-spec member_status(State :: chstate(), Node :: node()) -> member_status().
+member_status(?CHSTATE{members=Members}, Node) ->
+ member_status(Members, Node);
+member_status(Members, Node) ->
+ case orddict:find(Node, Members) of
+ {ok, {Status, _, _}} ->
+ Status;
+ _ ->
+ invalid
+ end.
+
+%% @doc Returns the current membership status for all nodes in the cluster.
+-spec all_member_status(State :: chstate()) -> [{node(), member_status()}].
+all_member_status(?CHSTATE{members=Members}) ->
+ [{Node, Status} || {Node, {Status, _VC, _}} <- Members, Status /= invalid].
+
+get_member_meta(State, Member, Key) ->
+ case orddict:find(Member, State?CHSTATE.members) of
+ error -> undefined;
+ {ok, {_, _, Meta}} ->
+ case orddict:find(Key, Meta) of
+ error ->
+ undefined;
+ {ok, Value} ->
+ Value
+ end
+ end.
+
+%% @doc Set a key in the member metadata orddict
+update_member_meta(Node, State, Member, Key, Val) ->
+ VClock = vclock:increment(Node, State?CHSTATE.vclock),
+ State2 = update_member_meta(Node, State, Member, Key, Val, same_vclock),
+ State2?CHSTATE{vclock=VClock}.
+
+update_member_meta(Node, State, Member, Key, Val, same_vclock) ->
+ Members = State?CHSTATE.members,
+ case orddict:is_key(Member, Members) of
+ true ->
+ Members2 = orddict:update(Member,
+ fun({Status, VC, MD}) ->
+ {Status,
+ vclock:increment(Node, VC),
+ orddict:store(Key, Val, MD)}
+ end,
+ Members),
+ State?CHSTATE{members=Members2};
+ false ->
+ State
+ end.
+
+add_member(PNode, State, Node) ->
+ set_member(PNode, State, Node, joining).
+
+remove_member(PNode, State, Node) ->
+ set_member(PNode, State, Node, invalid).
+
+leave_member(PNode, State, Node) ->
+ set_member(PNode, State, Node, leaving).
+
+exit_member(PNode, State, Node) ->
+ set_member(PNode, State, Node, exiting).
+
+down_member(PNode, State, Node) ->
+ set_member(PNode, State, Node, down).
+
+%% @doc Return a list of all members of the cluster that are eligible to
+%% claim partitions.
+-spec claiming_members(State :: chstate()) -> [Node :: node()].
+claiming_members(?CHSTATE{members=Members}) ->
+ get_members(Members, [joining, valid, down]).
+
+%% @doc Return a list of all members of the cluster that are marked as down.
+-spec down_members(State :: chstate()) -> [Node :: node()].
+down_members(?CHSTATE{members=Members}) ->
+ get_members(Members, [down]).
+
+%% @doc Set the node that is responsible for a given chstate.
+-spec set_owner(State :: chstate(), Node :: node()) -> chstate().
+set_owner(State, Node) ->
+ State?CHSTATE{nodename=Node}.
+
+%% @doc Return all partition indices owned by a node.
+-spec indices(State :: chstate(), Node :: node()) -> [integer()].
+indices(State, Node) ->
+ AllOwners = all_owners(State),
+ [Idx || {Idx, Owner} <- AllOwners, Owner =:= Node].
+
+%% @doc Return all partition indices that will be owned by a node after all
+%% pending ownership transfers have completed.
+-spec future_indices(State :: chstate(), Node :: node()) -> [integer()].
+future_indices(State, Node) ->
+ FutureState = change_owners(State, all_next_owners(State)),
+ indices(FutureState, Node).
+
+%% @doc Return all indices that a node is scheduled to give to another.
+-spec disowning_indices(State :: chstate(), Node :: node()) -> [integer()].
+disowning_indices(State, Node) ->
+ [Idx || {Idx, Owner, _NextOwner, _Mods, _Status} <- State?CHSTATE.next,
+ Owner =:= Node].
+
+%% @doc Returns a list of all pending ownership transfers.
+pending_changes(State) ->
+ %% For now, just return next directly.
+ State?CHSTATE.next.
+
+%% @doc Return details for a pending partition ownership change.
+-spec next_owner(State :: chstate(), Idx :: integer()) -> pending_change().
+next_owner(State, Idx) ->
+ case lists:keyfind(Idx, 1, State?CHSTATE.next) of
+ false ->
+ {undefined, undefined, undefined};
+ NInfo ->
+ next_owner(NInfo)
+ end.
+
+%% @doc Return details for a pending partition ownership change,
+%%      reporting `complete' only once the given module's transfer is done.
+-spec next_owner(State :: chstate(), Idx :: integer(),
+ Mod :: module()) -> pending_change().
+next_owner(State, Idx, Mod) ->
+ case lists:keyfind(Idx, 1, State?CHSTATE.next) of
+ false ->
+ {undefined, undefined, undefined};
+ {_, Owner, NextOwner, _Transfers, complete} ->
+ {Owner, NextOwner, complete};
+ {_, Owner, NextOwner, Transfers, _Status} ->
+ case ordsets:is_element(Mod, Transfers) of
+ true ->
+ {Owner, NextOwner, complete};
+ false ->
+ {Owner, NextOwner, awaiting}
+ end
+ end.
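+%%
+%% For example (sketch; riak_kv_vnode stands in for any vnode module,
+%% and the right-hand atoms are hypothetical caller decisions):
+%%   case next_owner(Ring, Idx, riak_kv_vnode) of
+%%       {_, NextOwner, complete} -> {forward_to, NextOwner};
+%%       {_, _, awaiting}         -> handle_locally;
+%%       {undefined, _, _}        -> handle_locally
+%%   end.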
+
+%% @doc Returns true if all valid and leaving cluster members have seen
+%%      the current ring.
+-spec ring_ready(State :: chstate()) -> boolean().
+ring_ready(State0) ->
+ Owner = owner_node(State0),
+ State = update_seen(Owner, State0),
+ Seen = State?CHSTATE.seen,
+ Members = get_members(State?CHSTATE.members, [valid, leaving]),
+ VClock = State?CHSTATE.vclock,
+ R = [begin
+ case orddict:find(Node, Seen) of
+ error ->
+ false;
+ {ok, VC} ->
+ vclock:equal(VClock, VC)
+ end
+ end || Node <- Members],
+ Ready = lists:all(fun(X) -> X =:= true end, R),
+ Ready.
+
+ring_ready() ->
+ {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
+ ring_ready(Ring).
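+%%
+%% A caller can poll for convergence (sketch):
+%%   wait_converged() ->
+%%       case ring_ready() of
+%%           true  -> ok;
+%%           false -> timer:sleep(1000), wait_converged()
+%%       end.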
+
+%% @doc Return the `seen' entries of current members whose vclocks trail
+%%      the most recent ring, i.e. the nodes still blocking convergence.
+ring_ready_info(State0) ->
+ Owner = owner_node(State0),
+ State = update_seen(Owner, State0),
+ Seen = State?CHSTATE.seen,
+ Members = get_members(State?CHSTATE.members, [valid, leaving]),
+ RecentVC =
+ orddict:fold(fun(_, VC, Recent) ->
+ case vclock:descends(VC, Recent) of
+ true ->
+ VC;
+ false ->
+ Recent
+ end
+ end, State?CHSTATE.vclock, Seen),
+ Outdated =
+ orddict:filter(fun(Node, VC) ->
+ (not vclock:equal(VC, RecentVC))
+ and lists:member(Node, Members)
+ end, Seen),
+ Outdated.
+
+%% @doc Marks a pending transfer as completed.
+-spec handoff_complete(State :: chstate(), Idx :: integer(),
+ Mod :: module()) -> chstate().
+handoff_complete(State, Idx, Mod) ->
+ transfer_complete(State, Idx, Mod).
+
+%% @doc Run the claimant/membership logic (below) against a changed ring.
+ring_changed(Node, State) ->
+ internal_ring_changed(Node, State).
+
+%% ===================================================================
+%% Legacy reconciliation
+%% ===================================================================
+
+%% @doc Incorporate another node's state into our view of the Riak world.
+legacy_reconcile(ExternState, MyState) ->
+ case vclock:equal(MyState#chstate.vclock, vclock:fresh()) of
+ true ->
+ {new_ring, #chstate{nodename=MyState#chstate.nodename,
+ vclock=ExternState#chstate.vclock,
+ chring=ExternState#chstate.chring,
+ meta=ExternState#chstate.meta}};
+ false ->
+ case ancestors([ExternState, MyState]) of
+ [OlderState] ->
+ case vclock:equal(OlderState#chstate.vclock,
+ MyState#chstate.vclock) of
+ true ->
+ {new_ring,
+ #chstate{nodename=MyState#chstate.nodename,
+ vclock=ExternState#chstate.vclock,
+ chring=ExternState#chstate.chring,
+ meta=ExternState#chstate.meta}};
+ false -> {no_change, MyState}
+ end;
+ [] ->
+ case legacy_equal_rings(ExternState,MyState) of
+ true -> {no_change, MyState};
+ false -> {new_ring,
+ legacy_reconcile(MyState#chstate.nodename,
+ ExternState, MyState)}
+ end
+ end
+ end.
%% @private
ancestors(RingStates) ->
@@ -325,23 +687,22 @@ ancestors(RingStates) ->
lists:flatten(Ancest).
%% @private
-merge_meta(M1,M2) ->
- dict:merge(fun(_,D1,D2) -> pick_val(D1,D2) end, M1, M2).
-
-%% @private
-pick_val(M1,M2) ->
- case M1#meta_entry.lastmod > M2#meta_entry.lastmod of
- true -> M1;
- false -> M2
- end.
+legacy_equal_rings(_A=#chstate{chring=RA,meta=MA},
+ _B=#chstate{chring=RB,meta=MB}) ->
+ MDA = lists:sort(dict:to_list(MA)),
+ MDB = lists:sort(dict:to_list(MB)),
+ case MDA =:= MDB of
+ false -> false;
+ true -> RA =:= RB
+ end.
%% @private
% @doc If two states are mutually non-descendant, merge them anyway.
% This can cause a bit of churn, but should converge.
-% @spec reconcile(MyNodeName :: term(),
-% StateA :: chstate(), StateB :: chstate())
-% -> chstate()
+% @spec legacy_reconcile(MyNodeName :: term(),
+% StateA :: chstate(), StateB :: chstate())
+% -> chstate()
-reconcile(MyNodeName, StateA, StateB) ->
+legacy_reconcile(MyNodeName, StateA, StateB) ->
% take two states (non-descendant) and merge them
VClock = vclock:increment(MyNodeName,
vclock:merge([StateA#chstate.vclock,
@@ -353,6 +714,711 @@ reconcile(MyNodeName, StateA, StateB) ->
chring=CHRing,
meta=Meta}.
+%% =========================================================================
+%% Claimant rebalance/reassign logic
+%% (TODO: Consider refactoring into riak_core_gossip or riak_core_claimant)
+%% =========================================================================
+
+%% @private
+internal_ring_changed(Node, CState0) ->
+ CState = update_seen(Node, CState0),
+ case ring_ready(CState) of
+ false ->
+ CState;
+ true ->
+ {C1, CState2} = maybe_update_claimant(Node, CState),
+ {C2, CState3} = maybe_handle_joining(Node, CState2),
+ case C2 of
+ true ->
+ Changed = true,
+ CState5 = CState3;
+ false ->
+ {C3, CState4} = maybe_update_ring(Node, CState3),
+ {C4, CState5} = maybe_remove_exiting(Node, CState4),
+ Changed = (C1 or C2 or C3 or C4)
+ end,
+
+ %% Start/stop converge and rebalance delay timers
+ %% (converge delay)
+ %% -- Starts when claimant changes the ring
+ %% -- Stops when the ring converges (ring_ready)
+ %% (rebalance delay)
+ %% -- Starts when next changes from empty to non-empty
+ %% -- Stops when next changes from non-empty to empty
+ %%
+ IsClaimant = (CState5?CHSTATE.claimant =:= Node),
+ WasPending = ([] /= pending_changes(CState)),
+ IsPending = ([] /= pending_changes(CState5)),
+
+ %% Outer case statement already checks for ring_ready
+ case {IsClaimant, Changed} of
+ {true, true} ->
+ riak_core_stat:update(converge_timer_end),
+ riak_core_stat:update(converge_timer_begin);
+ {true, false} ->
+ riak_core_stat:update(converge_timer_end);
+ _ ->
+ ok
+ end,
+
+ case {IsClaimant, WasPending, IsPending} of
+ {true, false, true} ->
+ riak_core_stat:update(rebalance_timer_begin);
+ {true, true, false} ->
+ riak_core_stat:update(rebalance_timer_end);
+ _ ->
+ ok
+ end,
+
+ %% Set cluster name if it is undefined
+ case {IsClaimant, cluster_name(CState5)} of
+ {true, undefined} ->
+ ClusterName = {Node, erlang:now()},
+ riak_core_util:rpc_every_member(riak_core_ring_manager,
+ set_cluster_name,
+ [ClusterName],
+ 1000),
+ ok;
+ _ ->
+ ClusterName = cluster_name(CState5),
+ ok
+ end,
+
+ case Changed of
+ true ->
+ VClock = vclock:increment(Node, CState5?CHSTATE.vclock),
+ CState5?CHSTATE{vclock=VClock, clustername=ClusterName};
+ false ->
+ CState5
+ end
+ end.
+
+%% @private
+maybe_update_claimant(Node, CState) ->
+ Members = get_members(CState?CHSTATE.members, [valid, leaving]),
+ Claimant = CState?CHSTATE.claimant,
+ RVsn = CState?CHSTATE.rvsn,
+ NextClaimant = hd(Members ++ [undefined]),
+ ClaimantMissing = not lists:member(Claimant, Members),
+
+ case {ClaimantMissing, NextClaimant} of
+ {true, Node} ->
+ %% Become claimant
+ RVsn2 = vclock:increment(Claimant, RVsn),
+ CState2 = CState?CHSTATE{claimant=Node, rvsn=RVsn2},
+ {true, CState2};
+ _ ->
+ {false, CState}
+ end.
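+%%
+%% Example: if the current claimant drops out of the valid/leaving set,
+%% Members might be [b, c] (orddicts keep members sorted); every node
+%% computes hd([b, c]) and independently agrees on `b' as the next
+%% claimant, so no election round is required.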
+
+%% @private
+maybe_update_ring(Node, CState) ->
+ Claimant = CState?CHSTATE.claimant,
+ case Claimant of
+ Node ->
+ case claiming_members(CState) of
+ [] ->
+ %% Consider logging an error/warning here or even
+ %% intentionally crashing. This state makes no logical
+ %% sense given that it represents a cluster without any
+ %% active nodes.
+ {false, CState};
+ _ ->
+ {Changed, CState2} = update_ring(Node, CState),
+ {Changed, CState2}
+ end;
+ _ ->
+ {false, CState}
+ end.
+
+%% @private
+maybe_remove_exiting(Node, CState) ->
+ Claimant = CState?CHSTATE.claimant,
+ case Claimant of
+ Node ->
+ Exiting = get_members(CState?CHSTATE.members, [exiting]),
+ Changed = (Exiting /= []),
+ CState2 =
+ lists:foldl(fun(ENode, CState0) ->
+                              %% Tell the exiting node to shut down.
+ CName = cluster_name(CState),
+ riak_core_ring_manager:refresh_ring(ENode,
+ CName),
+ set_member(Node, CState0, ENode,
+ invalid, same_vclock)
+ end, CState, Exiting),
+ {Changed, CState2};
+ _ ->
+ {false, CState}
+ end.
+
+%% @private
+maybe_handle_joining(Node, CState) ->
+ Claimant = CState?CHSTATE.claimant,
+ case Claimant of
+ Node ->
+ Joining = get_members(CState?CHSTATE.members, [joining]),
+ Changed = (Joining /= []),
+ CState2 =
+ lists:foldl(fun(JNode, CState0) ->
+ set_member(Node, CState0, JNode,
+ valid, same_vclock)