From 45d739af3fcf8b4f8e3ccca152cb3c2d781dc2fc Mon Sep 17 00:00:00 2001 From: Nick Vatamaniuc Date: Tue, 28 Feb 2017 14:00:22 -0500 Subject: [PATCH] Restore adding some jitter-ed sleep to shard scanning code. Otherwise a large cluster will flood replicator manager with potentially hundreds of thousands of `{resume, Shard}` messages. For each one, it would try to open a changes feed which can add significant load and has been seen in production to hit varios system limits. This brings back the change from before the switch to using mem3 shards for replicator db scans. Also adds a few tests. COUCHDB-3311 --- src/couch_replicator_manager.erl | 74 ++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 14 deletions(-) diff --git a/src/couch_replicator_manager.erl b/src/couch_replicator_manager.erl index bdc3b8f..4e5073e 100644 --- a/src/couch_replicator_manager.erl +++ b/src/couch_replicator_manager.erl @@ -934,22 +934,30 @@ scan_all_dbs(Server) when is_pid(Server) -> {ok, Db} = mem3_util:ensure_exists( config:get("mem3", "shards_db", "_dbs")), ChangesFun = couch_changes:handle_changes(#changes_args{}, nil, Db, nil), - ChangesFun(fun({change, {Change}, _}, _) -> - DbName = couch_util:get_value(<<"id">>, Change), - case DbName of <<"_design/", _/binary>> -> ok; _Else -> - case couch_replicator_utils:is_deleted(Change) of - true -> - ok; - false -> - [gen_server:cast(Server, {resume_scan, ShardName}) - || ShardName <- replicator_shards(DbName)], - ok - end - end; - (_, _) -> ok - end), + ChangesFun({fun scan_changes_cb/3, {Server, 1}}), couch_db:close(Db). +scan_changes_cb({change, {Change}, _}, _, {Server, AccCount}) -> + DbName = couch_util:get_value(<<"id">>, Change), + case DbName of <<"_design/", _/binary>> -> {Server, AccCount}; _Else -> + case couch_replicator_utils:is_deleted(Change) of + true -> + {Server, AccCount}; + false -> + UpdatedCount = lists:foldl(fun(ShardName, Count) -> + spawn_link(fun() -> + timer:sleep(jitter(Count)), + gen_server:cast(Server, {resume_scan, ShardName}) + end), + Count + 1 + end, AccCount, replicator_shards(DbName)), + {Server, UpdatedCount} + end + end; + +scan_changes_cb(_, _, {Server, AccCount}) -> + {Server, AccCount}. + replicator_shards(DbName) -> case is_replicator_db(DbName) of @@ -1027,4 +1035,42 @@ t_fail_non_replicator_shard() -> end). +scan_dbs_test_() -> +{ + foreach, + fun() -> test_util:start_couch([mem3, fabric]) end, + fun(Ctx) -> test_util:stop_couch(Ctx) end, + [ + t_resume_db_shard(), + t_sleep_based_on_count() + ] +}. + + +t_resume_db_shard() -> + ?_test(begin + DbName0 = ?tempdb(), + DbName = <>, + ok = fabric:create_db(DbName, [?CTX]), + Change = {[{<<"id">>, DbName}]}, + scan_changes_cb({change, Change, req}, type, {self(), 1}), + ResumeMsg = receive Msg -> Msg after 1000 -> timeout end, + ?assertMatch({'$gen_cast', {resume_scan, <<"shards/", _/binary>>}}, ResumeMsg), + fabric:delete_db(DbName, [?CTX]) + end). + + +t_sleep_based_on_count() -> + ?_test(begin + DbName0 = ?tempdb(), + DbName = <>, + ok = fabric:create_db(DbName, [?CTX]), + Change = {[{<<"id">>, DbName}]}, + scan_changes_cb({change, Change, req}, type, {self(), 1000}), + Timeout = receive Msg -> Msg after 100 -> timeout end, + ?assertEqual(timeout, Timeout), + fabric:delete_db(DbName, [?CTX]) + end). + + -endif.