Skip to content

Commit

Permalink
Correctly handle before_fork and after_fork hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyshields committed Apr 4, 2021
1 parent db87465 commit c9ea0ab
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 45 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
### 3.0.0 (Next)

* [#81](https://github.com/collectiveidea/delayed_job_mongoid/pull/81): Drop support for Mongoid 3.0 and 4.0 - [johnnyshields](https://github.com/johnnyshields).
* [#81](https://github.com/collectiveidea/delayed_job_mongoid/pull/81): Perform disconnect on after_fork on all Mongoid versions - [johnnyshields](https://github.com/johnnyshields).
* [#82](https://github.com/collectiveidea/delayed_job_mongoid/pull/82): Correctly handle before_fork and after_fork hooks - [johnnyshields](https://github.com/johnnyshields).
* Your contribution here.

### 2.3.1 (2019/02/26)
Expand Down
98 changes: 54 additions & 44 deletions lib/delayed/backend/mongoid.rb
Expand Up @@ -23,58 +23,68 @@ class Job

before_save :set_default_run_at

def self.db_time_now
Time.now.utc
def reload(*args)
reset
super
end

# Reserves one job for the worker.
#
# Atomically picks and locks one job from the collection.
def self.reserve(worker, max_run_time = Worker.max_run_time)
right_now = db_time_now
criteria = reservation_criteria(worker, right_now, max_run_time)
criteria.find_one_and_update(
{ '$set' => { locked_at: right_now, locked_by: worker.name } },
return_document: :after
)
end
class << self
def db_time_now
Time.now.utc
end

# Mongo criteria matching all the jobs the worker can reserve.
#
# Jobs are sorted by priority and run_at.
#
# @api private
def self.reservation_criteria(worker, right_now, max_run_time)
criteria = where(
run_at: { '$lte' => right_now },
failed_at: nil
).any_of(
{ locked_by: worker.name },
{ locked_at: nil },
locked_at: { '$lt' => (right_now - max_run_time) }
)
# Reserves one job for the worker.
# Atomically picks and locks one job from the collection.
def reserve(worker, max_run_time = Worker.max_run_time)
right_now = db_time_now
criteria = reservation_criteria(worker, right_now, max_run_time)
criteria.find_one_and_update(
{ '$set' => { locked_at: right_now, locked_by: worker.name } },
return_document: :after
)
end

criteria = criteria.gte(priority: Worker.min_priority.to_i) if Worker.min_priority
criteria = criteria.lte(priority: Worker.max_priority.to_i) if Worker.max_priority
criteria = criteria.any_in(queue: Worker.queues) if Worker.queues.any?
criteria = criteria.desc(:locked_by).asc(:priority).asc(:run_at)
# When a worker is exiting, make sure we don't have any locked jobs.
def clear_locks!(worker_name)
where(locked_by: worker_name).update_all(locked_at: nil, locked_by: nil)
end

criteria
end
# In a multi-process setup, this will be called at boot time
# to close unnecessary database connections on the parent process.
def before_fork
::Mongoid.disconnect_clients
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
where(locked_by: worker_name).update_all(locked_at: nil, locked_by: nil)
end
# In a multi-process setup, this will be called to ensure fresh
# database connections are immediately made on each newly spawned child process.
def after_fork
::Mongoid::Clients.clients.each do |_name, client|
client.close
client.reconnect
end
end

def reload(*args)
reset
super
end
private

# Mongo criteria matching all the jobs the worker can reserve.
# Jobs are sorted by priority and run_at.
def reservation_criteria(worker, right_now, max_run_time)
criteria = where(
run_at: { '$lte' => right_now },
failed_at: nil
).any_of(
{ locked_by: worker.name },
{ locked_at: nil },
locked_at: { '$lt' => (right_now - max_run_time) }
)

criteria = criteria.gte(priority: Worker.min_priority.to_i) if Worker.min_priority
criteria = criteria.lte(priority: Worker.max_priority.to_i) if Worker.max_priority
criteria = criteria.any_in(queue: Worker.queues) if Worker.queues.any?
criteria = criteria.desc(:locked_by).asc(:priority).asc(:run_at)

# Hook method that is called after a new worker is forked
def self.after_fork
::Mongoid.disconnect_clients
criteria
end
end
end
end
Expand Down
15 changes: 15 additions & 0 deletions spec/delayed_job_mongoid_spec.rb
Expand Up @@ -11,4 +11,19 @@
expect(job.reload.payload_object).to be_a StoryWrapperJob
end
end

describe '.before_fork' do
it 'disconnects Mongoid' do
expect(::Mongoid).to receive(:disconnect_clients)
Delayed::Job.before_fork
end
end

describe '.after_fork' do
it 'reconnects Mongoid' do
expect_any_instance_of(::Mongo::Client).to receive(:close)
expect_any_instance_of(::Mongo::Client).to receive(:reconnect)
Delayed::Job.after_fork
end
end
end

0 comments on commit c9ea0ab

Please sign in to comment.