diff --git a/lib/delayed/job.rb b/lib/delayed/job.rb index cf2e29b61..792986db8 100644 --- a/lib/delayed/job.rb +++ b/lib/delayed/job.rb @@ -3,6 +3,8 @@ module Delayed class DeserializationError < StandardError end + # A job object that is persisted to the database. + # Contains the work object as a YAML field. class Job < ActiveRecord::Base MAX_ATTEMPTS = 25 MAX_RUN_TIME = 4.hours @@ -29,9 +31,7 @@ class Job < ActiveRecord::Base self.min_priority = nil self.max_priority = nil - class LockError < StandardError - end - + # When a worker is exiting, make sure we don't have any locked jobs. def self.clear_locks! update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name]) end @@ -60,6 +60,8 @@ def payload_object=(object) self['handler'] = object.to_yaml end + # Reschedule the job in the future (when a job fails). + # Uses an exponential scale depending on the number of failed attempts. def reschedule(message, backtrace = [], time = nil) if self.attempts < MAX_ATTEMPTS time ||= Job.db_time_now + (attempts ** 4) + 5 @@ -75,6 +77,32 @@ def reschedule(message, backtrace = [], time = nil) end end + + # Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked. + def run_with_lock(max_run_time, worker_name) + logger.info "* [JOB] aquiring lock on #{name}" + unless lock_exclusively!(max_run_time, worker_name) + # We did not get the lock, some other worker process must have + logger.warn "* [JOB] failed to aquire exclusive lock for #{name}" + return nil # no work done + end + + begin + runtime = Benchmark.realtime do + invoke_job # TODO: raise error if takes longer than max_run_time + destroy + end + # TODO: warn if runtime > max_run_time ? + logger.info "* [JOB] #{name} completed after %.4f" % runtime + return true # did work + rescue Exception => e + reschedule e.message, e.backtrace + log_exception(e) + return false # work failed + end + end + + # Add a job to the queue def self.enqueue(*args, &block) object = block_given? ? EvaledJob.new(&block) : args.shift @@ -88,6 +116,8 @@ def self.enqueue(*args, &block) Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at) end + # Find a few candidate jobs to run (in case some immediately get locked by others). + # Return in random order prevent everyone trying to do same head job at once. def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) time_now = db_time_now @@ -115,38 +145,22 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME) records.sort_by { rand() } end - # Get the payload of the next job we can get an exclusive lock on. + # Run the next job we can get an exclusive lock on. # If no jobs are left we return nil - def self.reserve(max_run_time = MAX_RUN_TIME, &block) + def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME) - # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next. + # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next. # this leads to a more even distribution of jobs across the worker processes find_available(5, max_run_time).each do |job| - begin - logger.info "* [JOB] aquiring lock on #{job.name}" - job.lock_exclusively!(max_run_time, worker_name) - runtime = Benchmark.realtime do - invoke_job(job.payload_object, &block) - job.destroy - end - logger.info "* [JOB] #{job.name} completed after %.4f" % runtime - - return job - rescue LockError - # We did not get the lock, some other worker process must have - logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}" - rescue StandardError => e - job.reschedule e.message, e.backtrace - log_exception(job, e) - return job - end + t = job.run_with_lock(max_run_time, worker_name) + return t unless t == nil # return if we did work (good or bad) end - nil + nil # we didn't do any work, all 5 were not lockable end - # This method is used internally by reserve method to ensure exclusive access - # to the given job. It will rise a LockError if it cannot get this lock. + # Lock this job for this worker. + # Returns true if we have the lock, false otherwise. def lock_exclusively!(max_run_time, worker = worker_name) now = self.class.db_time_now affected_rows = if locked_by != worker @@ -157,46 +171,50 @@ def lock_exclusively!(max_run_time, worker = worker_name) # Simply resume and update the locked_at self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker]) end - raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1 - - self.locked_at = now - self.locked_by = worker + if affected_rows == 1 + self.locked_at = now + self.locked_by = worker + return true + else + return false + end end + # Unlock this job (note: not saved to DB) def unlock self.locked_at = nil self.locked_by = nil end # This is a good hook if you need to report job processing errors in additional or different ways - def self.log_exception(job, error) - logger.error "* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts" + def log_exception(error) + logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts" logger.error(error) end + # Do num jobs and return stats on success/failure. + # Exit early if interrupted. def self.work_off(num = 100) success, failure = 0, 0 num.times do - job = self.reserve do |j| - begin - j.perform + case self.reserve_and_run_one_job + when true success += 1 - rescue + when false failure += 1 - raise - end + else + break # leave if no work could be done end - - break if job.nil? + break if $exit # leave if we're exiting end return [success, failure] end # Moved into its own method so that new_relic can trace it. - def self.invoke_job(job, &block) - block.call(job) + def invoke_job + payload_object.perform end private @@ -227,6 +245,9 @@ def attempt_to_load(klass) klass.constantize end + # Get the current time (GMT or local depending on DB) + # Note: This does not ping the DB to get the time, so all your clients + # must have syncronized clocks. def self.db_time_now (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now end diff --git a/spec/delayed_method_spec.rb b/spec/delayed_method_spec.rb index 25be6dac1..e3911dcdc 100644 --- a/spec/delayed_method_spec.rb +++ b/spec/delayed_method_spec.rb @@ -78,13 +78,9 @@ def read(story) Delayed::Job.count.should == 1 - output = nil + Delayed::Job.reserve_and_run_one_job - Delayed::Job.reserve do |e| - output = e.perform - end - - output.should == true + Delayed::Job.count.should == 0 end @@ -129,4 +125,4 @@ def read(story) job.payload_object.perform.should == 'Once upon...' end -end \ No newline at end of file +end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 7d7a735dc..b88e73895 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -184,11 +184,11 @@ def perform; @@runs += 1; end end it "should not allow a second worker to get exclusive access" do - lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError) + @job.lock_exclusively!(4.hours, 'worker2').should == false end it "should allow a second worker to get exclusive access if the timeout has passed" do - lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError) + @job.lock_exclusively!(1.minute, 'worker2').should == true end it "should be able to get access to the task if it was started more then max_age ago" do @@ -283,7 +283,7 @@ def perform; @@runs += 1; end it "should leave the queue in a consistent state and not run the job if locking fails" do SimpleJob.runs.should == 0 - @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError) + @job.stub!(:lock_exclusively!).with(any_args).once.and_return(false) Delayed::Job.should_receive(:find_available).once.and_return([@job]) Delayed::Job.work_off(1) SimpleJob.runs.should == 0 @@ -291,21 +291,55 @@ def perform; @@runs += 1; end end - context "while running alongside other workers with enqueued jobs, it" do + context "while running alongside other workers that locked jobs, it" do before(:each) do Delayed::Job.worker_name = 'worker1' - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 3.minutes)) - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 11.minutes)) - Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 2.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) end - - it "should only find jobs if the lock has expired reguardless of the worker" do - SimpleJob.runs.should == 0 - Delayed::Job.work_off(5) - SimpleJob.runs.should == 2 - Delayed::Job.find_available(5, 10.minutes).length.should == 1 + + it "should ingore locked jobs from other workers" do + Delayed::Job.worker_name = 'worker3' + SimpleJob.runs.should == 0 + Delayed::Job.work_off + SimpleJob.runs.should == 1 # runs the one open job end - + + it "should find our own jobs regardless of locks" do + Delayed::Job.worker_name = 'worker1' + SimpleJob.runs.should == 0 + Delayed::Job.work_off + SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked + end + end + + context "while running with locked and expired jobs, it" do + before(:each) do + Delayed::Job.worker_name = 'worker1' + exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::MAX_RUN_TIME) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => exp_time) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker2', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) + Delayed::Job.create(:payload_object => SimpleJob.new) + Delayed::Job.create(:payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => (Delayed::Job.db_time_now - 1.minutes)) + end + + it "should only find unlocked and expired jobs" do + Delayed::Job.worker_name = 'worker3' + SimpleJob.runs.should == 0 + Delayed::Job.work_off + SimpleJob.runs.should == 2 # runs the one open job and one expired job + end + + it "should ignore locks when finding our own jobs" do + Delayed::Job.worker_name = 'worker1' + SimpleJob.runs.should == 0 + Delayed::Job.work_off + SimpleJob.runs.should == 3 # runs open job plus worker1 jobs + # This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names! + end + end end