Skip to content

Commit

Permalink
Ensure design docs are uploaded individually when replicating with _b…
Browse files Browse the repository at this point in the history
…ulk_get

Previously, when replication jobs used _bulk_get they didn't upload design
docs individually like they do when not using _bulk_get.

Here were are preserving an already existing behavior which we had in the
replicator without _bulk_get usage for more than 2 years. It was introduced
here in #2426. Related to these issues #2415 and #2413.

Add tests to cover both attachments and ddoc cases. meck:num_calls/3 is helpful
as it allows to nicely assert which API function was called and how many times.
  • Loading branch information
nickva committed Jan 13, 2023
1 parent 01c161b commit a14922f
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 8 deletions.
22 changes: 15 additions & 7 deletions src/couch_replicator/src/couch_replicator_worker.erl
Original file line number Diff line number Diff line change
Expand Up @@ -297,17 +297,25 @@ queue_fetch_loop(#fetch_st{} = St) ->
{changes, ChangesManager, Changes, ReportSeq} ->
% Find missing revisions (POST to _revs_diff)
{IdRevs, RdSt1} = find_missing(Changes, Target, Parent, RdSt),
{Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs, Parent, BgSt),
% Documents without attachments can be uploaded right away
BatchFun = fun({_, #doc{} = Doc}) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
% Filter out and handle design docs individually
DDocFilter = fun
({<<?DESIGN_DOC_PREFIX, _/binary>>, _Rev}, _PAs) -> true;
({_Id, _Rev}, _PAs) -> false
end,
lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
% Fetch individually if _bulk_get failed or there are attachments
DDocIdRevs = maps:filter(DDocFilter, IdRevs),
FetchFun = fun({Id, Rev}, PAs) ->
ok = gen_server:call(Parent, {fetch_doc, {Id, [Rev], PAs}}, infinity)
end,
maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs)),
maps:map(FetchFun, DDocIdRevs),
% IdRevs1 is all the docs without design docs. Bulk get those.
IdRevs1 = maps:without(maps:keys(DDocIdRevs), IdRevs),
{Docs, BgSt1} = bulk_get(UseBulkGet, Source, IdRevs1, Parent, BgSt),
BatchFun = fun({_, #doc{} = Doc}) ->
ok = gen_server:call(Parent, {batch_doc, Doc}, infinity)
end,
lists:foreach(BatchFun, lists:sort(maps:to_list(Docs))),
% Invidually upload docs with attachments.
maps:map(FetchFun, maps:without(maps:keys(Docs), IdRevs1)),
{ok, Stats} = gen_server:call(Parent, flush, infinity),
ok = report_seq_done(Cp, ReportSeq, Stats),
couch_log:debug("Worker reported completion of seq ~p", [ReportSeq]),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ bulk_get_test_() ->
fun couch_replicator_test_helper:test_teardown/1,
[
?TDEF_FE(use_bulk_get),
?TDEF_FE(use_bulk_get_with_ddocs),
?TDEF_FE(use_bulk_get_with_attachments),
?TDEF_FE(dont_use_bulk_get),
?TDEF_FE(dont_use_bulk_get_ddocs),
?TDEF_FE(dont_use_bulk_get_attachments),
?TDEF_FE(job_enable_overrides_global_disable),
?TDEF_FE(global_disable_works)
]
Expand All @@ -39,7 +43,33 @@ use_bulk_get({_Ctx, {Source, Target}}) ->
replicate(Source, Target, true),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, JustGets),
?assertEqual(0, DocUpdates),
?assert(BulkGets >= 1),
compare_dbs(Source, Target).

use_bulk_get_with_ddocs({_Ctx, {Source, Target}}) ->
populate_db_ddocs(Source, ?DOC_COUNT),
meck:new(couch_replicator_api_wrap, [passthrough]),
replicate(Source, Target, true),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(?DOC_COUNT, JustGets),
?assertEqual(?DOC_COUNT, DocUpdates),
?assert(BulkGets >= 1),
compare_dbs(Source, Target).

use_bulk_get_with_attachments({_Ctx, {Source, Target}}) ->
populate_db_atts(Source, ?DOC_COUNT),
meck:new(couch_replicator_api_wrap, [passthrough]),
replicate(Source, Target, true),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(?DOC_COUNT, JustGets),
?assertEqual(?DOC_COUNT, DocUpdates),
?assert(BulkGets >= 1),
compare_dbs(Source, Target).

Expand All @@ -49,10 +79,36 @@ dont_use_bulk_get({_Ctx, {Source, Target}}) ->
replicate(Source, Target, false),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, BulkGets),
?assertEqual(0, DocUpdates),
?assertEqual(?DOC_COUNT, JustGets),
compare_dbs(Source, Target).

dont_use_bulk_get_ddocs({_Ctx, {Source, Target}}) ->
populate_db_ddocs(Source, ?DOC_COUNT),
meck:new(couch_replicator_api_wrap, [passthrough]),
replicate(Source, Target, false),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, BulkGets),
?assertEqual(?DOC_COUNT, JustGets),
?assertEqual(?DOC_COUNT, DocUpdates),
compare_dbs(Source, Target).

dont_use_bulk_get_attachments({_Ctx, {Source, Target}}) ->
populate_db_atts(Source, ?DOC_COUNT),
meck:new(couch_replicator_api_wrap, [passthrough]),
replicate(Source, Target, false),
BulkGets = meck:num_calls(couch_replicator_api_wrap, bulk_get, 3),
JustGets = meck:num_calls(couch_replicator_api_wrap, open_doc_revs, 6),
DocUpdates = meck:num_calls(couch_replicator_api_wrap, update_doc, 4),
?assertEqual(0, BulkGets),
?assertEqual(?DOC_COUNT, JustGets),
?assertEqual(?DOC_COUNT, DocUpdates),
compare_dbs(Source, Target).

job_enable_overrides_global_disable({_Ctx, {Source, Target}}) ->
populate_db(Source, ?DOC_COUNT),
Persist = false,
Expand All @@ -78,10 +134,31 @@ global_disable_works({_Ctx, {Source, Target}}) ->
compare_dbs(Source, Target).

populate_db(DbName, DocCount) ->
Fun = fun(Id, Acc) -> [#doc{id = integer_to_binary(Id)} | Acc] end,
IdFun = fun(Id) -> integer_to_binary(Id) end,
Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
{ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).

populate_db_ddocs(DbName, DocCount) ->
IdFun = fun(Id) -> <<"_design/", (integer_to_binary(Id))/binary>> end,
Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id)} | Acc] end,
Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
{ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).

populate_db_atts(DbName, DocCount) ->
IdFun = fun(Id) -> integer_to_binary(Id) end,
Fun = fun(Id, Acc) -> [#doc{id = IdFun(Id), atts = [att(<<"a">>)]} | Acc] end,
Docs = lists:foldl(Fun, [], lists:seq(1, DocCount)),
{ok, _} = fabric:update_docs(DbName, Docs, [?ADMIN_CTX]).

att(Name) when is_binary(Name) ->
couch_att:new([
{name, Name},
{att_len, 1},
{type, <<"app/binary">>},
{data, <<"x">>}
]).

compare_dbs(Source, Target) ->
couch_replicator_test_helper:cluster_compare_dbs(Source, Target).

Expand Down

0 comments on commit a14922f

Please sign in to comment.