diff --git a/src/riak_kv_ensembles.erl b/src/riak_kv_ensembles.erl index 7a4e5ac2c2..de76d454cf 100644 --- a/src/riak_kv_ensembles.erl +++ b/src/riak_kv_ensembles.erl @@ -46,7 +46,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(state, {last_ring_id :: term()}). +-record(state, {}). %%%=================================================================== %%% API @@ -55,7 +55,6 @@ start_link() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - local_ensembles() -> Node = node(), lists:foldl(fun(Ensemble, Acc) -> @@ -67,7 +66,7 @@ local_ensembles() -> [Ensemble | Acc] end end, [], ensembles()). - + ensembles() -> {ok, Ring} = riak_core_ring_manager:get_my_ring(), required_ensembles(Ring). @@ -99,7 +98,7 @@ check_membership(Ensemble, CHBin) -> init([]) -> schedule_tick(), - {ok, #state{last_ring_id = undefined}}. + {ok, #state{}}. handle_call(_Request, _From, State) -> {reply, ok, State}. @@ -108,13 +107,9 @@ handle_cast(_Msg, State) -> {noreply, State}. handle_info(tick, State) -> - State2 = tick(State), + _ = tick(State), schedule_tick(), - {noreply, State2}; - -handle_info(reset_ring_id, State) -> - State2 = State#state{last_ring_id=undefined}, - {noreply, State2}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -132,17 +127,9 @@ code_change(_OldVsn, State, _Extra) -> schedule_tick() -> erlang:send_after(10000, self(), tick). -reset_ring_id() -> - self() ! reset_ring_id. - -tick(State=#state{last_ring_id=LastID}) -> - case riak_core_ring_manager:get_ring_id() of - LastID -> - State; - RingID -> - maybe_bootstrap_ensembles(), - State#state{last_ring_id=RingID} - end. +tick(State) -> + maybe_bootstrap_ensembles(), + State. maybe_bootstrap_ensembles() -> case riak_ensemble_manager:enabled() of @@ -172,22 +159,43 @@ bootstrap_preflists(Ring, CHBin) -> end, Known = orddict:fetch_keys(Ensembles), Need = Required -- Known, - L = [begin + _ = [begin Peers = required_members(Ensemble, CHBin), riak_ensemble_manager:create_ensemble(Ensemble, undefined, Peers, riak_kv_ensemble_backend, []) end || Ensemble <- Need], - Failed = [Result || Result <- L, - Result =/= ok], - (Failed =:= []) orelse reset_ring_id(), ok. required_ensembles(Ring) -> - AllN = riak_core_bucket:all_n(Ring), + AllN0 = riak_core_bucket:all_n(Ring), + BucketTypeNs = bucket_type_all_n(Ring), + AllN = sets:to_list(sets:union(sets:from_list(AllN0), BucketTypeNs)), Owners = riak_core_ring:all_owners(Ring), [{kv, Idx, N} || {Idx, _} <- Owners, N <- AllN]. +bucket_type_all_n(Ring) -> + Itr = riak_core_bucket_type:iterator(), + bucket_type_all_n(Ring, Itr, sets:new()). + +bucket_type_all_n(Ring, Itr, NValSet) -> + case riak_core_bucket_type:itr_done(Itr) of + true -> + riak_core_bucket_type:itr_close(Itr), + NValSet; + false -> + {_BT, Props} = riak_core_bucket_type:itr_value(Itr), + Itr2 = riak_core_bucket_type:itr_next(Itr), + case lists:member({active, true}, Props) andalso + lists:keyfind(n_val, 1, Props) of + {n_val, NVal} -> + NewSet = sets:add_element(NVal, NValSet), + bucket_type_all_n(Ring, Itr2, NewSet); + false -> + bucket_type_all_n(Ring, Itr2, NValSet) + end + end. + required_members({kv, Idx, N}, CHBin) -> {PL, _} = chashbin:itr_pop(N, chashbin:exact_iterator(Idx, CHBin)), %% TODO: Make ensembles/peers use ensemble/peer as actual peer name so this is unneeded