Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added Delayed::Job.reserve for a cleaner way to find and lock one ava…

…ilable job
  • Loading branch information...
commit 1febad8abb4e5b9c4f96646698369f39785b2d88 1 parent def384c
@bkeepers bkeepers authored
Showing with 36 additions and 107 deletions.
  1. +6 −0 lib/delayed/backend/base.rb
  2. +30 −107 lib/delayed/backend/shared_spec.rb
View
6 lib/delayed/backend/base.rb
@@ -32,6 +32,12 @@ def enqueue(*args)
end
end
+ def reserve(worker, max_run_time = Worker.max_run_time)
+ find_available(worker, 5, max_run_time).detect do |job|
+ job.lock_exclusively!(max_run_time, worker)
+ end
+ end
+
# Hook method that is called before a new worker is forked
def before_fork
end
View
137 lib/delayed/backend/shared_spec.rb
@@ -133,77 +133,50 @@ def create_job(opts = {})
end
end
- describe "find_available" do
- it "should not find failed jobs" do
- @job = create_job :attempts => 50, :failed_at => described_class.db_time_now
- described_class.find_available('worker', 5, 1.second).should_not include(@job)
+ describe "reserve" do
+ it "should not reserve failed jobs" do
+ create_job :attempts => 50, :failed_at => described_class.db_time_now
+ described_class.reserve('worker', 1.second).should be_nil
end
- it "should not find jobs scheduled for the future" do
- @job = create_job :run_at => (described_class.db_time_now + 1.minute)
- described_class.find_available('worker', 5, 4.hours).should_not include(@job)
+ it "should not reserve jobs scheduled for the future" do
+ create_job :run_at => (described_class.db_time_now + 1.minute)
+ described_class.reserve('worker', 4.hours).should be_nil
end
- it "should not find jobs locked by another worker" do
- @job = create_job(:locked_by => 'other_worker', :locked_at => described_class.db_time_now - 1.minute)
- described_class.find_available('worker', 5, 4.hours).should_not include(@job)
+ it "should lock the job so other workers can't reserve it" do
+ job = create_job
+ described_class.reserve('worker1', 4.hours).should == job
+ described_class.reserve('worker2', 4.hours).should be_nil
end
- it "should find open jobs" do
- @job = create_job
- described_class.find_available('worker', 5, 4.hours).should include(@job)
+ it "should reserve open jobs" do
+ job = create_job
+ described_class.reserve('worker', 4.hours).should == job
end
- it "should find expired jobs" do
- @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes)
- described_class.find_available('worker', 5, 1.minute).should include(@job)
+ it "should reserve expired jobs" do
+ job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes)
+ described_class.reserve('worker', 1.minute).should == job
end
- it "should find own jobs" do
- @job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes))
- described_class.find_available('worker', 5, 4.hours).should include(@job)
+ it "should reserve own jobs" do
+ job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes))
+ described_class.reserve('worker', 4.hours).should == job
end
- it "should find only the right amount of jobs" do
- 10.times { create_job }
- described_class.find_available('worker', 7, 4.hours).should have(7).jobs
- end
- end
-
- context "when another worker is already performing a task, it" do
- before :each do
- @job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes
- end
-
- it "should not allow a second worker to get exclusive access" do
- @job.lock_exclusively!(4.hours, 'worker2').should == false
- end
-
- it "should allow a second worker to get exclusive access if the timeout has passed" do
- @job.lock_exclusively!(1.minute, 'worker2').should == true
- end
-
- it "should be able to get access to the task if it was started more then max_age ago" do
- @job.locked_at = described_class.db_time_now - 5.hours
- @job.save
-
- @job.lock_exclusively!(4.hours, 'worker2').should be_true
-
- described_class.find_available('worker2').should_not be_empty
- end
-
- it "should not be found by another worker" do
- described_class.find_available('worker2', 1, 6.minutes).length.should == 0
- end
+ context "when another worker is already performing a task" do
+ before :each do
+ @job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes
+ end
- it "should be found by another worker if the time has expired" do
- described_class.find_available('worker2', 1, 4.minutes).length.should == 1
- end
+ it "should not allow a second worker to get exclusive access" do
+ described_class.reserve('worker2').should be_nil
+ end
- it "should be able to get exclusive access again when the worker name is the same" do
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
+ it "should allow a second worker to get exclusive access if the timeout has passed" do
+ described_class.reserve('worker2', 1.minute).should == @job
+ end
end
end
@@ -354,56 +327,6 @@ def create_job(opts = {})
end
- context "worker prioritization" do
- before(:each) do
- @worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true)
- end
-
- it "should only work_off jobs that are >= min_priority" do
- create_job(:priority => -10)
- create_job(:priority => 0)
- @worker.work_off
-
- SimpleJob.runs.should == 1
- end
-
- it "should only work_off jobs that are <= max_priority" do
- create_job(:priority => 10)
- create_job(:priority => 0)
-
- @worker.work_off
-
- SimpleJob.runs.should == 1
- end
- end
-
- context "while running with locked and expired jobs" do
- before(:each) do
- @worker.name = 'worker1'
- end
-
- it "should not run jobs locked by another worker" do
- create_job(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
- lambda { @worker.work_off }.should_not change { SimpleJob.runs }
- end
-
- it "should run open jobs" do
- create_job
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
-
- it "should run expired jobs" do
- expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
- create_job(:locked_by => 'other_worker', :locked_at => expired_time)
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
-
- it "should run own jobs" do
- create_job(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes))
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
- end
-
describe "failed jobs" do
before do
# reset defaults
Please sign in to comment.
Something went wrong with that request. Please try again.