Skip to content
Browse files

MB-6216: validate all replicators connections each janitor run

So that rebalance cannot fail due to failure to change vbucket on
stale connection.

Change-Id: I49718edf22bc02eee42abbe8b3fadae4493782cc
Reviewed-on: http://review.couchbase.org/19848
Tested-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
Reviewed-by: Aliaksey Kandratsenka <alkondratenko@gmail.com>
  • Loading branch information...
1 parent 1d03653 commit 2dbb22478550ec4ce6a0ce9669f014159e0063ad Aliaksey Kandratsenka committed with Peter Wansch Aug 19, 2012
Showing with 28 additions and 2 deletions.
  1. +10 −1 src/ebucketmigrator_srv.erl
  2. +1 −0 src/janitor_agent.erl
  3. +17 −1 src/ns_vbm_new_sup.erl
View
11 src/ebucketmigrator_srv.erl
@@ -70,7 +70,8 @@
start_vbucket_filter_change/2,
start_old_vbucket_filter_change/1,
set_controlling_process/2,
- had_backfill/2]).
+ had_backfill/2,
+ ping_connections/2]).
-include("mc_constants.hrl").
-include("mc_entry.hrl").
@@ -235,6 +236,11 @@ process_last_messages(State) ->
end
end).
+handle_call(ping_connections, _From, #state{upstream_aux = UpstreamAux,
+ downstream_aux = DownstreamAux} = State) ->
+ _ = mc_client_binary:get_vbucket(UpstreamAux, 0),
+ _ = mc_client_binary:get_vbucket(DownstreamAux, 0),
+ {reply, ok, State};
handle_call(start_old_vbucket_filter_change, {Pid, _} = _From,
#state{vb_filter_change_state=VBFilterChangeState} = State)
when VBFilterChangeState =/= not_started ->
@@ -722,6 +728,9 @@ start_vbucket_filter_change(Pid, Args) ->
start_old_vbucket_filter_change(Pid) ->
gen_server:call(Pid, start_old_vbucket_filter_change, 30000).
+ping_connections(Pid, Timeout) ->
+ gen_server:call(Pid, ping_connections, Timeout).
+
-spec set_controlling_process(#state{}, pid()) -> ok.
set_controlling_process(#state{upstream=Upstream,
upstream_aux=UpstreamAux,
View
1 src/janitor_agent.erl
@@ -568,6 +568,7 @@ handle_call({apply_new_config, NewBucketConfig, IgnoredVBuckets}, _From, #state{
Dst =:= node()],
WantedReplications = [{Src, [VB || {_, VB} <- Pairs]}
|| {Src, Pairs} <- misc:keygroup(1, lists:sort(WantedReplicas))],
+ ns_vbm_new_sup:ping_all_replicators(BucketName),
ok = replication_changes:set_incoming_replication_map(BucketName, WantedReplications),
[case dict:find(VBucket, CurrentVBuckets) of
{ok, dead} ->
View
18 src/ns_vbm_new_sup.erl
@@ -30,7 +30,8 @@
%% Callbacks
-export([server_name/1, supervisor_node/2,
- make_replicator/3, replicator_nodes/2, replicator_vbuckets/1]).
+ make_replicator/3, replicator_nodes/2, replicator_vbuckets/1,
+ ping_all_replicators/1]).
-export([local_change_vbucket_filter/4]).
@@ -183,3 +184,18 @@ perform_vbucket_filter_change_loop(ThePid, OldState, SentAlready) ->
end,
perform_vbucket_filter_change_loop(ThePid, OldState, true)
end.
+
+%% make sure all migrators have up-to-date connections
+ping_all_replicators(Bucket) ->
+ Childs = try supervisor:which_children(server_name(Bucket))
+ catch exit:{noproc, _} ->
+ []
+ end,
+ [ping_some_replicator(Pid) || {_Id, Pid, _, _} <- Childs],
+ ok.
+
+ping_some_replicator(Pid) ->
+ try ebucketmigrator_srv:ping_connections(Pid, infinity)
+ catch T:E ->
+ ?log_error("Pinging migrator ~p failed:~n~p", [Pid, {T,E}])
+ end.

0 comments on commit 2dbb224

Please sign in to comment.
Something went wrong with that request. Please try again.