diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index cbf47e211..147926620 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -16,6 +16,10 @@ class Job < ::ActiveRecord::Base } scope :by_priority, order('priority ASC, run_at ASC') + scope :locked_by_worker, lambda{|worker_name, max_run_time| + where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time]) + } + def self.before_fork ::ActiveRecord::Base.clear_all_connections! end @@ -28,15 +32,33 @@ def self.after_fork def self.clear_locks!(worker_name) update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) end - - # Find a few candidate jobs to run (in case some immediately get locked by others). - def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) + + def self.jobs_available_to_worker(worker_name, max_run_time) scope = self.ready_to_run(worker_name, max_run_time) scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority - + scope.by_priority + end + + # Reserve a single job in a single update query. This causes workers to serialize on the + # database and avoids contention. + def self.reserve(worker, max_run_time = Worker.max_run_time) + affected_rows = 0 + ::ActiveRecord::Base.silence do + affected_rows = update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name], jobs_available_to_worker(worker.name, max_run_time).scope(:find)[:conditions], :limit => 1) + end + + if affected_rows == 1 + locked_by_worker(worker.name, max_run_time).first + else + nil + end + end + + # Find a few candidate jobs to run (in case some immediately get locked by others). + def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) ::ActiveRecord::Base.silence do - scope.by_priority.all(:limit => limit) + jobs_available_to_worker(worker_name, max_run_time).all(:limit => limit) end end diff --git a/lib/generators/delayed_job/templates/migration.rb b/lib/generators/delayed_job/templates/migration.rb index ac579dfcd..a4fd61282 100644 --- a/lib/generators/delayed_job/templates/migration.rb +++ b/lib/generators/delayed_job/templates/migration.rb @@ -13,6 +13,7 @@ def self.up end add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority' + add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by' end def self.down