Skip to content

Commit

Permalink
Update shared spec to only call #reserve method on job
Browse files Browse the repository at this point in the history
#reserve will become part of the public backend API. Other backends can still implement #find_available and #lock_exclusively and call them, but they must implement #reserve.
  • Loading branch information
bkeepers committed Nov 24, 2010
1 parent 8787a11 commit f4fc9a2
Showing 1 changed file with 36 additions and 57 deletions.
93 changes: 36 additions & 57 deletions lib/delayed/backend/shared_spec.rb
@@ -1,6 +1,8 @@
require File.expand_path('../../../../spec/sample_jobs', __FILE__)

shared_examples_for 'a delayed_job backend' do
let(:worker) { Delayed::Worker.new }

def create_job(opts = {})
described_class.create(opts.merge(:payload_object => SimpleJob.new))
end
Expand Down Expand Up @@ -137,58 +139,39 @@ def create_job(opts = {})
describe "reserve" do
before do
Delayed::Worker.max_run_time = 2.minutes
@worker = Delayed::Worker.new(:quiet => true)
end

it "should not reserve failed jobs" do
create_job :attempts => 50, :failed_at => described_class.db_time_now
described_class.reserve(@worker).should be_nil
described_class.reserve(worker).should be_nil
end

it "should not reserve jobs scheduled for the future" do
create_job :run_at => (described_class.db_time_now + 1.minute)
described_class.reserve(@worker).should be_nil
create_job :run_at => described_class.db_time_now + 1.minute
described_class.reserve(worker).should be_nil
end

it "should lock the job so other workers can't reserve it" do
it "should not reserve jobs locked by other workers" do
job = create_job
described_class.reserve(@worker).should == job
new_worker = Delayed::Worker.new(:quiet => true)
new_worker.name = 'worker2'
described_class.reserve(new_worker).should be_nil
other_worker = Delayed::Worker.new
other_worker.name = 'other_worker'
described_class.reserve(other_worker).should == job
described_class.reserve(worker).should be_nil
end

it "should reserve open jobs" do
job = create_job
described_class.reserve(@worker).should == job
described_class.reserve(worker).should == job
end

it "should reserve expired jobs" do
job = create_job(:locked_by => @worker.name, :locked_at => described_class.db_time_now - 3.minutes)
described_class.reserve(@worker).should == job
job = create_job(:locked_by => worker.name, :locked_at => described_class.db_time_now - 3.minutes)
described_class.reserve(worker).should == job
end

it "should reserve own jobs" do
job = create_job(:locked_by => @worker.name, :locked_at => (described_class.db_time_now - 1.minutes))
described_class.reserve(@worker).should == job
end
end

context "when another worker has worked on a task since the job was found to be available, it" do

before :each do
@job = described_class.create :payload_object => SimpleJob.new
@job_copy_for_worker_2 = described_class.find(@job.id)
end

it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
@job.destroy
@job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
end

it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
@job.update_attributes(:attempts => 1, :run_at => described_class.db_time_now + 1.day)
@job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
job = create_job(:locked_by => worker.name, :locked_at => (described_class.db_time_now - 1.minutes))
described_class.reserve(worker).should == job
end
end

Expand Down Expand Up @@ -223,7 +206,8 @@ def create_job(opts = {})

it "should fetch jobs ordered by priority" do
10.times { described_class.enqueue SimpleJob.new, rand(10) }
jobs = described_class.find_available('worker', 10)
jobs = []
10.times { jobs << described_class.reserve(worker) }
jobs.size.should == 10
jobs.each_cons(2) do |a, b|
a.priority.should <= b.priority
Expand All @@ -234,32 +218,30 @@ def create_job(opts = {})
min = 5
Delayed::Worker.min_priority = min
10.times {|i| described_class.enqueue SimpleJob.new, i }
jobs = described_class.find_available('worker', 10)
jobs.each {|job| job.priority.should >= min}
5.times { described_class.reserve(worker).priority.should >= min }
end

it "should only find jobs less than or equal to max priority" do
max = 5
Delayed::Worker.max_priority = max
10.times {|i| described_class.enqueue SimpleJob.new, i }
jobs = described_class.find_available('worker', 10)
jobs.each {|job| job.priority.should <= max}
5.times { described_class.reserve(worker).priority.should <= max }
end
end

