diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index e15d33574..a03e44162 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -15,7 +15,7 @@ class Job < ActiveRecord::Base # Conditions to find tasks that are locked by this process or one that has # been created before now and is not currently locked. - NextTaskSQL = "(locked_by = ?) OR (run_at <= ? AND (locked_at IS NULL OR locked_at < ?))" + NextTaskSQL = "(locked_by = ?) or (run_at <= ? and (locked_at is null or locked_at < ?))" NextTaskOrder = "priority DESC, run_at ASC" ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/ @@ -121,17 +121,16 @@ def self.reserve(max_run_time = 4.hours) # to the given job. It will rise a LockError if it cannot get this lock. def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now - transaction do - affected_rows = if locked_by != worker - # We don't own this job so we will update the locked_by name and the locked_at - self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) - else - # We already own this job, this may happen if the job queue crashes. - # Simply resume and update the locked_at - self.class.update_all(["locked_at = ?", now], ["id = ? and (locked_by = ?)", id, worker]) - end - raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 + affected_rows = if locked_by != worker + # We don't own this job so we will update the locked_by name and the locked_at + self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)]) + else + # We already own this job, this may happen if the job queue crashes. + # Simply resume and update the locked_at + self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end + raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 + self.locked_at = now self.locked_by = worker end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 1056a8774..06897b0c7 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -104,7 +104,7 @@ def perform; raise 'did not work'; end @job.reschedule 'FAIL' end - describe "when another worker is already performing an task, it" do + context "when another worker is already performing an task, it" do before :each do Delayed::Job.worker_name = 'worker1' @@ -113,12 +113,10 @@ def perform; raise 'did not work'; end it "should not allow a second worker to get exclusive access" do lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError) - end - - it "should not allow a second worker to get exclusive access if the timeout has passed" do - - @job.lock_exclusively! 1.minute, 'worker2' - + end + + it "should allow a second worker to get exclusive access if the timeout has passed" do + lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError) end it "should be able to get access to the task if it was started more then max_age ago" do @@ -131,7 +129,6 @@ def perform; raise 'did not work'; end @job.locked_at.should > 1.minute.ago end - it "should be able to get exclusive access again when the worker name is the same" do @job.lock_exclusively! 5.minutes, 'worker1' @job.lock_exclusively! 5.minutes, 'worker1' @@ -173,7 +170,7 @@ def perform; raise 'did not work'; end end - context "when retreiving jobs from the queue" do + context "when pulling jobs off the queue, it" do before(:each) do @job = Delayed::Job.create( :payload_object => SimpleJob.new, @@ -181,16 +178,15 @@ def perform; raise 'did not work'; end :locked_at => Delayed::Job.db_time_now - 5.minutes) end - it "should process jobs that haven't been processed yet and remove them from the queue" do + it "should pull jobs off the queue that haven't been completed yet" do Delayed::Job.find_available.length.should == 1 SimpleJob.runs.should == 0 Delayed::Job.work_off(1) SimpleJob.runs.should == 1 Delayed::Job.find_available.length.should == 0 end - - # TODO: verify that the datetime fields are not changed (test the Tx work). - it "should leave the queue in a consistent state if failure occurs trying to aquire a lock" do + + it "should leave the queue in a consistent state if locking fails and not run the job" do SimpleJob.runs.should == 0 @job.stub!(:lock_exclusively!).with(:any_args).once.and_raise(Delayed::Job::LockError) Delayed::Job.should_receive(:find_available).once.and_return([@job]) @@ -200,15 +196,19 @@ def perform; raise 'did not work'; end end - context "when clearing the queue of jobs" do + context "while running alongside other workers with enqueued jobs, it" do before(:each) do - # create some jobs here. + Delayed::Job.worker_name = 'worker1' + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 5.hours.ago) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => 3.hours.ago) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => 2.hours.ago) end - it "should remove only jobs created by this worker" - - after(:each) do - # delete + it "should remove only jobs created by the current worker" do + SimpleJob.runs.should == 0 + Delayed::Job.work_off(3) + SimpleJob.runs.should == 2 + Delayed::Job.find_available.length.should == 1 end end