Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

base fork: basho/riak_core
base: cet-bg-mgr-proto
...
head fork: basho/riak_core
compare: jdb-large-final.old
  • 16 commits
  • 15 files changed
  • 0 commit comments
  • 1 contributor
Commits on May 10, 2013
jtuple Change riak_core_ring_manager to use gen_server 891c907
jtuple Hybrid ring/mochiglobal approach 5633d4c
Commits on May 14, 2013
jtuple Add chashbin and bucket metadata dc65891
jtuple Optimize riak_core_apl using chashbin e811f21
jtuple Optimize the "up nodes" logic of riak_core_apl
Change riak_core_apl to use an unordered list and lists:member rather
than ordsets and ordsets:is_element for determining up nodes. This
approach is much faster in practice. Both ordsets:is_element and
lists:member are O(N); however, lists:member is a BIF and has a
significantly lower constant factor. Likewise, this approach avoids
the lists:usort performed by ordsets:from_list.
d7ac297
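
To make the trade-off concrete, here is a minimal sketch (hypothetical module, not part of the PR) that times the two membership checks the commit message compares:

-module(upnodes_check).
-export([compare/1]).

%% Hypothetical micro-benchmark: N membership checks against a plain list
%% versus an ordset built from that list. ordsets:from_list/1 pays for a
%% lists:usort/1 up front; lists:member/2 is a BIF with a small constant.
compare(N) ->
    Nodes = [list_to_atom("dev" ++ integer_to_list(X) ++ "@127.0.0.1")
             || X <- lists:seq(1, 100)],
    Set = ordsets:from_list(Nodes),
    Target = lists:last(Nodes),
    {TList, _} = timer:tc(fun() ->
        [lists:member(Target, Nodes) || _ <- lists:seq(1, N)]
    end),
    {TSet, _} = timer:tc(fun() ->
        [ordsets:is_element(Target, Set) || _ <- lists:seq(1, N)]
    end),
    {TList, TSet}.  %% wall-clock microseconds for each approach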
jtuple Optimize coverage plan using chashbin 2a3c363
jtuple Update vnode manager to use chashbin e9a9acb
jtuple Change vnode + capability managers to use intelligent ring polling c2e9df6
jtuple Make vnode manager responsible for ensuring vnodes started ff09710
jtuple Change handoff manager to use sets instead of ordsets acb7735
jtuple Reduce file server bottleneck when writing ring to disk bc75dfe
jtuple Optimize vnode manager
Change certain uses of orddict to dict.

Change update_never_started to only query ring indices when there
are actually unknown modules.
301616d
jtuple Optimize riak_core_vnode_manager:compute_forwarding
Change from O(n*m) to O(m) + O(n)
09a9c11
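
The compute_forwarding change itself is not shown in this diff, but the complexity claim follows a standard shape; a hedged sketch under that assumption:

%% Hypothetical illustration: pairing each of n items with a match from an
%% m-element list.
%% O(n*m): rescan Ms once per item.
slow(Ns, Ms) ->
    [{N, lists:keyfind(N, 1, Ms)} || N <- Ns].

%% O(m) + O(n): build a dict from Ms once, then near-constant lookups.
fast(Ns, Ms) ->
    D = dict:from_list(Ms),
    [{N, dict:find(N, D)} || N <- Ns].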
jtuple Change how vnode manager uses ETS to avoid table scans 09ff97b
jtuple Randomize vnode inactivity timeout
The goal is to prevent multiple vnodes from timing out simultaneously
and flooding the vnode manager with messages.
4f28eb1
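
A plausible sketch of the randomization described above (the base value and jitter range are assumptions, not taken from the diff):

%% Hypothetical: jitter the inactivity timeout so vnodes started together
%% do not all fire at the same instant and stampede the vnode manager.
jittered_timeout(BaseMS) ->
    BaseMS + random:uniform(BaseMS div 2).  %% up to 50% extra delay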
jtuple Coalesce multiple handoff completion events to reduce ring churn
Introduces a new handoff helper gen_server that buffers completion
events for up to 100ms, then updates the ring once with all events
received during the interval.
97b1988
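
A minimal skeleton of the buffering pattern this commit describes (names other than riak_core_ring:handoff_complete/2, which this PR adds below, are assumptions; the real module is riak_core_vnode_handoff_helper):

-module(handoff_coalescer).  %% hypothetical stand-in for the real helper
-behaviour(gen_server).
-export([start_link/0, complete/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

complete(Event) ->
    gen_server:cast(?MODULE, {complete, Event}).

init([]) ->
    {ok, {[], undefined}}.

%% First event of an interval starts a 100ms flush timer; later events
%% just accumulate in the buffer.
handle_cast({complete, Event}, {Buffer, undefined}) ->
    Timer = erlang:send_after(100, self(), flush),
    {noreply, {[Event | Buffer], Timer}};
handle_cast({complete, Event}, {Buffer, Timer}) ->
    {noreply, {[Event | Buffer], Timer}}.

%% One ring transition for every event seen during the interval.
handle_info(flush, {Buffer, _Timer}) ->
    riak_core_ring_manager:ring_trans(
      fun(Ring, _) ->
              {new_ring, riak_core_ring:handoff_complete(Ring, Buffer)}
      end, []),
    {noreply, {[], undefined}}.

handle_call(_Msg, _From, State) -> {reply, ok, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.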
2  ebin/riak_core.app
@@ -8,6 +8,7 @@
app_helper,
bloom,
chash,
+ chashbin,
gen_nb_server,
riak_core_gen_server,
json_pp,
@@ -75,6 +76,7 @@
riak_core_vnode_sup,
riak_core_vnode_worker,
riak_core_vnode_worker_pool,
+ riak_core_vnode_handoff_helper,
riak_core_web,
riak_core_wm_urlmap,
supervisor_pre_r14b04,
289 src/chashbin.erl
@@ -0,0 +1,289 @@
+-module(chashbin).
+-compile(export_all).
+
+%% Each entry is 176 bits: a 160-bit (20-byte) SHA index plus a 16-bit (2-byte) node id
+-define(UNIT, 176).
+-define(ENTRY, binary-unit:?UNIT).
+
+create({Size, Owners}) ->
+ Nodes1 = [Node || {_, Node} <- Owners],
+ Nodes2 = lists:usort(Nodes1),
+ Nodes3 = lists:zip(Nodes2, lists:seq(1, length(Nodes2))),
+ Bin = create_bin(Owners, Nodes3, <<>>),
+ {Size, Bin, list_to_tuple(Nodes2)}.
+
+create_bin([], _, Bin) ->
+ Bin;
+create_bin([{Idx, Owner}|Owners], Nodes, Bin) ->
+ {Owner, Id} = lists:keyfind(Owner, 1, Nodes),
+ Bin2 = <<Bin/binary, Idx:160/integer, Id:16/integer>>,
+ create_bin(Owners, Nodes, Bin2).
+
+to_list({Size, Bin, Nodes}) ->
+ L = [{Idx, element(Id, Nodes)}
+ || <<Idx:160/integer, Id:16/integer>> <= Bin],
+ {Size, L}.
+
+to_list_filter(Pred, {_, Bin, Nodes}) ->
+ [{Idx, element(Id,Nodes)}
+ || <<Idx:160/integer, Id:16/integer>> <= Bin,
+ Pred({Idx, element(Id,Nodes)})].
+
+responsible_index(<<HashKey:160/integer>>, CHBin) ->
+ responsible_index(HashKey, CHBin);
+responsible_index(HashKey, {Size, _, _}) ->
+ Inc = chash:ring_increment(Size),
+ (((HashKey div Inc) + 1) rem Size) * Inc.
+
+responsible_position(<<HashKey:160/integer>>, CHBin) ->
+ responsible_position(HashKey, CHBin);
+responsible_position(HashKey, {Size, _, _}) ->
+ Inc = chash:ring_increment(Size),
+ ((HashKey div Inc) + 1) rem Size.
+
+index_position(<<Idx:160/integer>>, CHBin) ->
+ index_position(Idx, CHBin);
+index_position(Idx, {Size, _, _}) ->
+ Inc = chash:ring_increment(Size),
+ (Idx div Inc) rem Size.
+
+index_owner(Idx, CHBin) ->
+ case itr_value(exact_iterator(Idx, CHBin)) of
+ {Idx, Owner} ->
+ Owner;
+ Other ->
+ %% Match the behavior of riak_core_ring:index_owner/2
+ throw({badmatch, Idx, Other})
+ end.
+
+num_partitions({Size, _, _}) ->
+ Size.
+
+%% ===================================================================
+%% iterator
+%% ===================================================================
+
+-type chashbin() :: {pos_integer(), binary(), tuple()}.
+-type iterator() :: {non_neg_integer(), non_neg_integer(), chashbin()}.
+
+-spec iterator(first | integer() | binary(), chashbin()) -> iterator().
+iterator(first, CHBin) ->
+ {0, 0, CHBin};
+iterator(<<Idx:160/integer>>, CHBin) ->
+ iterator(Idx, CHBin);
+iterator(Idx, CHBin) ->
+ Pos = responsible_position(Idx, CHBin),
+ {Pos, Pos, CHBin}.
+
+exact_iterator(<<Idx:160/integer>>, CHBin) ->
+ exact_iterator(Idx, CHBin);
+exact_iterator(Idx, CHBin) ->
+ Pos = index_position(Idx, CHBin),
+ {Pos, Pos, CHBin}.
+
+itr_value({Pos, _, {_, Bin, Nodes}}) ->
+ <<_:Pos/binary-unit:?UNIT, Idx:160/integer, Id:16/integer, _/binary>> = Bin,
+ Owner = element(Id, Nodes),
+ {Idx, Owner}.
+
+itr_next({Pos, Start, CHBin={Size, _, _}}) ->
+ Pos2 = (Pos + 1) rem Size,
+ case Pos2 of
+ Start ->
+ done;
+ _ ->
+ {Pos2, Start, CHBin}
+ end.
+
+itr_pop(N, {Pos, Start, CHBin={Size, Bin, Nodes}}) ->
+ L =
+ case Bin of
+ <<_:Pos/?ENTRY, Bin2:N/?ENTRY, _/binary>> ->
+ [{Idx, element(Id, Nodes)}
+ || <<Idx:160/integer, Id:16/integer>> <= Bin2];
+ _ ->
+ Left = (N + Pos) - Size,
+ Skip = Pos - Left,
+ <<Bin3:Left/?ENTRY, _:Skip/?ENTRY, Bin2/binary>> = Bin,
+ L1 = [{Idx, element(Id, Nodes)}
+ || <<Idx:160/integer, Id:16/integer>> <= Bin2],
+ L2 = [{Idx, element(Id, Nodes)}
+ || <<Idx:160/integer, Id:16/integer>> <= Bin3],
+ L1 ++ L2
+ end,
+ Pos2 = (Pos + N) rem Size,
+ Itr2 = {Pos2, Start, CHBin},
+ {L, Itr2}.
+
+itr_next_while(Pred, Itr) ->
+ case Pred(itr_value(Itr)) of
+ false ->
+ Itr;
+ true ->
+ itr_next_while(Pred, itr_next(Itr))
+ end.
+
+key(Pos, Bin) ->
+ <<_:Pos/binary-unit:?UNIT, Key:160/integer, _/binary>> = Bin,
+ Key.
+
+r1(N, W) ->
+ Idx = <<0:160/integer>>,
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ UpNodes = riak_core_node_watcher:nodes(riak_kv),
+ parallel_time(N, W, fun() ->
+ riak_core_apl:get_apl(Idx, 3, Ring, UpNodes)
+ end).
+
+r2(N, W) ->
+ Idx = <<0:160/integer>>,
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ UpNodes = riak_core_node_watcher:nodes(riak_kv),
+ parallel_time(N, W, fun() ->
+ %% riak_core_apl:get_apl(Idx, 3, riak_kv)
+ riak_core_apl:get_apl_chbin(Idx, 3, CHBin, UpNodes)
+ end).
+
+parallel_time(N, W, F) ->
+ Work = fun() ->
+ loop(N, F)
+ end,
+ timer:tc(fun spawn_wait/2, [W, Work]).
+
+tt() ->
+ tt(100000).
+
+tt(N) ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ Owners = riak_core_ring:all_owners(Ring),
+ {First, _} = hd(Owners),
+ {Last, _} = hd(lists:reverse(Owners)),
+ {X1, Y1} = go(N, First),
+ {X2, Y2} = go(N, Last),
+ {X1, Y1, X2, Y2}.
+
+go(N) ->
+ DIdx = 68507889249886074290797726533575766546371837952,
+ go(N, DIdx).
+
+go(N, DIdx) ->
+ Idx = <<DIdx:160/integer>>,
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ UpNodes = riak_core_node_watcher:nodes(riak_kv),
+ T1 = time(N, fun() ->
+ riak_core_apl:get_apl(Idx, 3, Ring, UpNodes)
+ end),
+ T2 = time(N, fun() ->
+ %% riak_core_apl:get_apl(Idx, 3, riak_kv)
+ riak_core_apl:get_apl_chbin(Idx, 3, CHBin, UpNodes)
+ end),
+ {T1, T2}.
+
+apl2(N, DIdx) ->
+ Idx = <<DIdx:160/integer>>,
+ loop(N, fun() ->
+ riak_core_apl2:get_apl(Idx, 3, riak_kv)
+ end).
+
+time(N, F) ->
+ timer:tc(fun loop/2, [N, F]).
+
+loop(0, _) ->
+ ok;
+loop(N, F) ->
+ F(),
+ loop(N-1, F).
+
+spawn_wait(N, F) ->
+ spawn_n(N, F),
+ wait(N).
+
+spawn_n(0, _) ->
+ ok;
+spawn_n(N, F) ->
+ Self = self(),
+ spawn_link(fun() ->
+ F(),
+ Self ! done
+ end),
+ spawn_n(N-1, F).
+
+wait(0) ->
+ ok;
+wait(N) ->
+ receive
+ done ->
+ wait(N-1)
+ end.
+
+do_test(N) ->
+ [begin
+ {{Min1, Max1}} = do_test(Size, NumNodes, N),
+ io:format("~5s ~4s ~5s ~5s~n", [integer_to_list(Size),
+ integer_to_list(NumNodes),
+ integer_to_list(Min1),
+ integer_to_list(Max1)])
+ %% {{Min1, Max1}, {Min2, Max2}} = do_test(Size, NumNodes, N),
+ %% io:format("~5s ~4s ~5s ~5s ~5s ~5s~n", [integer_to_list(Size),
+ %% integer_to_list(NumNodes),
+ %% integer_to_list(Min1),
+ %% integer_to_list(Max1),
+ %% integer_to_list(Min2),
+ %% integer_to_list(Max2)])
+ end || Size <- [64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384],
+ NumNodes <- [1, 10, 100, 1000]],
+ ok.
+
+do_test(Size, NumNodes, N) ->
+ {Ring, Nodes} = fake_ring(Size, NumNodes),
+ Owners = riak_core_ring:all_owners(Ring),
+ CHBin = chashbin:create(riak_core_ring:chash(Ring)),
+ Idx1 = <<0:160/integer>>,
+ [_,_,_,{IdxInt, _}|_] = lists:reverse(Owners),
+ Idx2 = <<IdxInt:160/integer>>,
+ UpNodes = Nodes,
+ %% UpNodes = lists:usort(Nodes),
+ %% T1 = none,
+ {T1,_} = time(N, fun() ->
+ riak_core_apl:get_apl(Idx1, 3, Ring, UpNodes)
+ end),
+ %% {T2,_} = time(N, fun() ->
+ %% riak_core_apl:get_apl_chbin(Idx1, 3, CHBin, UpNodes)
+ %% end),
+ {T3,_} = time(N, fun() ->
+ riak_core_apl:get_apl(Idx2, 3, Ring, UpNodes)
+ end),
+ %% {T4,_} = time(N, fun() ->
+ %% riak_core_apl:get_apl_chbin(Idx2, 3, CHBin, UpNodes)
+ %% end),
+ %% T2 = eprof:profile(fun() ->
+ %% time(N, fun() ->
+ %% riak_core_apl:get_apl_chbin(Idx, 3, CHBin, UpNodes)
+ %% end)
+ %% end),
+ %% {T1, T2}.
+ {{round(T1 div N), round(T3 div N)}}.
+ %% {{round(T1 div N), round(T3 div N)},
+ %% {round(T2 div N), round(T4 div N)}}.
+
+fake_ring(Size, NumNodes) ->
+ Inc = chash:ring_increment(Size),
+ Nodes = [list_to_atom("dev" ++ integer_to_list(X) ++ "@127.0.0.1") || _ <- lists:seq(0,Size div NumNodes),
+ X <- lists:seq(1,NumNodes)],
+ Nodes2 = lists:sublist(Nodes, Size),
+ Indices = lists:seq(0, (Size-1)*Inc, Inc),
+ Owners = lists:zip(Indices, Nodes2),
+ Ring = riak_core_ring:fresh(Size, hd(Nodes2)),
+ Ring2 = lists:foldl(fun({Idx, Owner}, RingAcc) ->
+ riak_core_ring:transfer_node(Idx, Owner, RingAcc)
+ end, Ring, Owners),
+ {Ring2, Nodes2}.
+
+htest(N, NumNodes) ->
+ Nodes = [list_to_atom("dev" ++ integer_to_list(X) ++ "@127.0.0.1") || X <- lists:seq(1,NumNodes)],
+ time(N, fun() ->
+ [erlang:phash(Node, (1 bsl 27) - 1) || Node <- Nodes],
+ %% [erlang:phash2(Node) || Node <- Nodes],
+ ok
+ end).
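
A short usage sketch for the module above, assuming a standalone node; chash:fresh/2 and chash:key_of/1 are the existing riak_core hashing helpers:

%% Build a chashbin from a fresh 64-partition chash and find the
%% partition responsible for a key.
demo() ->
    CHash = chash:fresh(64, node()),              %% {Size, [{Index, Owner}]}
    CHBin = chashbin:create(CHash),
    DocIdx = chash:key_of({<<"bucket">>, <<"key">>}),
    Idx = chashbin:responsible_index(DocIdx, CHBin),
    Owner = chashbin:index_owner(Idx, CHBin),
    {Idx, Owner}.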
95 src/riak_core_apl.erl
@@ -24,8 +24,9 @@
%% -------------------------------------------------------------------
-module(riak_core_apl).
-export([active_owners/1, active_owners/2,
- get_apl/3, get_apl/4, get_apl_ann/4,
- get_primary_apl/3, get_primary_apl/4
+ get_apl/3, get_apl/4, get_apl_ann/3, get_apl_ann/4,
+ get_primary_apl/3, get_primary_apl/4,
+ first_up/2, offline_owners/1, offline_owners/2
]).
-export_type([preflist/0, preflist2/0]).
@@ -38,6 +39,8 @@
-type ring() :: riak_core_ring:riak_core_ring().
-type preflist() :: [{index(), node()}].
-type preflist2() :: [{{index(), node()}, primary|fallback}].
+-type iterator() :: term().
+-type chashbin() :: term().
%% Return preflist of all active primary nodes (with no
%% substitution of fallbacks). Used to simulate a
@@ -49,7 +52,7 @@ active_owners(Service) ->
-spec active_owners(ring(), [node()]) -> preflist().
active_owners(Ring, UpNodes) ->
- UpNodes1 = ordsets:from_list(UpNodes),
+ UpNodes1 = UpNodes,
Primaries = riak_core_ring:all_owners(Ring),
{Up, _Pangs} = check_up(Primaries, UpNodes1, [], []),
Up.
@@ -57,8 +60,15 @@ active_owners(Ring, UpNodes) ->
%% Get the active preflist taking account of which nodes are up
-spec get_apl(binary(), n_val(), atom()) -> preflist().
get_apl(DocIdx, N, Service) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- get_apl(DocIdx, N, Ring, riak_core_node_watcher:nodes(Service)).
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ get_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).
+
+%% Get the active preflist taking account of which nodes are up
+%% for a given chash/upnodes list
+-spec get_apl_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist().
+get_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
+ [{Partition, Node} || {{Partition, Node}, _Type} <-
+ get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes)].
%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list
@@ -68,11 +78,28 @@ get_apl(DocIdx, N, Ring, UpNodes) ->
get_apl_ann(DocIdx, N, Ring, UpNodes)].
%% Get the active preflist taking account of which nodes are up
+%% and annotate each node with type of primary/fallback
+get_apl_ann(DocIdx, N, UpNodes) ->
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes).
+
+%% Get the active preflist taking account of which nodes are up
+%% for a given chash/upnodes list and annotate each node with type of
+%% primary/fallback
+-spec get_apl_ann_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist2().
+get_apl_ann_chbin(DocIdx, N, CHBin, UpNodes) ->
+ UpNodes1 = UpNodes,
+ Itr = chashbin:iterator(DocIdx, CHBin),
+ {Primaries, Itr2} = chashbin:itr_pop(N, Itr),
+ {Up, Pangs} = check_up(Primaries, UpNodes1, [], []),
+ Up ++ find_fallbacks_chbin(Pangs, Itr2, UpNodes1, []).
+
+%% Get the active preflist taking account of which nodes are up
%% for a given ring/upnodes list and annotate each node with type of
%% primary/fallback
-spec get_apl_ann(binary(), n_val(), ring(), [node()]) -> preflist2().
get_apl_ann(DocIdx, N, Ring, UpNodes) ->
- UpNodes1 = ordsets:from_list(UpNodes),
+ UpNodes1 = UpNodes,
Preflist = riak_core_ring:preflist(DocIdx, Ring),
{Primaries, Fallbacks} = lists:split(N, Preflist),
@@ -82,18 +109,49 @@ get_apl_ann(DocIdx, N, Ring, UpNodes) ->
%% Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), atom()) -> preflist2().
get_primary_apl(DocIdx, N, Service) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- get_primary_apl(DocIdx, N, Ring, riak_core_node_watcher:nodes(Service)).
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ get_primary_apl_chbin(DocIdx, N, CHBin, riak_core_node_watcher:nodes(Service)).
+
+%% Same as get_apl, but returns only the primaries.
+-spec get_primary_apl_chbin(binary(), n_val(), chashbin(), [node()]) -> preflist2().
+get_primary_apl_chbin(DocIdx, N, CHBin, UpNodes) ->
+ UpNodes1 = UpNodes,
+ Itr = chashbin:iterator(DocIdx, CHBin),
+ {Primaries, _} = chashbin:itr_pop(N, Itr),
+ {Up, _} = check_up(Primaries, UpNodes1, [], []),
+ Up.
%% Same as get_apl, but returns only the primaries.
-spec get_primary_apl(binary(), n_val(), ring(), [node()]) -> preflist2().
get_primary_apl(DocIdx, N, Ring, UpNodes) ->
- UpNodes1 = ordsets:from_list(UpNodes),
+ UpNodes1 = UpNodes,
Preflist = riak_core_ring:preflist(DocIdx, Ring),
{Primaries, _} = lists:split(N, Preflist),
{Up, _} = check_up(Primaries, UpNodes1, [], []),
Up.
+%% Return the first entry that is up in the preflist for `DocIdx'. This
+%% will crash if all owning nodes are offline.
+first_up(DocIdx, Service) ->
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ Itr = chashbin:iterator(DocIdx, CHBin),
+ UpSet = ordsets:from_list(riak_core_node_watcher:nodes(Service)),
+ Itr2 = chashbin:itr_next_while(fun({_P, Node}) ->
+ not ordsets:is_element(Node, UpSet)
+ end, Itr),
+ chashbin:itr_value(Itr2).
+
+offline_owners(Service) ->
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ offline_owners(Service, CHBin).
+
+offline_owners(Service, CHBin) ->
+ UpSet = ordsets:from_list(riak_core_node_watcher:nodes(Service)),
+ DownVNodes = chashbin:to_list_filter(fun({_Index, Node}) ->
+ not is_up(Node, UpSet)
+ end, CHBin),
+ DownVNodes.
+
%% Split a preference list into up and down lists
-spec check_up(preflist(), [node()], preflist2(), preflist()) -> {preflist2(), preflist()}.
check_up([], _UpNodes, Up, Pangs) ->
@@ -121,9 +179,26 @@ find_fallbacks([{Partition, _Node}|Rest]=Pangs, [{_,FN}|Fallbacks], UpNodes, Sec
find_fallbacks(Pangs, Fallbacks, UpNodes, Secondaries)
end.
+%% Find fallbacks for downed nodes in the preference list
+-spec find_fallbacks_chbin(preflist(), iterator(),[node()], preflist2()) -> preflist2().
+find_fallbacks_chbin([], _Fallbacks, _UpNodes, Secondaries) ->
+ lists:reverse(Secondaries);
+find_fallbacks_chbin(_, done, _UpNodes, Secondaries) ->
+ lists:reverse(Secondaries);
+find_fallbacks_chbin([{Partition, _Node}|Rest]=Pangs, Itr, UpNodes, Secondaries) ->
+ {_, FN} = chashbin:itr_value(Itr),
+ Itr2 = chashbin:itr_next(Itr),
+ case is_up(FN, UpNodes) of
+ true ->
+ find_fallbacks_chbin(Rest, Itr2, UpNodes,
+ [{{Partition, FN}, fallback} | Secondaries]);
+ false ->
+ find_fallbacks_chbin(Pangs, Itr2, UpNodes, Secondaries)
+ end.
+
%% Return true if a node is up
is_up(Node, UpNodes) ->
- ordsets:is_element(Node, UpNodes).
+ lists:member(Node, UpNodes).
-ifdef(TEST).
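
A hedged usage sketch for the new chashbin-based entry points above (assumes a running node with the riak_kv service registered; preflist_for/1 is an illustrative name):

%% Fetch the cached chashbin once and reuse it across preflist calls,
%% avoiding a full ring copy per request.
preflist_for(BKey) ->
    DocIdx = chash:key_of(BKey),
    {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
    UpNodes = riak_core_node_watcher:nodes(riak_kv),
    riak_core_apl:get_apl_chbin(DocIdx, 3, CHBin, UpNodes).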
11 src/riak_core_bucket.erl
@@ -96,15 +96,18 @@ merge_props(Overriding, Other) ->
%% </pre>
%%
get_bucket(Name) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- get_bucket(Name, Ring).
-
+ Meta = riak_core_ring_manager:get_bucket_meta(Name),
+ get_bucket_props(Name, Meta).
%% @spec get_bucket(Name, Ring::riak_core_ring:riak_core_ring()) ->
%% BucketProps :: riak_core_bucketprops()
%% @private
get_bucket(Name, Ring) ->
- case riak_core_ring:get_meta({bucket, Name}, Ring) of
+ Meta = riak_core_ring:get_meta({bucket, Name}, Ring),
+ get_bucket_props(Name, Meta).
+
+get_bucket_props(Name, Meta) ->
+ case Meta of
undefined ->
[{name, Name}
|app_helper:get_env(riak_core, default_bucket_props)];
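
The point of this change, sketched side by side (old_lookup/new_lookup are illustrative names; get_bucket_meta/1 is added to riak_core_ring_manager later in this diff):

%% Old path: materialize the entire ring just to read one bucket's props.
old_lookup(Name) ->
    {ok, Ring} = riak_core_ring_manager:get_my_ring(),
    riak_core_ring:get_meta({bucket, Name}, Ring).

%% New path: a single lookup against the ring manager's per-bucket cache.
new_lookup(Name) ->
    riak_core_ring_manager:get_bucket_meta(Name).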
35 src/riak_core_capability.erl
@@ -81,8 +81,7 @@
get/1,
get/2,
all/0,
- update_ring/1,
- ring_changed/1]).
+ update_ring/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
@@ -98,6 +97,7 @@
-type registered() :: [{capability(), #capability{}}].
-record(state, {registered :: registered(),
+ last_ring_id,
supported :: [{node(), [{capability(), [mode()]}]}],
unknown :: [node()],
negotiated :: [{capability(), mode()}]
@@ -179,12 +179,6 @@ update_ring(Ring) ->
add_supported_to_ring(node(), Supported, Ring)
end.
-%% @doc Internal callback used by `riak_core_ring_handler' to notify the
-%% capability manager of a new ring
-%% @hidden
-ring_changed(Ring) ->
- gen_server:call(?MODULE, {ring_changed, Ring}, infinity).
-
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -210,23 +204,20 @@ handle_call({register, Capability, Info}, _From, State) ->
publish_supported(State4),
update_local_cache(State4),
save_registered(State4#state.registered),
- {reply, ok, State4};
-
-handle_call({ring_changed, Ring}, _From, State) ->
- State2 = update_supported(Ring, State),
- {reply, ok, State2}.
+ {reply, ok, State4}.
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(tick, State) ->
schedule_tick(),
- State2 =
+ State2 = maybe_update_supported(State),
+ State3 =
lists:foldl(fun(Node, StateAcc) ->
add_node(Node, [], StateAcc)
- end, State, State#state.unknown),
- State3 = renegotiate_capabilities(State2),
- {noreply, State3};
+ end, State2, State2#state.unknown),
+ State4 = renegotiate_capabilities(State3),
+ {noreply, State4};
handle_info(_Info, State) ->
{noreply, State}.
@@ -241,6 +232,16 @@ code_change(_OldVsn, State, _Extra) ->
%%% Internal functions
%%%===================================================================
+maybe_update_supported(State=#state{last_ring_id=LastID}) ->
+ case riak_core_ring_manager:get_ring_id() of
+ LastID ->
+ State;
+ RingID ->
+ {ok, Ring} = riak_core_ring_manager:get_my_ring(),
+ State2 = update_supported(Ring, State),
+ State2#state{last_ring_id=RingID}
+ end.
+
capability_info(Supported, Default, Legacy) ->
#capability{supported=Supported, default=Default, legacy=Legacy}.
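
The polling pattern above in isolation, as a hedged sketch (maybe_refresh/2 is illustrative): cache the last-seen ring id and only refetch the full ring when the id, a cheap ETS read per the ring manager changes below, has moved:

%% Hypothetical tick handler using the same compare-and-refresh idea.
maybe_refresh(LastID, DoWork) ->
    case riak_core_ring_manager:get_ring_id() of
        LastID ->
            LastID;                          %% ring unchanged; skip the work
        NewID ->
            {ok, Ring} = riak_core_ring_manager:get_my_ring(),
            DoWork(Ring),
            NewID
    end.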
19 src/riak_core_coverage_plan.erl
@@ -45,23 +45,14 @@
req_id(), atom()) ->
{error, term()} | coverage_plan().
create_plan(VNodeSelector, NVal, PVC, ReqId, Service) ->
- {ok, Ring} = riak_core_ring_manager:get_my_ring(),
- PartitionCount = riak_core_ring:num_partitions(Ring),
- %% Get the list of all nodes and the list of available
- %% nodes so we can have a list of unavailable nodes
- %% while creating a coverage plan.
- Nodes = riak_core_ring:all_members(Ring),
- %% Check which nodes are up for the specified service
- %% so we can determine which VNodes are ineligible
- %% to be part of the coverage plan.
- UpNodes = riak_core_node_watcher:nodes(Service),
+ {ok, CHBin} = riak_core_ring_manager:get_chash_bin(),
+ PartitionCount = chashbin:num_partitions(CHBin),
%% Create a coverage plan with the requested primary
%% preference list VNode coverage.
%% Get a list of the VNodes owned by any unavailable nodes
DownVNodes = [Index ||
- {Index, Node}
- <- riak_core_ring:all_owners(Ring),
- lists:member(Node, (Nodes -- UpNodes))],
+ {Index, _Node}
+ <- riak_core_apl:offline_owners(Service, CHBin)],
%% Calculate an offset based on the request id to offer
%% the possibility of different sets of VNodes being
%% used even when all nodes are available.
@@ -79,7 +70,7 @@ create_plan(VNodeSelector, NVal, PVC, ReqId, Service) ->
%% ring position and the increment of
%% ring index values.
VNodeIndex = (Position rem PartitionCount) * RingIndexInc,
- Node = riak_core_ring:index_owner(Ring, VNodeIndex),
+ Node = chashbin:index_owner(VNodeIndex, CHBin),
CoverageVNode = {VNodeIndex, Node},
case length(KeySpaces) < NVal of
true ->
20 src/riak_core_handoff_manager.erl
@@ -71,7 +71,7 @@ start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
init([]) ->
- {ok, #state{excl=ordsets:new(), handoffs=[]}}.
+ {ok, #state{excl=sets:new(), handoffs=[]}}.
add_outbound(Module,Idx,Node,VnodePid) ->
case application:get_env(riak_core, disable_outbound_handoff) of
@@ -146,7 +146,7 @@ get_exclusions(Module) ->
%%%===================================================================
handle_call({get_exclusions, Module}, _From, State=#state{excl=Excl}) ->
- Reply = [I || {M, I} <- ordsets:to_list(Excl), M =:= Module],
+ Reply = [I || {M, I} <- sets:to_list(Excl), M =:= Module],
{reply, {ok, Reply}, State};
handle_call({add_outbound,Mod,Idx,Node,Pid},_From,State=#state{handoffs=HS}) ->
case send_handoff(Mod,Idx,Node,Pid,HS) of
@@ -213,19 +213,15 @@ handle_call(get_concurrency, _From, State) ->
{reply, Concurrency, State}.
handle_cast({del_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
- Excl2 = ordsets:del_element({Mod, Idx}, Excl),
+ Excl2 = sets:del_element({Mod, Idx}, Excl),
{noreply, State#state{excl=Excl2}};
handle_cast({add_exclusion, {Mod, Idx}}, State=#state{excl=Excl}) ->
- {ok, Ring} = riak_core_ring_manager:get_raw_ring(),
- case riak_core_ring:my_indices(Ring) of
- [] ->
- %% Trigger a ring update to ensure the node shuts down
- riak_core_ring_events:ring_update(Ring);
- _ ->
- ok
- end,
- Excl2 = ordsets:add_element({Mod, Idx}, Excl),
+ %% Note: This function used to trigger a ring event after adding an
+ %% exclusion to ensure that an exiting node would eventually shutdown
+ %% after all vnodes had finished handoff. This behavior is now handled
+ %% by riak_core_vnode_manager:maybe_ensure_vnodes_started
+ Excl2 = sets:add_element({Mod, Idx}, Excl),
{noreply, State#state{excl=Excl2}};
handle_cast({status_update, ModSrcTgt, StatsUpdate}, State=#state{handoffs=HS}) ->
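
For reference, the three set operations the manager relies on behave the same under sets as under ordsets, apart from element order; a quick sketch (excl_roundtrip/0 is illustrative):

excl_roundtrip() ->
    Excl0 = sets:new(),
    Excl1 = sets:add_element({riak_kv_vnode, 0}, Excl0),
    Excl2 = sets:del_element({riak_kv_vnode, 0}, Excl1),
    [] = sets:to_list(Excl2),
    ok.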
41 src/riak_core_ring.erl
@@ -97,9 +97,11 @@
next_owner/1,
next_owner/2,
next_owner/3,
+ completed_next_owners/2,
all_next_owners/1,
change_owners/2,
handoff_complete/3,
+ handoff_complete/2,
ring_ready/0,
ring_ready/1,
ring_ready_info/1,
@@ -354,7 +356,8 @@ get_buckets(State) ->
%% @doc Return the node that owns the given index.
-spec index_owner(State :: chstate(), Idx :: integer()) -> Node :: term().
index_owner(State, Idx) ->
- hd([Owner || {I, Owner} <- ?MODULE:all_owners(State), I =:= Idx]).
+ {Idx, Owner} = lists:keyfind(Idx, 1, all_owners(State)),
+ Owner.
%% @doc Return all partition indices owned by the node executing this function.
-spec my_indices(State :: chstate()) -> [integer()].
@@ -714,7 +717,11 @@ next_owner(State, Idx) ->
-spec next_owner(State :: chstate(), Idx :: integer(),
Mod :: module()) -> pending_change().
next_owner(State, Idx, Mod) ->
- case lists:keyfind(Idx, 1, State?CHSTATE.next) of
+ NInfo = lists:keyfind(Idx, 1, State?CHSTATE.next),
+ next_owner_status(NInfo, Mod).
+
+next_owner_status(NInfo, Mod) ->
+ case NInfo of
false ->
{undefined, undefined, undefined};
{_, Owner, NextOwner, _Transfers, complete} ->
@@ -732,6 +739,10 @@ next_owner(State, Idx, Mod) ->
next_owner({_, Owner, NextOwner, _Transfers, Status}) ->
{Owner, NextOwner, Status}.
+completed_next_owners(Mod, ?CHSTATE{next=Next}) ->
+ [{Idx, O, NO} || NInfo={Idx, _, _, _, _} <- Next,
+ {O, NO, complete} <- [next_owner_status(NInfo, Mod)]].
+
%% @doc Returns true if all cluster members have seen the current ring.
-spec ring_ready(State :: chstate()) -> boolean().
ring_ready(State0) ->
@@ -782,7 +793,10 @@ ring_ready_info(State0) ->
-spec handoff_complete(State :: chstate(), Idx :: integer(),
Mod :: module()) -> chstate().
handoff_complete(State, Idx, Mod) ->
- transfer_complete(State, Idx, Mod).
+ transfer_complete(State, [{Idx, Mod}]).
+
+handoff_complete(State, Complete) ->
+ transfer_complete(State, Complete).
ring_changed(Node, State) ->
check_tainted(State,
@@ -1190,11 +1204,21 @@ merge_status(_, _) ->
invalid.
%% @private
-transfer_complete(CState=?CHSTATE{next=Next, vclock=VClock}, Idx, Mod) ->
- {Idx, Owner, NextOwner, Transfers, Status} = lists:keyfind(Idx, 1, Next),
- Transfers2 = ordsets:add_element(Mod, Transfers),
+transfer_complete(CState=?CHSTATE{next=Next, vclock=VClock}, Complete) ->
VNodeMods =
ordsets:from_list([VMod || {_, VMod} <- riak_core:vnode_modules()]),
+ Complete2 = dict:from_list(Complete),
+ Next2 = [case dict:find(Idx, Complete2) of
+ {ok, Mod} ->
+ update_transfer(Transfer, Mod, VNodeMods);
+ error ->
+ Transfer
+ end || Transfer={Idx, _, _, _, _} <- Next],
+ VClock2 = vclock:increment(node(), VClock),
+ CState?CHSTATE{next=Next2, vclock=VClock2}.
+
+update_transfer({Idx, Owner, NextOwner, Transfers, Status}, Mod, VNodeMods) ->
+ Transfers2 = ordsets:add_element(Mod, Transfers),
Status2 = case {Status, Transfers2} of
{complete, _} ->
complete;
@@ -1203,10 +1227,7 @@ transfer_complete(CState=?CHSTATE{next=Next, vclock=VClock}, Idx, Mod) ->
_ ->
awaiting
end,
- Next2 = lists:keyreplace(Idx, 1, Next,
- {Idx, Owner, NextOwner, Transfers2, Status2}),
- VClock2 = vclock:increment(Owner, VClock),
- CState?CHSTATE{next=Next2, vclock=VClock2}.
+ {Idx, Owner, NextOwner, Transfers2, Status2}.
%% @private
get_members(Members) ->
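
A note on the batching above: transfer_complete/2 now folds a whole list of {Idx, Mod} completions into next in one pass and increments the vclock once, rather than once per event. A hedged call-shape sketch (indices are placeholders):

%% Several completions applied as a single ring transition via the new
%% handoff_complete/2 export.
batch_complete(Ring, Indices) ->
    Complete = [{Idx, riak_kv_vnode} || Idx <- Indices],
    riak_core_ring:handoff_complete(Ring, Complete).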
7 src/riak_core_ring_handler.erl
@@ -20,6 +20,7 @@
%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2,
handle_info/2, terminate/2, code_change/3]).
+-export([ensure_vnodes_started/1]).
-record(state, {}).
@@ -33,11 +34,7 @@ init([]) ->
ensure_vnodes_started(Ring),
{ok, #state{}}.
-handle_event({ring_update, Ring}, State) ->
- %% Make sure all vnodes are started...
- ensure_vnodes_started(Ring),
- riak_core_vnode_manager:ring_changed(Ring),
- riak_core_capability:ring_changed(Ring),
+handle_event({ring_update, _Ring}, State) ->
{ok, State}.
handle_call(_Event, State) ->
219 src/riak_core_ring_manager.erl
@@ -24,12 +24,16 @@
-module(riak_core_ring_manager).
-define(RING_KEY, riak_ring).
--behaviour(riak_core_gen_server).
+-behaviour(gen_server).
-export([start_link/0,
start_link/1,
get_my_ring/0,
get_raw_ring/0,
+ get_raw_ring_chashbin/0,
+ get_chash_bin/0,
+ get_ring_id/0,
+ get_bucket_meta/1,
refresh_my_ring/0,
refresh_ring/2,
set_my_ring/1,
@@ -49,7 +53,9 @@
-record(state, {
mode,
- raw_ring
+ raw_ring,
+ ring_changed_time,
+ inactivity_timer
}).
-export([set_ring_global/1]). %% For EUnit testing purposes only
@@ -57,50 +63,86 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
+-define(ETS, ets_riak_core_ring_manager).
+
+-define(PROMOTE_TIMEOUT, 90000).
+
+-define(WIDTH, 16). %% Keep this a power of two
+
%% ===================================================================
%% Public API
%% ===================================================================
start_link() ->
- riak_core_gen_server:start_link({local, ?MODULE}, ?MODULE, [live], []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [live], []).
%% Testing entry point
start_link(test) ->
- riak_core_gen_server:start_link({local, ?MODULE}, ?MODULE, [test], []).
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [test], []).
%% @spec get_my_ring() -> {ok, riak_core_ring:riak_core_ring()} | {error, Reason}
get_my_ring() ->
- case mochiglobal:get(?RING_KEY) of
+ Ring = case mochiglobal:get(?RING_KEY) of
+ ets ->
+ %% X = erlang:system_info(scheduler_id),
+ %% Rnd = X band (?WIDTH-1),
+ %% case ets:lookup(?ETS, {ring,Rnd}) of
+ case ets:lookup(?ETS, ring) of
+ [{_, RingETS}] ->
+ RingETS;
+ _ ->
+ undefined
+ end;
+ RingMochi ->
+ RingMochi
+ end,
+ case Ring of
Ring when is_tuple(Ring) -> {ok, Ring};
undefined -> {error, no_ring}
end.
get_raw_ring() ->
- riak_core_gen_server:call(?MODULE, get_raw_ring, infinity).
+ try
+ Ring = ets:lookup_element(?ETS, raw_ring, 2),
+ {ok, Ring}
+ catch
+ _:_ ->
+ gen_server:call(?MODULE, get_raw_ring, infinity)
+ end.
+
+get_raw_ring_chashbin() ->
+ try
+ Ring = ets:lookup_element(?ETS, raw_ring, 2),
+ {ok, CHBin} = get_chash_bin(),
+ {ok, Ring, CHBin}
+ catch
+ _:_ ->
+ gen_server:call(?MODULE, get_raw_ring_chashbin, infinity)
+ end.
%% @spec refresh_my_ring() -> ok
refresh_my_ring() ->
- riak_core_gen_server:call(?MODULE, refresh_my_ring, infinity).
+ gen_server:call(?MODULE, refresh_my_ring, infinity).
refresh_ring(Node, ClusterName) ->
- riak_core_gen_server:cast({?MODULE, Node}, {refresh_my_ring, ClusterName}).
+ gen_server:cast({?MODULE, Node}, {refresh_my_ring, ClusterName}).
%% @spec set_my_ring(riak_core_ring:riak_core_ring()) -> ok
set_my_ring(Ring) ->
- riak_core_gen_server:call(?MODULE, {set_my_ring, Ring}, infinity).
+ gen_server:call(?MODULE, {set_my_ring, Ring}, infinity).
%% @spec write_ringfile() -> ok
write_ringfile() ->
- riak_core_gen_server:cast(?MODULE, write_ringfile).
+ gen_server:cast(?MODULE, write_ringfile).
ring_trans(Fun, Args) ->
- riak_core_gen_server:call(?MODULE, {ring_trans, Fun, Args}, infinity).
+ gen_server:call(?MODULE, {ring_trans, Fun, Args}, infinity).
set_cluster_name(Name) ->
- riak_core_gen_server:call(?MODULE, {set_cluster_name, Name}, infinity).
+ gen_server:call(?MODULE, {set_cluster_name, Name}, infinity).
%% @doc Exposed for support/debug purposes. Forces the node to change its
%% ring in a manner that will trigger reconciliation on gossip.
@@ -207,7 +249,7 @@ prune_ringfiles() ->
%% @private (only used for test instances)
stop() ->
- riak_core_gen_server:cast(?MODULE, stop).
+ gen_server:cast(?MODULE, stop).
%% ===================================================================
@@ -215,10 +257,24 @@ stop() ->
%% ===================================================================
init([Mode]) ->
+ ets:new(?ETS, [named_table, protected, {write_concurrency, false}, {read_concurrency, true}]),
+ %% ets:new(?ETS, [named_table, protected]),
+ Id = reset_ring_id(),
+ ets:insert(?ETS, [{changes, 0}, {promoted, 0}, {id, Id}]),
Ring = reload_ring(Mode),
- set_ring_global(Ring),
+ State = set_ring(Ring, #state{mode = Mode}),
riak_core_ring_events:ring_update(Ring),
- {ok, #state{mode = Mode, raw_ring=Ring}}.
+ {ok, State}.
+
+reset_ring_id() ->
+ Epoch = case mochiglobal:get(riak_ring_id_epoch) of
+ undefined ->
+ 0;
+ Value ->
+ Value
+ end,
+ mochiglobal:put(riak_ring_id_epoch, Epoch + 1),
+ {Epoch + 1, 0}.
reload_ring(test) ->
riak_core_ring:fresh(16,node());
@@ -251,14 +307,17 @@ reload_ring(live) ->
handle_call(get_raw_ring, _From, #state{raw_ring=Ring} = State) ->
{reply, {ok, Ring}, State};
+handle_call(get_raw_ring_chashbin, _From, #state{raw_ring=Ring} = State) ->
+ {ok, CHBin} = get_chash_bin(),
+ {reply, {ok, Ring, CHBin}, State};
handle_call({set_my_ring, RingIn}, _From, State) ->
Ring = riak_core_ring:upgrade(RingIn),
- prune_write_notify_ring(Ring),
- {reply,ok,State#state{raw_ring=Ring}};
+ State2 = prune_write_notify_ring(Ring, State),
+ {reply,ok,State2};
handle_call(refresh_my_ring, _From, State) ->
%% This node is leaving the cluster so create a fresh ring file
FreshRing = riak_core_ring:fresh(),
- set_ring_global(FreshRing),
+ State2 = set_ring(FreshRing, State),
%% Make sure the fresh ring gets written before stopping
do_write_ringfile(FreshRing),
@@ -266,20 +325,20 @@ handle_call(refresh_my_ring, _From, State) ->
%% so we can safely stop now.
riak_core:stop("node removal completed, exiting."),
- {reply,ok,State#state{raw_ring=FreshRing}};
+ {reply,ok,State2};
handle_call({ring_trans, Fun, Args}, _From, State=#state{raw_ring=Ring}) ->
case catch Fun(Ring, Args) of
{new_ring, NewRing} ->
- prune_write_notify_ring(NewRing),
+ State2 = prune_write_notify_ring(NewRing, State),
riak_core_gossip:random_recursive_gossip(NewRing),
- {reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
+ {reply, {ok, NewRing}, State2};
{set_only, NewRing} ->
- prune_write_ring(NewRing),
- {reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
+ State2 = prune_write_ring(NewRing, State),
+ {reply, {ok, NewRing}, State2};
{reconciled_ring, NewRing} ->
- prune_write_notify_ring(NewRing),
+ State2 = prune_write_notify_ring(NewRing, State),
riak_core_gossip:recursive_gossip(NewRing),
- {reply, {ok, NewRing}, State#state{raw_ring=NewRing}};
+ {reply, {ok, NewRing}, State2};
ignore ->
{reply, not_changed, State};
{ignore, Reason} ->
@@ -291,8 +350,8 @@ handle_call({ring_trans, Fun, Args}, _From, State=#state{raw_ring=Ring}) ->
end;
handle_call({set_cluster_name, Name}, _From, State=#state{raw_ring=Ring}) ->
NewRing = riak_core_ring:set_cluster_name(Ring, Name),
- prune_write_notify_ring(NewRing),
- {reply, ok, State#state{raw_ring=NewRing}}.
+ State2 = prune_write_notify_ring(NewRing, State),
+ {reply, ok, State2}.
handle_cast(stop, State) ->
{stop,normal,State};
@@ -316,7 +375,27 @@ handle_cast(write_ringfile, State=#state{raw_ring=Ring}) ->
do_write_ringfile(Ring),
{noreply,State}.
-
+handle_info(increment_ring_id, State) ->
+ {Epoch, Id} = get_ring_id(),
+ ets:insert(?ETS, {id, {Epoch, Id+1}}),
+ {noreply, State};
+
+handle_info(inactivity_timeout, State=#state{ring_changed_time=Then}) ->
+ DeltaUS = erlang:max(0, timer:now_diff(os:timestamp(), Then)),
+ DeltaMS = DeltaUS div 1000,
+ %% io:format("Ring inactivity: ~p~n", [DeltaMS]),
+ case DeltaMS >= ?PROMOTE_TIMEOUT of
+ true ->
+ io:format("Promoting ring after ~p~n", [DeltaMS]),
+ {ok, Ring} = get_my_ring(),
+ mochiglobal:put(?RING_KEY, Ring),
+ {noreply, State};
+ false ->
+ Remaining = ?PROMOTE_TIMEOUT - DeltaMS,
+ %% io:format("Waiting another ~p~n", [Remaining]),
+ State2 = set_timer(Remaining, State),
+ {noreply, State2}
+ end;
handle_info(_Info, State) ->
{noreply, State}.
@@ -366,6 +445,27 @@ run_fixups([{App, Fixup}|T], BucketName, BucketProps) ->
end,
run_fixups(T, BucketName, BP).
+set_ring(Ring, State) ->
+ set_ring_global(Ring),
+ Now = os:timestamp(),
+ State2 = State#state{raw_ring=Ring, ring_changed_time=Now},
+ State3 = maybe_set_timer(?PROMOTE_TIMEOUT, State2),
+ State3.
+
+maybe_set_timer(Duration, State=#state{inactivity_timer=undefined}) ->
+ set_timer(Duration, State);
+maybe_set_timer(Duration, State=#state{inactivity_timer=Timer}) ->
+ case erlang:read_timer(Timer) of
+ false ->
+ set_timer(Duration, State);
+ _ ->
+ State
+ end.
+
+set_timer(Duration, State) ->
+ %% State.
+ Timer = erlang:send_after(Duration, self(), inactivity_timeout),
+ State#state{inactivity_timer=Timer}.
%% Set the ring in mochiglobal. Exported during unit testing
%% to make test setup simpler - no need to spin up a riak_core_ring_manager
@@ -404,18 +504,67 @@ set_ring_global(Ring) ->
%% relied upon for any non-local ring operations.
TaintedRing = riak_core_ring:set_tainted(FixedRing),
%% store the modified ring in mochiglobal
- mochiglobal:put(?RING_KEY, TaintedRing).
+ OldBuckets = ets:select(?ETS, [{{{bucket, '$1'}, '_'}, [], ['$1']}]),
+ BucketDefaults = [{{bucket, Bucket}, undefined} || Bucket <- OldBuckets],
+ BucketMeta =
+ [{{bucket, Bucket}, Meta}
+ || Bucket <- riak_core_ring:get_buckets(TaintedRing),
+ {ok,Meta} <- [riak_core_ring:get_meta({bucket, Bucket}, TaintedRing)]],
+ BucketMeta2 = lists:ukeysort(1, BucketMeta ++ BucketDefaults),
+ CHBin = chashbin:create(riak_core_ring:chash(TaintedRing)),
+ {Epoch, Id} = ets:lookup_element(?ETS, id, 2),
+ Actions = [{ring, TaintedRing},
+ {raw_ring, Ring},
+ {id, {Epoch,Id+1}},
+ {chashbin, CHBin} | BucketMeta2],
+ ets:insert(?ETS, Actions),
+ ets:match_delete(?ETS, {{bucket, '_'}, undefined}),
+ case mochiglobal:get(?RING_KEY) of
+ ets ->
+ ok;
+ _ ->
+ mochiglobal:put(?RING_KEY, ets)
+ end,
+ ok.
+
+get_ring_id() ->
+ case ets:lookup(?ETS, id) of
+ [{_, Id}] ->
+ Id;
+ _ ->
+ {0,0}
+ end.
+
+get_bucket_meta(Bucket) ->
+ case ets:lookup(?ETS, {bucket, Bucket}) of
+ [] ->
+ undefined;
+ [{_, undefined}] ->
+ undefined;
+ [{_, Meta}] ->
+ {ok, Meta}
+ end.
+
+get_chash_bin() ->
+ case ets:lookup(?ETS, chashbin) of
+ [{chashbin, CHBin}] ->
+ {ok, CHBin};
+ _ ->
+ {error, no_ring}
+ end.
%% Persist a new ring file, set the global value and notify any listeners
-prune_write_notify_ring(Ring) ->
- prune_write_ring(Ring),
- riak_core_ring_events:ring_update(Ring).
+