Skip to content

Commit

Permalink
Added Delayed::Job.reserve for a cleaner way to find and lock one ava…
Browse files Browse the repository at this point in the history
…ilable job
  • Loading branch information
bkeepers committed Nov 12, 2010
1 parent def384c commit 1febad8
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 107 deletions.
6 changes: 6 additions & 0 deletions lib/delayed/backend/base.rb
Expand Up @@ -32,6 +32,12 @@ def enqueue(*args)
end
end

def reserve(worker, max_run_time = Worker.max_run_time)
find_available(worker, 5, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker)
end
end

# Hook method that is called before a new worker is forked
def before_fork
end
Expand Down
137 changes: 30 additions & 107 deletions lib/delayed/backend/shared_spec.rb
Expand Up @@ -133,77 +133,50 @@ def create_job(opts = {})
end
end

describe "find_available" do
it "should not find failed jobs" do
@job = create_job :attempts => 50, :failed_at => described_class.db_time_now
described_class.find_available('worker', 5, 1.second).should_not include(@job)
describe "reserve" do
it "should not reserve failed jobs" do
create_job :attempts => 50, :failed_at => described_class.db_time_now
described_class.reserve('worker', 1.second).should be_nil
end

it "should not find jobs scheduled for the future" do
@job = create_job :run_at => (described_class.db_time_now + 1.minute)
described_class.find_available('worker', 5, 4.hours).should_not include(@job)
it "should not reserve jobs scheduled for the future" do
create_job :run_at => (described_class.db_time_now + 1.minute)
described_class.reserve('worker', 4.hours).should be_nil
end

it "should not find jobs locked by another worker" do
@job = create_job(:locked_by => 'other_worker', :locked_at => described_class.db_time_now - 1.minute)
described_class.find_available('worker', 5, 4.hours).should_not include(@job)
it "should lock the job so other workers can't reserve it" do
job = create_job
described_class.reserve('worker1', 4.hours).should == job
described_class.reserve('worker2', 4.hours).should be_nil
end

it "should find open jobs" do
@job = create_job
described_class.find_available('worker', 5, 4.hours).should include(@job)
it "should reserve open jobs" do
job = create_job
described_class.reserve('worker', 4.hours).should == job
end

it "should find expired jobs" do
@job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes)
described_class.find_available('worker', 5, 1.minute).should include(@job)
it "should reserve expired jobs" do
job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes)
described_class.reserve('worker', 1.minute).should == job
end

it "should find own jobs" do
@job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes))
described_class.find_available('worker', 5, 4.hours).should include(@job)
it "should reserve own jobs" do
job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes))
described_class.reserve('worker', 4.hours).should == job
end

it "should find only the right amount of jobs" do
10.times { create_job }
described_class.find_available('worker', 7, 4.hours).should have(7).jobs
end
end

context "when another worker is already performing a task, it" do
before :each do
@job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes
end

it "should not allow a second worker to get exclusive access" do
@job.lock_exclusively!(4.hours, 'worker2').should == false
end

it "should allow a second worker to get exclusive access if the timeout has passed" do
@job.lock_exclusively!(1.minute, 'worker2').should == true
end

it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = described_class.db_time_now - 5.hours
@job.save

@job.lock_exclusively!(4.hours, 'worker2').should be_true

described_class.find_available('worker2').should_not be_empty
end

it "should not be found by another worker" do
described_class.find_available('worker2', 1, 6.minutes).length.should == 0
end
context "when another worker is already performing a task" do
before :each do
@job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes
end

it "should be found by another worker if the time has expired" do
described_class.find_available('worker2', 1, 4.minutes).length.should == 1
end
it "should not allow a second worker to get exclusive access" do
described_class.reserve('worker2').should be_nil
end

it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
@job.lock_exclusively!(5.minutes, 'worker1').should be_true
it "should allow a second worker to get exclusive access if the timeout has passed" do
described_class.reserve('worker2', 1.minute).should == @job
end
end
end

Expand Down Expand Up @@ -354,56 +327,6 @@ def create_job(opts = {})

end

context "worker prioritization" do
before(:each) do
@worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true)
end

it "should only work_off jobs that are >= min_priority" do
create_job(:priority => -10)
create_job(:priority => 0)
@worker.work_off

SimpleJob.runs.should == 1
end

it "should only work_off jobs that are <= max_priority" do
create_job(:priority => 10)
create_job(:priority => 0)

@worker.work_off

SimpleJob.runs.should == 1
end
end

context "while running with locked and expired jobs" do
before(:each) do
@worker.name = 'worker1'
end

it "should not run jobs locked by another worker" do
create_job(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
lambda { @worker.work_off }.should_not change { SimpleJob.runs }
end

it "should run open jobs" do
create_job
lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
end

it "should run expired jobs" do
expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
create_job(:locked_by => 'other_worker', :locked_at => expired_time)
lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
end

it "should run own jobs" do
create_job(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes))
lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
end
end

describe "failed jobs" do
before do
# reset defaults
Expand Down

0 comments on commit 1febad8

Please sign in to comment.