Commit 2cfd640

Merge pull request #82 from johnnyshields/forking-improvements
Improve how forking is handled
dblock committed Apr 6, 2021
2 parents db87465 + c4f10f6 commit 2cfd640
Showing 5 changed files with 74 additions and 58 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -1,7 +1,7 @@
 ### 3.0.0 (Next)
 
 * [#81](https://github.com/collectiveidea/delayed_job_mongoid/pull/81): Drop support for Mongoid 3.0 and 4.0 - [johnnyshields](https://github.com/johnnyshields).
-* [#81](https://github.com/collectiveidea/delayed_job_mongoid/pull/81): Perform disconnect on after_fork on all Mongoid versions - [johnnyshields](https://github.com/johnnyshields).
+* [#82](https://github.com/collectiveidea/delayed_job_mongoid/pull/82): Correctly handle before_fork and after_fork hooks - [johnnyshields](https://github.com/johnnyshields).
 * Your contribution here.
 
 ### 2.3.1 (2019/02/26)
1 change: 0 additions & 1 deletion Gemfile
@@ -10,7 +10,6 @@ group :test do
   gem 'rspec', '>= 3'
   gem 'rubocop', '0.65.0'
   gem 'simplecov', '>= 0.9'
-  gem 'yardstick'
 end

case version = ENV['MONGOID_VERSION'] || '7.0'
13 changes: 1 addition & 12 deletions Rakefile
@@ -13,15 +13,4 @@ task test: :spec
 require 'rubocop/rake_task'
 RuboCop::RakeTask.new
 
-require 'yardstick/rake/measurement'
-Yardstick::Rake::Measurement.new do |measurement|
-  measurement.output = 'measurement/report.txt'
-end
-
-require 'yardstick/rake/verify'
-Yardstick::Rake::Verify.new do |verify|
-  verify.require_exact_threshold = false
-  verify.threshold = 53.3
-end
-
-task default: %i[spec rubocop verify_measurements]
+task default: %i[spec rubocop]
98 changes: 54 additions & 44 deletions lib/delayed/backend/mongoid.rb
@@ -23,58 +23,68 @@ class Job

         before_save :set_default_run_at
 
-        def self.db_time_now
-          Time.now.utc
+        def reload(*args)
+          reset
+          super
         end
 
-        # Reserves one job for the worker.
-        #
-        # Atomically picks and locks one job from the collection.
-        def self.reserve(worker, max_run_time = Worker.max_run_time)
-          right_now = db_time_now
-          criteria = reservation_criteria(worker, right_now, max_run_time)
-          criteria.find_one_and_update(
-            { '$set' => { locked_at: right_now, locked_by: worker.name } },
-            return_document: :after
-          )
-        end
+        class << self
+          def db_time_now
+            Time.now.utc
+          end
 
-        # Mongo criteria matching all the jobs the worker can reserve.
-        #
-        # Jobs are sorted by priority and run_at.
-        #
-        # @api private
-        def self.reservation_criteria(worker, right_now, max_run_time)
-          criteria = where(
-            run_at: { '$lte' => right_now },
-            failed_at: nil
-          ).any_of(
-            { locked_by: worker.name },
-            { locked_at: nil },
-            locked_at: { '$lt' => (right_now - max_run_time) }
-          )
+          # Reserves one job for the worker.
+          # Atomically picks and locks one job from the collection.
+          def reserve(worker, max_run_time = Worker.max_run_time)
+            right_now = db_time_now
+            criteria = reservation_criteria(worker, right_now, max_run_time)
+            criteria.find_one_and_update(
+              { '$set' => { locked_at: right_now, locked_by: worker.name } },
+              return_document: :after
+            )
+          end
 
-          criteria = criteria.gte(priority: Worker.min_priority.to_i) if Worker.min_priority
-          criteria = criteria.lte(priority: Worker.max_priority.to_i) if Worker.max_priority
-          criteria = criteria.any_in(queue: Worker.queues) if Worker.queues.any?
-          criteria = criteria.desc(:locked_by).asc(:priority).asc(:run_at)
+          # When a worker is exiting, make sure we don't have any locked jobs.
+          def clear_locks!(worker_name)
+            where(locked_by: worker_name).update_all(locked_at: nil, locked_by: nil)
+          end
 
-          criteria
-        end
+          # In a multi-process setup, this will be called at boot time
+          # to close unnecessary database connections on the parent process.
+          def before_fork
+            ::Mongoid.disconnect_clients
+          end
 
-        # When a worker is exiting, make sure we don't have any locked jobs.
-        def self.clear_locks!(worker_name)
-          where(locked_by: worker_name).update_all(locked_at: nil, locked_by: nil)
-        end
+          # In a multi-process setup, this will be called to ensure fresh
+          # database connections are immediately made on each newly spawned child process.
+          def after_fork
+            ::Mongoid::Clients.clients.each do |_name, client|
+              client.close
+              client.reconnect
+            end
+          end
 
-        def reload(*args)
-          reset
-          super
-        end
+          private
 
+          # Mongo criteria matching all the jobs the worker can reserve.
+          # Jobs are sorted by priority and run_at.
+          def reservation_criteria(worker, right_now, max_run_time)
+            criteria = where(
+              run_at: { '$lte' => right_now },
+              failed_at: nil
+            ).any_of(
+              { locked_by: worker.name },
+              { locked_at: nil },
+              locked_at: { '$lt' => (right_now - max_run_time) }
+            )
+
+            criteria = criteria.gte(priority: Worker.min_priority.to_i) if Worker.min_priority
+            criteria = criteria.lte(priority: Worker.max_priority.to_i) if Worker.max_priority
+            criteria = criteria.any_in(queue: Worker.queues) if Worker.queues.any?
+            criteria = criteria.desc(:locked_by).asc(:priority).asc(:run_at)
 
-        # Hook method that is called after a new worker is forked
-        def self.after_fork
-          ::Mongoid.disconnect_clients
+            criteria
+          end
         end
       end
     end
   end
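The two hooks above are meant to bracket a fork in a multi-process deployment: the parent process calls Delayed::Job.before_fork once before spawning children, and each child calls Delayed::Job.after_fork before it starts working jobs. The sketch below is illustrative only and is not code from this repository; the WORKER_COUNT constant and the manual fork loop are hypothetical, while Delayed::Job.before_fork, Delayed::Job.after_fork, and Delayed::Worker come from delayed_job and this change.

# Illustrative sketch of the intended hook ordering; not part of this commit.
require 'delayed_job_mongoid'

WORKER_COUNT = 2 # hypothetical number of child workers

# Parent: drop shared Mongo connections so children do not inherit sockets.
Delayed::Job.before_fork

pids = Array.new(WORKER_COUNT) do
  Process.fork do
    # Child: re-establish fresh connections before working any jobs.
    Delayed::Job.after_fork
    Delayed::Worker.new.start
  end
end

pids.each { |pid| Process.waitpid(pid) }

In practice a process manager would drive the forking rather than a hand-written loop like this; the sketch only shows the call order the hooks assume.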
18 changes: 18 additions & 0 deletions spec/delayed_job_mongoid_spec.rb
@@ -11,4 +11,22 @@
       expect(job.reload.payload_object).to be_a StoryWrapperJob
     end
   end
+
+  describe '.before_fork' do
+    it 'disconnects Mongoid' do
+      expect(::Mongoid).to receive(:disconnect_clients)
+      Delayed::Job.before_fork
+    end
+  end
+
+  describe '.after_fork' do
+    # ensure Mongoid is connected
+    before { ::Delayed::Job.first }
+
+    it 'reconnects Mongoid' do
+      expect_any_instance_of(::Mongo::Client).to receive(:close)
+      expect_any_instance_of(::Mongo::Client).to receive(:reconnect)
+      Delayed::Job.after_fork
+    end
+  end
 end
