Skip to content
Browse files

Revert "When using the activerecord backend, reserve jobs with an upd…

…ate query instead of trying to lock a batch of available jobs. This causes workers to serialize on the database and avoids contention between them."

This reverts commit 5c57091.
  • Loading branch information...
1 parent 075fc7e commit edb5bc3d549524a140bf2a5c6631a5130c518107 @betamatt betamatt committed Dec 2, 2010
View
1 generators/delayed_job/templates/migration.rb
@@ -13,7 +13,6 @@ def self.up
end
add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
- add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by'
end
def self.down
View
32 lib/delayed/backend/active_record.rb
@@ -30,10 +30,6 @@ class Job < ::ActiveRecord::Base
}
named_scope :by_priority, :order => 'priority ASC, run_at ASC'
- named_scope :locked_by_worker, lambda{|worker_name, max_run_time|
- {:conditions => ['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time]}
- }
-
def self.after_fork
::ActiveRecord::Base.connection.reconnect!
end
@@ -42,33 +38,15 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
-
- def self.jobs_available_to_worker(worker_name, max_run_time)
+
+ # Find a few candidate jobs to run (in case some immediately get locked by others).
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
- scope.by_priority
- end
-
- # Reserve a single job in a single update query. This causes workers to serialize on the
- # database and avoids contention.
- def self.reserve(worker, max_run_time = Worker.max_run_time)
- affected_rows = 0
- ::ActiveRecord::Base.silence do
- affected_rows = update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name], jobs_available_to_worker(worker.name, max_run_time).scope(:find)[:conditions], :limit => 1)
- end
-
- if affected_rows == 1
- locked_by_worker(worker.name, max_run_time).first
- else
- nil
- end
- end
-
- # Find a few candidate jobs to run (in case some immediately get locked by others).
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+
::ActiveRecord::Base.silence do
- jobs_available_to_worker(worker_name, max_run_time).all(:limit => limit)
+ scope.by_priority.all(:limit => limit)
end
end
View
4 lib/delayed/backend/base.rb
@@ -24,8 +24,8 @@ def enqueue(*args)
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
- find_available(worker.name, 5, max_run_time).detect do |job|
- job.lock_exclusively!(max_run_time, worker.name)
+ find_available(worker, 5, max_run_time).detect do |job|
+ job.lock_exclusively!(max_run_time, worker)
end
end
View
2 lib/delayed/worker.rb
@@ -169,7 +169,7 @@ def handle_failed_job(job, error)
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def reserve_and_run_one_job
- job = Delayed::Job.reserve(self)
+ job = Delayed::Job.reserve(worker, self.class.max_run_time)
run(job) if job
end
end
View
40 spec/backend/shared_backend_spec.rb
@@ -173,46 +173,6 @@ def create_job(opts = {})
end
end
- describe "reserve" do
- before do
- Delayed::Worker.max_run_time = 2.minutes
- @worker = Delayed::Worker.new(:quiet => true)
- end
-
- it "should not reserve failed jobs" do
- create_job :attempts => 50, :failed_at => described_class.db_time_now
- described_class.reserve(@worker).should be_nil
- end
-
- it "should not reserve jobs scheduled for the future" do
- create_job :run_at => (described_class.db_time_now + 1.minute)
- described_class.reserve(@worker).should be_nil
- end
-
- it "should lock the job so other workers can't reserve it" do
- job = create_job
- described_class.reserve(@worker).should == job
- new_worker = Delayed::Worker.new(:quiet => true)
- new_worker.name = 'worker2'
- described_class.reserve(new_worker).should be_nil
- end
-
- it "should reserve open jobs" do
- job = create_job
- described_class.reserve(@worker).should == job
- end
-
- it "should reserve expired jobs" do
- job = create_job(:locked_by => @worker.name, :locked_at => described_class.db_time_now - 3.minutes)
- described_class.reserve(@worker).should == job
- end
-
- it "should reserve own jobs" do
- job = create_job(:locked_by => @worker.name, :locked_at => (described_class.db_time_now - 1.minutes))
- described_class.reserve(@worker).should == job
- end
- end
-
context "#name" do
it "should be the class name of the job that was enqueued" do
@backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'

0 comments on commit edb5bc3

Please sign in to comment.
Something went wrong with that request. Please try again.