context "clear_locks!" do
before do
@job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now)
@job = create_job(:locked_by => 'worker1', :locked_at => described_class.db_time_now)
end

it "should clear locks for the given worker" do
described_class.clear_locks!('worker')
described_class.find_available('worker2', 5, 1.minute).should include(@job)
described_class.clear_locks!('worker1')
described_class.reserve(worker).should == @job
end

it "should not clear locks for other workers" do
described_class.clear_locks!('worker1')
described_class.find_available('worker1', 5, 1.minute).should_not include(@job)
described_class.clear_locks!('different_worker')
described_class.reserve(worker).should_not == @job
end
end

Expand Down Expand Up @@ -307,9 +289,6 @@ def create_job(opts = {})
describe "worker integration" do
before do
Delayed::Job.delete_all

@worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil, :quiet => true)

SimpleJob.runs = 0
end

Expand All @@ -319,7 +298,7 @@ def create_job(opts = {})
old_max_run_time = Delayed::Worker.max_run_time
Delayed::Worker.max_run_time = 1.second
@job = Delayed::Job.create :payload_object => LongRunningJob.new
@worker.run(@job)
worker.run(@job)
@job.reload.last_error.should =~ /expired/
@job.attempts.should == 1
ensure
Expand All @@ -331,7 +310,7 @@ def create_job(opts = {})
it "should mark the job as failed" do
Delayed::Worker.destroy_failed_jobs = false
job = described_class.create! :handler => "--- !ruby/object:JobThatDoesNotExist {}"
@worker.work_off
worker.work_off
job.reload
job.failed_at.should_not be_nil
end
Expand All @@ -350,15 +329,15 @@ def create_job(opts = {})
it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.max_attempts = 1
@worker.run(@job)
worker.run(@job)
@job.reload
@job.last_error.should =~ /did not work/
@job.attempts.should == 1
@job.failed_at.should_not be_nil
end

it "should re-schedule jobs after failing" do
@worker.work_off
worker.work_off
@job.reload
@job.last_error.should =~ /did not work/
@job.last_error.should =~ /sample_jobs.rb:\d+:in `perform'/
Expand All @@ -371,7 +350,7 @@ def create_job(opts = {})

it 'should re-schedule with handler provided time if present' do
@job = Delayed::Job.enqueue(CustomRescheduleJob.new(99.minutes))
@worker.run(@job)
worker.run(@job)
@job.reload

(Delayed::Job.db_time_now + 99.minutes - @job.run_at).abs.should < 1
Expand All @@ -381,7 +360,7 @@ def create_job(opts = {})
error_with_nil_message = StandardError.new
error_with_nil_message.stub!(:message).and_return nil
@job.stub!(:invoke_job).and_raise error_with_nil_message
lambda{@worker.run(@job)}.should_not raise_error
lambda{worker.run(@job)}.should_not raise_error
end
end

Expand All @@ -399,7 +378,7 @@ def create_job(opts = {})

it "should run that hook" do
@job.payload_object.should_receive :failure
Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
end
end

Expand All @@ -422,7 +401,7 @@ def create_job(opts = {})

it "should not try to run that hook" do
lambda do
Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
end.should_not raise_exception(NoMethodError)
end
end
Expand All @@ -437,12 +416,12 @@ def create_job(opts = {})

it "should be destroyed if it failed more than Worker.max_attempts times" do
@job.should_receive(:destroy)
Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
end

it "should not be destroyed if failed fewer than Worker.max_attempts times" do
@job.should_not_receive(:destroy)
(Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
(Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
end
end

Expand All @@ -455,12 +434,12 @@ def create_job(opts = {})

it "should be failed if it failed more than Worker.max_attempts times" do
@job.reload.failed_at.should == nil
Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
@job.reload.failed_at.should_not == nil
end

it "should not be failed if it failed fewer than Worker.max_attempts times" do
(Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
(Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
@job.reload.failed_at.should == nil
end
end
Expand Down

0 comments on commit f4fc9a2

Please sign in to comment.