Make sure doc processor workers do not re-add deleted replication jobs
Previously, especially in the case of filtered replications, doc processor
workers could inadvertently re-add a replication job after it was deleted.

After fetching the filter code and computing the replication id, a worker
would try to add the replication job to the scheduler. It did so without
checking whether the replication document had been deleted in the meantime,
or whether another worker had been spawned for it.

The fix is to create a unique worker reference and pass it to the worker.
The worker then confirms that it is still the current worker and that the
document was not deleted before adding the job; otherwise it exits with an
`ignore` result.
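
The pattern this commit adopts, sketched as a tiny self-contained Erlang
module (the module name, `workers` table, and `demo/0` driver are
hypothetical, for illustration only, not replicator code): the owner records
a fresh reference as the doc's current worker before spawning, and the worker
re-checks that reference after its slow steps, bailing out with `ignore` when
the doc was deleted or a newer worker superseded it.

-module(worker_ref_sketch).
-export([demo/0]).

demo() ->
    ets:new(workers, [named_table, public]),
    DocId = <<"doc1">>,
    WRef = make_ref(),
    true = ets:insert(workers, {DocId, WRef}),
    Parent = self(),
    %% The worker gets its own reference and reports back to the parent.
    spawn(fun() -> Parent ! maybe_add_job(DocId, WRef) end),
    %% Simulate the replication doc being deleted while the worker runs.
    true = ets:delete(workers, DocId),
    receive Result -> Result after 1000 -> timeout end.

maybe_add_job(DocId, WRef) ->
    timer:sleep(100),  %% stands in for fetching filter code / computing rep id
    case ets:lookup(workers, DocId) of
        [{DocId, WRef}] -> added_job;  %% still the current worker: safe to add
        _NilOrOther -> ignore          %% deleted or superseded: do nothing
    end.

Running `demo()` here returns `ignore`: the delete wins the race, and the
late worker notices instead of re-adding the job.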
nickva committed Dec 8, 2016
1 parent 606d410 commit 88bf066
Showing 3 changed files with 107 additions and 37 deletions.
2 changes: 2 additions & 0 deletions src/couch_replicator.hrl
@@ -30,11 +30,13 @@
-type seconds() :: non_neg_integer().
-type rep_start_result() ::
{ok, rep_id()} |
ignore |
{temporary_error, binary()} |
{permanent_failure, binary()}.


-record(doc_worker_result, {
id :: db_doc_id(),
wref :: reference(),
result :: rep_start_result()
}).
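
For context on the new `wref` field: a worker reports its outcome by exiting
with a `#doc_worker_result{}` record, which the owner receives as the
`Reason` of the monitor's `'DOWN'` message. Since the worker reference is no
longer the monitor reference (the owner now makes it before spawning), it has
to travel inside the record. A minimal sketch of that round trip, with the
record redefined locally and hypothetical values, just to show the shape:

-module(down_result_sketch).
-export([demo/0]).

-record(doc_worker_result, {id, wref, result}).

demo() ->
    WRef = make_ref(),
    {_Pid, MonRef} = spawn_monitor(fun() ->
        %% The exit reason carries the whole result record.
        exit(#doc_worker_result{id = {<<"db">>, <<"doc">>},
                                wref = WRef,
                                result = {ok, <<"rep_id">>}})
    end),
    receive
        %% Matching on wref ties the message back to the worker the
        %% owner actually started, not just to any monitored process.
        {'DOWN', MonRef, process, _, #doc_worker_result{wref = WRef} = R} ->
            R#doc_worker_result.result
    after 1000 ->
        timeout
    end.

Calling `demo()` returns `{ok, <<"rep_id">>}`.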
48 changes: 40 additions & 8 deletions src/couch_replicator_doc_processor.erl
@@ -16,6 +16,7 @@
-export([start_link/0]).
-export([docs/1, doc/2]).
-export([update_docs/0]).
-export([get_worker_ref/1]).

% multidb changes callback
-export([db_created/2, db_deleted/2, db_found/2, db_change/3]).
@@ -88,6 +89,18 @@ db_change(DbName, {ChangeProps} = Change, Server) ->
Server.


-spec get_worker_ref(db_doc_id()) -> reference() | nil.
get_worker_ref({DbName, DocId}) when is_binary(DbName), is_binary(DocId) ->
case ets:lookup(?MODULE, {DbName, DocId}) of
[#rdoc{worker = WRef}] when is_reference(WRef) ->
WRef;
[#rdoc{worker = nil}] ->
nil;
[] ->
nil
end.


% Private helpers for the multidb changes API; these feed updates into the
% doc processor gen_server

@@ -194,8 +207,8 @@ handle_cast(Msg, State) ->
{stop, {error, unexpected_message, Msg}, State}.


handle_info({'DOWN', Ref, _, _, #doc_worker_result{id = Id, result = Res}},
State) ->
handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
result = Res}}, State) ->
ok = worker_returned(Ref, Id, Res),
{noreply, State};

@@ -315,6 +328,9 @@ worker_returned(Ref, Id, {ok, RepId}) ->
end,
ok;

worker_returned(_Ref, _Id, ignore) ->
ok;

worker_returned(Ref, Id, {temporary_error, Reason}) ->
case ets:lookup(?MODULE, Id) of
[#rdoc{worker = Ref, errcnt = ErrCnt} = Row] ->
@@ -404,8 +420,9 @@ maybe_start_worker(Id) ->
ok;
[#rdoc{rep = Rep} = Doc] ->
Wait = get_worker_wait(Doc),
WRef = couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait),
true = ets:insert(?MODULE, Doc#rdoc{worker = WRef}),
Ref = make_ref(),
true = ets:insert(?MODULE, Doc#rdoc{worker = Ref}),
couch_replicator_doc_processor_worker:spawn_worker(Id, Rep, Wait, Ref),
ok
end.

@@ -733,6 +750,22 @@ normalize_rep_test_() ->
}.


get_worker_ref_test_() ->
{
setup,
fun() -> ets:new(?MODULE, [named_table, public, {keypos, #rdoc.id}]) end,
fun(_) -> ets:delete(?MODULE) end,
?_test(begin
Id = {<<"db">>, <<"doc">>},
?assertEqual(nil, get_worker_ref(Id)),
ets:insert(?MODULE, #rdoc{id = Id, worker = nil}),
?assertEqual(nil, get_worker_ref(Id)),
Ref = make_ref(),
ets:insert(?MODULE, #rdoc{id = Id, worker = Ref}),
?assertEqual(Ref, get_worker_ref(Id))
end)
}.


% Test helper functions

@@ -745,7 +778,7 @@ setup() ->
meck:expect(config, get, fun(_, _, Default) -> Default end),
meck:expect(config, listen_for_changes, 2, ok),
meck:expect(couch_replicator_clustering, owner, 2, node()),
meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 3, wref),
meck:expect(couch_replicator_doc_processor_worker, spawn_worker, 4, pid),
meck:expect(couch_replicator_scheduler, remove_job, 1, ok),
meck:expect(couch_replicator_docs, remove_state_fields, 2, ok),
meck:expect(couch_replicator_docs, update_failed, 4, ok),
@@ -763,9 +796,8 @@ removed_state_fields() ->
meck:called(couch_replicator_docs, remove_state_fields, [?DB, ?DOC1]).


started_worker(Id) ->
meck:called(couch_replicator_doc_processor_worker, spawn_worker,
[Id, '_', '_']).
started_worker(_Id) ->
1 == meck:num_calls(couch_replicator_doc_processor_worker, spawn_worker, 4).


removed_job(Id) ->
94 changes: 65 additions & 29 deletions src/couch_replicator_doc_processor_worker.erl
@@ -12,7 +12,7 @@

-module(couch_replicator_doc_processor_worker).

-export([spawn_worker/3]).
-export([spawn_worker/4]).

-include("couch_replicator.hrl").

@@ -27,19 +27,19 @@
% a worker will then exit with the #doc_worker_result{} record within
% ?WORKER_TIMEOUT_MSEC timeout period. A timeout is considered a `temporary_error`.
% Result will be sent as the `Reason` in the {'DOWN',...} message.
-spec spawn_worker(db_doc_id(), #rep{}, seconds()) -> reference().
spawn_worker(Id, Rep, WaitSec) ->
{_Pid, WRef} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec) end),
WRef.
-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
spawn_worker(Id, Rep, WaitSec, WRef) ->
{Pid, _Ref} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec, WRef) end),
Pid.


% Private functions

-spec worker_fun(db_doc_id(), #rep{}, seconds()) -> no_return().
worker_fun(Id, Rep, WaitSec) ->
-spec worker_fun(db_doc_id(), #rep{}, seconds(), reference()) -> no_return().
worker_fun(Id, Rep, WaitSec, WRef) ->
timer:sleep(WaitSec * 1000),
Fun = fun() ->
try maybe_start_replication(Id, Rep) of
try maybe_start_replication(Id, Rep, WRef) of
Res ->
exit(Res)
catch
@@ -52,7 +52,7 @@ worker_fun(Id, Rep, WaitSec) ->
{Pid, Ref} = spawn_monitor(Fun),
receive
{'DOWN', Ref, _, Pid, Result} ->
exit(#doc_worker_result{id = Id, result = Result})
exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
after ?WORKER_TIMEOUT_MSEC ->
erlang:demonitor(Ref, [flush]),
exit(Pid, kill),
@@ -61,17 +61,19 @@ worker_fun(Id, Rep, WaitSec) ->
Msg = io_lib:format("Replication for db ~p doc ~p failed to start due "
"to timeout after ~B seconds", [DbName, DocId, TimeoutSec]),
Result = {temporary_error, couch_util:to_binary(Msg)},
exit(#doc_worker_result{id = Id, result = Result})
exit(#doc_worker_result{id = Id, wref = WRef, result = Result})
end.


% Try to start a replication. Used by a worker. This function should return
% rep_start_result(); it also throws {filter_fetch_error, Reason} if the filter cannot be fetched.
% It can also block for an indeterminate amount of time while fetching the
% filter.
maybe_start_replication(Id, RepWithoutId) ->
maybe_start_replication(Id, RepWithoutId, WRef) ->
Rep = couch_replicator_docs:update_rep_id(RepWithoutId),
case maybe_add_job_to_scheduler(Id, Rep) of
case maybe_add_job_to_scheduler(Id, Rep, WRef) of
ignore ->
ignore;
{ok, RepId} ->
{ok, RepId};
{temporary_error, Reason} ->
@@ -84,18 +86,23 @@
end.


-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}) -> rep_start_result().
maybe_add_job_to_scheduler({_DbName, DocId}, Rep) ->
-spec maybe_add_job_to_scheduler(db_doc_id(), #rep{}, reference()) ->
rep_start_result().
maybe_add_job_to_scheduler({DbName, DocId}, Rep, WRef) ->
RepId = Rep#rep.id,
case couch_replicator_scheduler:rep_state(RepId) of
nil ->
case couch_replicator_scheduler:add_job(Rep) of
ok ->
ok;
{error, already_added} ->
couch_log:warning("replicator scheduler: ~p was already added", [Rep])
end,
{ok, RepId};
% Before adding the job, check that this worker is still the current
% worker. This handles a race condition where a worker which was
% sleeping and then fetching a replication filter could inadvertently
% re-add a replication which was already deleted.
case couch_replicator_doc_processor:get_worker_ref({DbName, DocId}) of
WRef ->
ok = couch_replicator_scheduler:add_job(Rep),
{ok, RepId};
_NilOrOtherWRef ->
ignore
end;
#rep{doc_id = DocId} ->
{ok, RepId};
#rep{doc_id = null} ->
@@ -130,7 +137,9 @@ doc_processor_worker_test_() ->
t_already_running_same_docid(),
t_already_running_transient(),
t_already_running_other_db_other_doc(),
t_spawn_worker()
t_spawn_worker(),
t_ignore_if_doc_deleted(),
t_ignore_if_worker_ref_does_not_match()
]
}.

Expand All @@ -140,7 +149,7 @@ t_should_add_job() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(added_job())
end).

Expand All @@ -151,7 +160,7 @@ t_already_running_same_docid() ->
Id = {?DB, ?DOC1},
mock_already_running(?DB, ?DOC1),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep)),
?assertEqual({ok, ?R1}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).

Expand All @@ -162,7 +171,7 @@ t_already_running_transient() ->
Id = {?DB, ?DOC1},
mock_already_running(null, null),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep)),
?assertMatch({temporary_error, _}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job())
end).

Expand All @@ -174,7 +183,7 @@ t_already_running_other_db_other_doc() ->
Id = {?DB, ?DOC1},
mock_already_running(<<"otherdb">>, <<"otherdoc">>),
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep)),
?assertMatch({permanent_failure, _}, maybe_start_replication(Id, Rep, nil)),
?assert(did_not_add_job()),
1 == meck:num_calls(couch_replicator_docs, update_failed, '_')
end).
Expand All @@ -185,15 +194,41 @@ t_spawn_worker() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
Ref = spawn_worker(Id, Rep, 0),
Res = receive {'DOWN', Ref, _, _, Reason} -> Reason
WRef = make_ref(),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, WRef),
Pid = spawn_worker(Id, Rep, 0, WRef),
Res = receive {'DOWN', _Ref, process, Pid, Reason} -> Reason
after 1000 -> timeout end,
Expect = #doc_worker_result{id = Id, result = {ok, ?R1}},
Expect = #doc_worker_result{id = Id, wref = WRef, result = {ok, ?R1}},
?assertEqual(Expect, Res),
?assert(added_job())
end).


% Should not add a job if, by the time the worker got to fetching the filter
% and computing the replication id, the replication doc was deleted
t_ignore_if_doc_deleted() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
end).


% Should not add a job if, by the time the worker got to fetching the filter
% and building a replication id, another worker was spawned.
t_ignore_if_worker_ref_does_not_match() ->
?_test(begin
Id = {?DB, ?DOC1},
Rep = couch_replicator_docs:parse_rep_doc_without_id(change()),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, make_ref()),
?assertEqual(ignore, maybe_start_replication(Id, Rep, make_ref())),
?assertNot(added_job())
end).


% Test helper functions

setup() ->
Expand All @@ -202,6 +237,7 @@ setup() ->
meck:expect(couch_server, get_uuid, 0, this_is_snek),
meck:expect(couch_replicator_docs, update_failed, 4, ok),
meck:expect(couch_replicator_scheduler, rep_state, 1, nil),
meck:expect(couch_replicator_doc_processor, get_worker_ref, 1, nil),
ok.


