diff --git a/src/janitor_agent.erl b/src/janitor_agent.erl index ee36d456e..7cd0327cb 100644 --- a/src/janitor_agent.erl +++ b/src/janitor_agent.erl @@ -551,8 +551,7 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From, #state{bucket_name = BucketName, rebalance_pid = Rebalancer} = State) -> %% ?log_debug("handling apply_new_config:~n~p", [NewBucketConfig]), - {ok, CurrentVBucketsList} = ns_memcached:list_vbuckets(BucketName), - CurrentVBuckets = dict:from_list(CurrentVBucketsList), + {ok, VBDetails} = get_state_topology(BucketName), Map = proplists:get_value(map, NewBucketConfig), true = (Map =/= undefined), %% TODO: unignore ignored vbuckets @@ -569,20 +568,33 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From, [] -> missing end, - ActualState = case dict:find(VBucket, CurrentVBuckets) of - {ok, S} -> S; - _ -> missing - end, + {ActualState, ActualTopology} = + case dict:find(VBucket, VBDetails) of + {ok, Val} -> + StateVal = proplists:get_value("state", Val), + %% Always expect "state" to be present. + false = StateVal =:= undefined, + {StateVal, proplists:get_value("topology", Val)}; + _ -> + {missing, undefined} + end, NewWanted = [WantedState | PrevWanted], - case WantedState =:= ActualState of + case WantedState =:= ActualState andalso + is_topology_same(WantedState, Chain, ActualTopology) of true -> {VBucket + 1, ToSet, ToDelete, NewWanted}; false -> case WantedState of missing -> {VBucket + 1, ToSet, [VBucket | ToDelete], NewWanted}; + active -> + {VBucket + 1, + [{VBucket, WantedState, [Chain]} | ToSet], + ToDelete, NewWanted}; _ -> - {VBucket + 1, [{VBucket, WantedState} | ToSet], ToDelete, NewWanted} + {VBucket + 1, + [{VBucket, WantedState, undefined} | ToSet], + ToDelete, NewWanted} end end end, {0, [], [], []}, Map), @@ -608,9 +620,17 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From, || {Src, Pairs} <- misc:keygroup(1, lists:sort(WantedReplicas))], ok = replication_manager:remove_undesired_replications(BucketName, WantedReplications), + SetTopology = cluster_compat_mode:is_cluster_madhatter(), %% then we're ok to change vbucket states - [ns_memcached:set_vbucket(BucketName, VBucket, StateToSet) - || {VBucket, StateToSet} <- ToSet], + [begin + case SetTopology of + true -> + ns_memcached:set_vbucket(BucketName, VBucket, StateToSet, + Topology); + false -> + ns_memcached:set_vbucket(BucketName, VBucket, StateToSet) + end + end || {VBucket, StateToSet, Topology} <- ToSet], %% and ok to delete vbuckets we want to delete [ns_memcached:delete_vbucket(BucketName, VBucket) || VBucket <- ToDelete], @@ -977,3 +997,46 @@ handle_apply_vbucket_state({delete_vbucket, VBucket}, #state{bucket_name = BucketName} = AgentState) -> pass_vbucket_states_to_set_view_manager(AgentState), ok = ns_memcached:delete_vbucket(BucketName, VBucket). + +decode_topology("null") -> + undefined; +decode_topology(Topology) -> + [lists:map(fun (null) -> + undefined; + (Node) -> + binary_to_existing_atom(Node, latin1) + end, Chain) || Chain <- ejson:decode(Topology)]. + +is_topology_same(active, Chain, MemcachedTopology) -> + cluster_compat_mode:is_cluster_madhatter() =:= false orelse + [Chain] =:= MemcachedTopology; +is_topology_same(_, _, _) -> + true. + +get_state_topology(BucketName) -> + DecodeTable = [{"state", fun erlang:list_to_existing_atom/1}, + {"topology", fun decode_topology/1}], + get_decoded_vbucket_details(BucketName, DecodeTable). + +get_decoded_vbucket_details(BucketName, DecodeTable) -> + Keys = [Key || {Key, _DecodeFun} <- DecodeTable], + case ns_memcached:get_vbucket_details_stats(BucketName, Keys) of + {ok, VBDetails} -> + {ok, decode_vbucket_details(VBDetails, DecodeTable)}; + Error -> + Error + end. + +decode_vbucket_details(VBDetails, DecodeTable) -> + dict:map( + fun (_VB, List) -> + lists:map( + fun ({Key, Value}) -> + case lists:keyfind(Key, 1, DecodeTable) of + {Key, DecodeFun} -> + {Key, DecodeFun(Value)}; + _ -> + {Key, Value} + end + end, List) + end, VBDetails).