Skip to content

Commit

Permalink
MB-32880: Apply config with topology on every janitor run.
Browse files Browse the repository at this point in the history
Make sure we apply the topology information along with the state
information through the set_vbucket command to memcached during
janitor runs to keep information in ns_server and memcached in sync.

No attempt is made to,
1. Remove undefined from the map during failover.
2. Handle a case where increase in replica count adds an undefined in
the map.

Expectations from KV: KV engine should understand the undefined set in
the topology and ignore them.

Change-Id: I35eca77d036da4da0e8c0de0aded28a52da6be8c
Reviewed-on: http://review.couchbase.org/104233
Reviewed-by: Aliaksey Artamonau <aliaksey.artamonau@couchbase.com>
Tested-by: Aliaksey Artamonau <aliaksey.artamonau@couchbase.com>
  • Loading branch information
anuthan authored and Aliaksey Artamonau committed Apr 5, 2019
1 parent 9227d66 commit bed1e6e
Showing 1 changed file with 73 additions and 10 deletions.
83 changes: 73 additions & 10 deletions src/janitor_agent.erl
Expand Up @@ -551,8 +551,7 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From,
#state{bucket_name = BucketName,
rebalance_pid = Rebalancer} = State) ->
%% ?log_debug("handling apply_new_config:~n~p", [NewBucketConfig]),
{ok, CurrentVBucketsList} = ns_memcached:list_vbuckets(BucketName),
CurrentVBuckets = dict:from_list(CurrentVBucketsList),
{ok, VBDetails} = get_state_topology(BucketName),
Map = proplists:get_value(map, NewBucketConfig),
true = (Map =/= undefined),
%% TODO: unignore ignored vbuckets
Expand All @@ -569,20 +568,33 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From,
[] ->
missing
end,
ActualState = case dict:find(VBucket, CurrentVBuckets) of
{ok, S} -> S;
_ -> missing
end,
{ActualState, ActualTopology} =
case dict:find(VBucket, VBDetails) of
{ok, Val} ->
StateVal = proplists:get_value("state", Val),
%% Always expect "state" to be present.
false = StateVal =:= undefined,
{StateVal, proplists:get_value("topology", Val)};
_ ->
{missing, undefined}
end,
NewWanted = [WantedState | PrevWanted],
case WantedState =:= ActualState of
case WantedState =:= ActualState andalso
is_topology_same(WantedState, Chain, ActualTopology) of
true ->
{VBucket + 1, ToSet, ToDelete, NewWanted};
false ->
case WantedState of
missing ->
{VBucket + 1, ToSet, [VBucket | ToDelete], NewWanted};
active ->
{VBucket + 1,
[{VBucket, WantedState, [Chain]} | ToSet],
ToDelete, NewWanted};
_ ->
{VBucket + 1, [{VBucket, WantedState} | ToSet], ToDelete, NewWanted}
{VBucket + 1,
[{VBucket, WantedState, undefined} | ToSet],
ToDelete, NewWanted}
end
end
end, {0, [], [], []}, Map),
Expand All @@ -608,9 +620,17 @@ handle_call({apply_new_config, Caller, NewBucketConfig, IgnoredVBuckets}, _From,
|| {Src, Pairs} <- misc:keygroup(1, lists:sort(WantedReplicas))],
ok = replication_manager:remove_undesired_replications(BucketName, WantedReplications),

SetTopology = cluster_compat_mode:is_cluster_madhatter(),
%% then we're ok to change vbucket states
[ns_memcached:set_vbucket(BucketName, VBucket, StateToSet)
|| {VBucket, StateToSet} <- ToSet],
[begin
case SetTopology of
true ->
ns_memcached:set_vbucket(BucketName, VBucket, StateToSet,
Topology);
false ->
ns_memcached:set_vbucket(BucketName, VBucket, StateToSet)
end
end || {VBucket, StateToSet, Topology} <- ToSet],

%% and ok to delete vbuckets we want to delete
[ns_memcached:delete_vbucket(BucketName, VBucket) || VBucket <- ToDelete],
Expand Down Expand Up @@ -977,3 +997,46 @@ handle_apply_vbucket_state({delete_vbucket, VBucket},
#state{bucket_name = BucketName} = AgentState) ->
pass_vbucket_states_to_set_view_manager(AgentState),
ok = ns_memcached:delete_vbucket(BucketName, VBucket).

decode_topology("null") ->
undefined;
decode_topology(Topology) ->
[lists:map(fun (null) ->
undefined;
(Node) ->
binary_to_existing_atom(Node, latin1)
end, Chain) || Chain <- ejson:decode(Topology)].

is_topology_same(active, Chain, MemcachedTopology) ->
cluster_compat_mode:is_cluster_madhatter() =:= false orelse
[Chain] =:= MemcachedTopology;
is_topology_same(_, _, _) ->
true.

get_state_topology(BucketName) ->
DecodeTable = [{"state", fun erlang:list_to_existing_atom/1},
{"topology", fun decode_topology/1}],
get_decoded_vbucket_details(BucketName, DecodeTable).

get_decoded_vbucket_details(BucketName, DecodeTable) ->
Keys = [Key || {Key, _DecodeFun} <- DecodeTable],
case ns_memcached:get_vbucket_details_stats(BucketName, Keys) of
{ok, VBDetails} ->
{ok, decode_vbucket_details(VBDetails, DecodeTable)};
Error ->
Error
end.

decode_vbucket_details(VBDetails, DecodeTable) ->
dict:map(
fun (_VB, List) ->
lists:map(
fun ({Key, Value}) ->
case lists:keyfind(Key, 1, DecodeTable) of
{Key, DecodeFun} ->
{Key, DecodeFun(Value)};
_ ->
{Key, Value}
end
end, List)
end, VBDetails).

0 comments on commit bed1e6e

Please sign in to comment.