Force-pushed from 24aeae3 to 4e79f47
case ets:lookup(?MODULE, {DbName, DocId}) of
    [#rdoc{worker = WRef}] when is_reference(WRef) ->
        WRef;
    _Other ->
Let's be clear what we expect here. Presumably `[]` is the only other case?
Would we expect an `#rdoc` without a worker ref, etc.? We should avoid this kind of defensive programming in general.
Good point. You're right. `_Other` in this case should be `[]` or a worker with a nil reference. I will fix it.
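For illustration, a minimal sketch of what the explicit clauses could look like (record and table names are taken from the snippet above; the helper name and the `nil` convention for "no worker spawned yet" are assumptions):

```erlang
%% Hypothetical helper; assumes `nil` marks "no worker spawned yet" in #rdoc.worker.
get_worker_ref(DbName, DocId) ->
    case ets:lookup(?MODULE, {DbName, DocId}) of
        [#rdoc{worker = WRef}] when is_reference(WRef) ->
            WRef;
        [#rdoc{worker = nil}] ->
            nil;
        [] ->
            nil
    end.
```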
    {ok, RepId};
% Before adding a the job check that this worker is still the current
% worker. This is to handle a race condition where a worker which was
% sleeping and then checking a replication filter my inadvertently re-add
s/my/may/
Force-pushed from 4e79f47 to 88bf066
@rnewson Fixed typo. Added explicit cases for `nil` and a worker with a nil reference in …
@@ -194,8 +207,8 @@ handle_cast(Msg, State) ->
     {stop, {error, unexpected_message, Msg}, State}.


-handle_info({'DOWN', Ref, _, _, #doc_worker_result{id = Id, result = Res}},
-            State) ->
+handle_info({'DOWN', _, _, _, #doc_worker_result{id = Id, wref = Ref,
Accidentally added a space there.
Oops. Good catch. Fixing it
            couch_log:warning("replicator scheduler: ~p was already added", [Rep])
    end,
    {ok, RepId};
% Before adding a the job check that this worker is still the current
Little comment typo here: `adding a the job`.
Good catch. Fixing it
Looks like it'd work, but it'd be nice if we could simplify things a bit. Would it make sense for the doc processor to kill the worker and …
Interesting idea. That might be a bit tricky because the worker is actually 2 processes: a top-level wrapper and the actual worker process. The wrapper is used to guarantee that no matter what happens (blocked network request, a redirect loop, etc.) workers always return and don't block indefinitely. Killing the wrapper process will still leave the process which starts the replication running. Linking the wrapper and the executor might appear to work, but then if the executor dies the wrapper will exit with an unspecified exit signal (currently it is expected to exit with a well-known result record only). I think linking but then also trapping exits in the worker might work. But I'm not sure if that would still simplify the logic overall or just shift some of the tricky bits.
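To make the two-process structure concrete, here is a rough sketch of a wrapper that links to the executor, traps exits, and always terminates with a well-known result record whatever the executor does. All names here (`worker_wrapper`, `ExecFun`, the `#doc_worker_result{}` fields) are illustrative assumptions, not the module's actual API:

```erlang
-record(doc_worker_result, {id, wref, result}).

%% Hypothetical wrapper: traps exits so an executor crash or hang is converted
%% into a result value, and the wrapper's exit reason is always the result
%% record that the parent's 'DOWN' handler expects.
worker_wrapper(Id, WRef, ExecFun, TimeoutMsec) ->
    process_flag(trap_exit, true),
    Self = self(),
    Pid = spawn_link(fun() -> Self ! {result, self(), ExecFun()} end),
    Result = receive
        {result, Pid, Res} ->
            Res;
        {'EXIT', Pid, Reason} ->
            {error, Reason}
    after TimeoutMsec ->
        exit(Pid, kill),
        {error, timeout}
    end,
    exit(#doc_worker_result{id = Id, wref = WRef, result = Result}).
```

Whether that would end up simpler than the current wrapper, or just move the tricky bits, is exactly the open question above.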
    {_Pid, WRef} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec) end),
    WRef.

-spec spawn_worker(db_doc_id(), #rep{}, seconds(), reference()) -> pid().
spawn_worker(Id, Rep, WaitSec, WRef) ->
Why do you need to pass in the ref? Why can't you just add the ref returned by `spawn_monitor` to the `worker` field in ets?
That is done to avoid a race condition.
A reference for the worker is created first and inserted in the ets table. Then the worker with that ref is started. The race condition could happen because the worker, after it starts, will check if it is still the current worker (for the latest document update):
https://github.com/apache/couchdb-fabric/blob/master/src/fabric_doc_open_revs.erl#L461-L468
There is a (probably mostly theoretical) race condition: if, after the worker is spawned, the doc processor main process is put to sleep before it gets a chance to add the reference to ets, the worker process in the meantime keeps going, checks that it is not the current worker, and exits.
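Roughly, the ordering described here could look like the sketch below (function and record names are illustrative, not the actual module's API): the reference is created and written to ets before the worker is spawned, so by the time the worker asks whether it is still current, the table entry is guaranteed to exist.

```erlang
-record(rdoc, {id, worker}).

%% Hypothetical sketch of the ordering: create the ref first, record it in ets
%% second, and spawn the worker last, passing it the same ref.
maybe_spawn_worker(Id, Rep, WaitSec) ->
    WRef = make_ref(),
    true = ets:insert(?MODULE, #rdoc{id = Id, worker = WRef}),
    spawn_worker(Id, Rep, WaitSec, WRef),
    WRef.

spawn_worker(Id, Rep, WaitSec, WRef) ->
    {Pid, _Monitor} = spawn_monitor(fun() -> worker_fun(Id, Rep, WaitSec, WRef) end),
    Pid.

worker_fun(Id, Rep, WaitSec, WRef) ->
    timer:sleep(WaitSec * 1000),
    %% ...fetch the filter and compute the replication id here...
    case gen_server:call(?MODULE, {is_current_worker, Id, WRef}) of
        true ->
            gen_server:call(?MODULE, {add_job, Id, Rep, WRef});
        false ->
            %% Superseded by a newer worker or the doc was deleted; don't re-add.
            exit(ignore)
    end.
```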
Makes sense.
LGTM after a squash and rebase.
add_job/1 doesn't return `{error, already_added}` anymore so fix spec to conform.
When the replication filter changed, the replication id record in the doc processor ETS table was not updated. This led to the new replication job not showing up in the _scheduler/docs output.
Previously, especially in the case of filtered replications, doc processor workers could inadvertently re-add a replication job after it was deleted. After finishing fetching the filter code and computing the replication id, workers would try to add the replication job to the scheduler. They did that without checking whether the replication document had already been deleted or another worker had been spawned. The fix is to create a unique worker reference and pass it to the worker; the worker then confirms it is still current and the document was not deleted before adding the job, otherwise it exits with an `ignore` result.
Force-pushed from 8db2107 to 700a929
@sagelywizard Thank you! 🐎
Make sure doc processor workers do not re-add deleted replication jobs.
Fix add_job/1 spec for scheduler