Skip to content

Commit

Permalink
fixing spec that was returning a false positive. adding spec for conc…
Browse files Browse the repository at this point in the history
…urrency.
  • Loading branch information
Rob Ares committed Nov 11, 2008
1 parent 149f45f commit be23e26
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
21 changes: 10 additions & 11 deletions lib/delayed/job.rb
Expand Up @@ -15,7 +15,7 @@ class Job < ActiveRecord::Base

# Conditions to find tasks that are locked by this process or one that has
# been created before now and is not currently locked.
NextTaskSQL = "(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))"
NextTaskSQL = "(locked_by = ?) or (run_at <= ? and (locked_at is null or locked_at < ?))"
NextTaskOrder = "priority DESC, run_at ASC"
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

Expand Down Expand Up @@ -121,17 +121,16 @@ def self.reserve(max_run_time = 4.hours)
# to the given job. It will rise a LockError if it cannot get this lock.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
transaction do
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1

self.locked_at = now
self.locked_by = worker
end
Expand Down
38 changes: 19 additions & 19 deletions spec/job_spec.rb
Expand Up @@ -104,7 +104,7 @@ def perform; raise 'did not work'; end
@job.reschedule 'FAIL'
end

describe "when another worker is already performing an task, it" do
context "when another worker is already performing an task, it" do

before :each do
Delayed::Job.worker_name = 'worker1'
Expand All @@ -113,12 +113,10 @@ def perform; raise 'did not work'; end

it "should not allow a second worker to get exclusive access" do
lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
end

it "should not allow a second worker to get exclusive access if the timeout has passed" do

@job.lock_exclusively! 1.minute, 'worker2'

end

it "should allow a second worker to get exclusive access if the timeout has passed" do
lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
end

it "should be able to get access to the task if it was started more then max_age ago" do
Expand All @@ -131,7 +129,6 @@ def perform; raise 'did not work'; end
@job.locked_at.should > 1.minute.ago
end


it "should be able to get exclusive access again when the worker name is the same" do
@job.lock_exclusively! 5.minutes, 'worker1'
@job.lock_exclusively! 5.minutes, 'worker1'
Expand Down Expand Up @@ -173,24 +170,23 @@ def perform; raise 'did not work'; end

end

context "when retreiving jobs from the queue" do
context "when pulling jobs off the queue, it" do
before(:each) do
@job = Delayed::Job.create(
:payload_object => SimpleJob.new,
:locked_by => 'worker1',
:locked_at => Delayed::Job.db_time_now - 5.minutes)
end

it "should process jobs that haven't been processed yet and remove them from the queue" do
it "should pull jobs off the queue that haven't been completed yet" do
Delayed::Job.find_available.length.should == 1
SimpleJob.runs.should == 0
Delayed::Job.work_off(1)
SimpleJob.runs.should == 1
Delayed::Job.find_available.length.should == 0
end

# TODO: verify that the datetime fields are not changed (test the Tx work).
it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do

it "should leave the queue in a consistent state if locking fails and not run the job" do
SimpleJob.runs.should == 0
@job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError)
Delayed::Job.should_receive(:find_available).once.and_return([@job])
Expand All @@ -200,15 +196,19 @@ def perform; raise 'did not work'; end

end

context "when clearing the queue of jobs" do
context "while running alongside other workers with enqueued jobs, it" do
before(:each) do
# create some jobs here.
Delayed::Job.worker_name = 'worker1'
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 5.hours.ago)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => 3.hours.ago)
Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 2.hours.ago)
end

it "should remove only jobs created by this worker"

after(:each) do
# delete
it "should remove only jobs created by the current worker" do
SimpleJob.runs.should == 0
Delayed::Job.work_off(3)
SimpleJob.runs.should == 2
Delayed::Job.find_available.length.should == 1
end

end
Expand Down

0 comments on commit be23e26

Please sign in to comment.