Skip to content

Commit

Permalink
Merge pull request #1008 from basho/bugfix/ensemble-creation-on-bucke…
Browse files Browse the repository at this point in the history
…t-type-activate

Use SC bucket types and buckets to know ensembles

Reviewed-by: jtuple

Backport merge of 3489f5b
  • Loading branch information
jtuple committed Sep 15, 2014
2 parents cb89f05 + 59883e4 commit 6f5c879
Showing 1 changed file with 34 additions and 26 deletions.
60 changes: 34 additions & 26 deletions src/riak_kv_ensembles.erl
Expand Up @@ -46,7 +46,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {last_ring_id :: term()}).
-record(state, {}).

%%%===================================================================
%%% API
Expand All @@ -55,7 +55,6 @@
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).


local_ensembles() ->
Node = node(),
lists:foldl(fun(Ensemble, Acc) ->
Expand All @@ -67,7 +66,7 @@ local_ensembles() ->
[Ensemble | Acc]
end
end, [], ensembles()).

ensembles() ->
{ok, Ring} = riak_core_ring_manager:get_my_ring(),
required_ensembles(Ring).
Expand Down Expand Up @@ -99,7 +98,7 @@ check_membership(Ensemble, CHBin) ->

init([]) ->
schedule_tick(),
{ok, #state{last_ring_id = undefined}}.
{ok, #state{}}.

handle_call(_Request, _From, State) ->
{reply, ok, State}.
Expand All @@ -108,13 +107,9 @@ handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(tick, State) ->
State2 = tick(State),
_ = tick(State),
schedule_tick(),
{noreply, State2};

handle_info(reset_ring_id, State) ->
State2 = State#state{last_ring_id=undefined},
{noreply, State2};
{noreply, State};

handle_info(_Info, State) ->
{noreply, State}.
Expand All @@ -132,17 +127,9 @@ code_change(_OldVsn, State, _Extra) ->
schedule_tick() ->
erlang:send_after(10000, self(), tick).

reset_ring_id() ->
self() ! reset_ring_id.

tick(State=#state{last_ring_id=LastID}) ->
case riak_core_ring_manager:get_ring_id() of
LastID ->
State;
RingID ->
maybe_bootstrap_ensembles(),
State#state{last_ring_id=RingID}
end.
tick(State) ->
maybe_bootstrap_ensembles(),
State.

maybe_bootstrap_ensembles() ->
case riak_ensemble_manager:enabled() of
Expand Down Expand Up @@ -172,22 +159,43 @@ bootstrap_preflists(Ring, CHBin) ->
end,
Known = orddict:fetch_keys(Ensembles),
Need = Required -- Known,
L = [begin
_ = [begin
Peers = required_members(Ensemble, CHBin),
riak_ensemble_manager:create_ensemble(Ensemble, undefined, Peers,
riak_kv_ensemble_backend, [])
end || Ensemble <- Need],
Failed = [Result || Result <- L,
Result =/= ok],
(Failed =:= []) orelse reset_ring_id(),
ok.

required_ensembles(Ring) ->
AllN = riak_core_bucket:all_n(Ring),
AllN0 = riak_core_bucket:all_n(Ring),
BucketTypeNs = bucket_type_all_n(Ring),
AllN = sets:to_list(sets:union(sets:from_list(AllN0), BucketTypeNs)),
Owners = riak_core_ring:all_owners(Ring),
[{kv, Idx, N} || {Idx, _} <- Owners,
N <- AllN].

bucket_type_all_n(Ring) ->
Itr = riak_core_bucket_type:iterator(),
bucket_type_all_n(Ring, Itr, sets:new()).

bucket_type_all_n(Ring, Itr, NValSet) ->
case riak_core_bucket_type:itr_done(Itr) of
true ->
riak_core_bucket_type:itr_close(Itr),
NValSet;
false ->
{_BT, Props} = riak_core_bucket_type:itr_value(Itr),
Itr2 = riak_core_bucket_type:itr_next(Itr),
case lists:member({active, true}, Props) andalso
lists:keyfind(n_val, 1, Props) of
{n_val, NVal} ->
NewSet = sets:add_element(NVal, NValSet),
bucket_type_all_n(Ring, Itr2, NewSet);
false ->
bucket_type_all_n(Ring, Itr2, NValSet)
end
end.

required_members({kv, Idx, N}, CHBin) ->
{PL, _} = chashbin:itr_pop(N, chashbin:exact_iterator(Idx, CHBin)),
%% TODO: Make ensembles/peers use ensemble/peer as actual peer name so this is unneeded
Expand Down

0 comments on commit 6f5c879

Please sign in to comment.