From 1febad8abb4e5b9c4f96646698369f39785b2d88 Mon Sep 17 00:00:00 2001 From: Brandon Keepers Date: Thu, 16 Sep 2010 23:36:39 -0400 Subject: [PATCH] Added Delayed::Job.reserve for a cleaner way to find and lock one available job --- lib/delayed/backend/base.rb | 6 ++ lib/delayed/backend/shared_spec.rb | 137 +++++++---------------------- 2 files changed, 36 insertions(+), 107 deletions(-) diff --git a/lib/delayed/backend/base.rb b/lib/delayed/backend/base.rb index e864dc449..23491deb1 100644 --- a/lib/delayed/backend/base.rb +++ b/lib/delayed/backend/base.rb @@ -32,6 +32,12 @@ def enqueue(*args) end end + def reserve(worker, max_run_time = Worker.max_run_time) + find_available(worker, 5, max_run_time).detect do |job| + job.lock_exclusively!(max_run_time, worker) + end + end + # Hook method that is called before a new worker is forked def before_fork end diff --git a/lib/delayed/backend/shared_spec.rb b/lib/delayed/backend/shared_spec.rb index eeb375ffe..7011eb627 100644 --- a/lib/delayed/backend/shared_spec.rb +++ b/lib/delayed/backend/shared_spec.rb @@ -133,77 +133,50 @@ def create_job(opts = {}) end end - describe "find_available" do - it "should not find failed jobs" do - @job = create_job :attempts => 50, :failed_at => described_class.db_time_now - described_class.find_available('worker', 5, 1.second).should_not include(@job) + describe "reserve" do + it "should not reserve failed jobs" do + create_job :attempts => 50, :failed_at => described_class.db_time_now + described_class.reserve('worker', 1.second).should be_nil end - it "should not find jobs scheduled for the future" do - @job = create_job :run_at => (described_class.db_time_now + 1.minute) - described_class.find_available('worker', 5, 4.hours).should_not include(@job) + it "should not reserve jobs scheduled for the future" do + create_job :run_at => (described_class.db_time_now + 1.minute) + described_class.reserve('worker', 4.hours).should be_nil end - it "should not find jobs locked by another worker" do - @job = create_job(:locked_by => 'other_worker', :locked_at => described_class.db_time_now - 1.minute) - described_class.find_available('worker', 5, 4.hours).should_not include(@job) + it "should lock the job so other workers can't reserve it" do + job = create_job + described_class.reserve('worker1', 4.hours).should == job + described_class.reserve('worker2', 4.hours).should be_nil end - it "should find open jobs" do - @job = create_job - described_class.find_available('worker', 5, 4.hours).should include(@job) + it "should reserve open jobs" do + job = create_job + described_class.reserve('worker', 4.hours).should == job end - it "should find expired jobs" do - @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes) - described_class.find_available('worker', 5, 1.minute).should include(@job) + it "should reserve expired jobs" do + job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes) + described_class.reserve('worker', 1.minute).should == job end - it "should find own jobs" do - @job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes)) - described_class.find_available('worker', 5, 4.hours).should include(@job) + it "should reserve own jobs" do + job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes)) + described_class.reserve('worker', 4.hours).should == job end - it "should find only the right amount of jobs" do - 10.times { create_job } - described_class.find_available('worker', 7, 4.hours).should have(7).jobs - end - end - - context "when another worker is already performing a task, it" do - before :each do - @job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes - end - - it "should not allow a second worker to get exclusive access" do - @job.lock_exclusively!(4.hours, 'worker2').should == false - end - - it "should allow a second worker to get exclusive access if the timeout has passed" do - @job.lock_exclusively!(1.minute, 'worker2').should == true - end - - it "should be able to get access to the task if it was started more then max_age ago" do - @job.locked_at = described_class.db_time_now - 5.hours - @job.save - - @job.lock_exclusively!(4.hours, 'worker2').should be_true - - described_class.find_available('worker2').should_not be_empty - end - - it "should not be found by another worker" do - described_class.find_available('worker2', 1, 6.minutes).length.should == 0 - end + context "when another worker is already performing a task" do + before :each do + @job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes + end - it "should be found by another worker if the time has expired" do - described_class.find_available('worker2', 1, 4.minutes).length.should == 1 - end + it "should not allow a second worker to get exclusive access" do + described_class.reserve('worker2').should be_nil + end - it "should be able to get exclusive access again when the worker name is the same" do - @job.lock_exclusively!(5.minutes, 'worker1').should be_true - @job.lock_exclusively!(5.minutes, 'worker1').should be_true - @job.lock_exclusively!(5.minutes, 'worker1').should be_true + it "should allow a second worker to get exclusive access if the timeout has passed" do + described_class.reserve('worker2', 1.minute).should == @job + end end end @@ -354,56 +327,6 @@ def create_job(opts = {}) end - context "worker prioritization" do - before(:each) do - @worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true) - end - - it "should only work_off jobs that are >= min_priority" do - create_job(:priority => -10) - create_job(:priority => 0) - @worker.work_off - - SimpleJob.runs.should == 1 - end - - it "should only work_off jobs that are <= max_priority" do - create_job(:priority => 10) - create_job(:priority => 0) - - @worker.work_off - - SimpleJob.runs.should == 1 - end - end - - context "while running with locked and expired jobs" do - before(:each) do - @worker.name = 'worker1' - end - - it "should not run jobs locked by another worker" do - create_job(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) - lambda { @worker.work_off }.should_not change { SimpleJob.runs } - end - - it "should run open jobs" do - create_job - lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1) - end - - it "should run expired jobs" do - expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time) - create_job(:locked_by => 'other_worker', :locked_at => expired_time) - lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1) - end - - it "should run own jobs" do - create_job(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes)) - lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1) - end - end - describe "failed jobs" do before do # reset defaults