
Increment the "attempt" count of a job when a lock is retrieved (prior to invoking the job) and also check that max_attempts is not exceeded prior to invoking the job.
1 parent 21024d3 commit d022b38d6dac33b476690d06572ff583d0b11b2f Ben Fyvie committed May 6, 2011
Showing with 36 additions and 20 deletions.
  1. +36 −20 lib/delayed/job.rb
lib/delayed/job.rb
@@ -1,3 +1,5 @@
+require 'timeout'
+
module Delayed
class DeserializationError < StandardError
@@ -6,8 +8,11 @@ class DeserializationError < StandardError
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ActiveRecord::Base
- MAX_ATTEMPTS = 25
- MAX_RUN_TIME = 4.hours
+ @@max_attempts = 25
+ @@max_run_time = 4.hours
+
+ cattr_accessor :max_attempts, :max_run_time
+
set_table_name :delayed_jobs
# By default failed jobs are destroyed after too many attempts.
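
With the MAX_ATTEMPTS and MAX_RUN_TIME constants replaced by cattr_accessor settings, an application can now override both limits at load time. A minimal sketch of that usage (the initializer path and the chosen values are assumptions for illustration, not part of this commit):

    # config/initializers/delayed_job.rb (hypothetical location)
    Delayed::Job.max_attempts = 10      # retire jobs after 10 failures instead of 25
    Delayed::Job.max_run_time = 1.hour  # allow a single run up to an hour
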
@@ -63,33 +68,40 @@ def payload_object=(object)
# Reschedule the job in the future (when a job fails).
# Uses an exponential scale depending on the number of failed attempts.
def reschedule(message, backtrace = [], time = nil)
- if self.attempts < MAX_ATTEMPTS
+ if max_attempts_exceeded?
+ max_attempts_exceeded
+ else
time ||= Job.db_time_now + (attempts ** 4) + 5
- self.attempts += 1
self.run_at = time
self.last_error = message + "\n" + backtrace.join("\n")
self.unlock
- save!
- else
- logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consequetive failures."
- destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
+ save!
end
end
+ def max_attempts_exceeded?
+ self.attempts > max_attempts
+ end
+
+ def max_attempts_exceeded
+ logger.info "* [JOB] PERMANENTLY removing #{self.name} because of #{attempts} consecutive failures."
+ destroy_failed_jobs ? destroy : update_attribute(:failed_at, Delayed::Job.db_time_now)
+ end
# Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
def run_with_lock(max_run_time, worker_name)
- logger.info "* [JOB] aquiring lock on #{name}"
+ logger.info "* [JOB] acquiring lock on #{name}"
unless lock_exclusively!(max_run_time, worker_name)
# We did not get the lock, some other worker process must have
- logger.warn "* [JOB] failed to aquire exclusive lock for #{name}"
+ logger.warn "* [JOB] failed to acquire exclusive lock for #{name}"
return nil # no work done
end
begin
+ raise "Attempted to run this job for a #{attempts} time which exceeds the max allowed of #{max_attempts}" if max_attempts_exceeded?
runtime = Benchmark.realtime do
- invoke_job # TODO: raise error if takes longer than max_run_time
+ Timeout.timeout(max_run_time.to_i) { invoke_job }
destroy
end
# TODO: warn if runtime > max_run_time ?
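
Wrapping invoke_job in Timeout.timeout(max_run_time.to_i) addresses the old TODO: a run that exceeds max_run_time now raises Timeout::Error and drops into the existing rescue path, which reschedules the job with a backoff of (attempts ** 4) + 5 seconds. A standalone sketch of the timeout behavior, with a sleep standing in for a slow job (values are illustrative only):

    require 'timeout'

    max_run_time = 2  # seconds; stand-in for Delayed::Job.max_run_time
    begin
      Timeout.timeout(max_run_time) { sleep 10 }  # stand-in for invoke_job
    rescue Timeout::Error
      # In the worker this lands in the rescue/reschedule path, pushing
      # run_at out by (attempts ** 4) + 5 seconds.
      puts "job exceeded #{max_run_time}s and was interrupted"
    end
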
@@ -117,8 +129,7 @@ def self.enqueue(*args, &block)
end
# Find a few candidate jobs to run (in case some immediately get locked by others).
- # Return in random order prevent everyone trying to do same head job at once.
- def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
+ def self.find_available(limit = 5, max_run_time = max_run_time)
time_now = db_time_now
@@ -138,16 +149,14 @@ def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
conditions.unshift(sql)
- records = ActiveRecord::Base.silence do
+ ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
-
- records.sort_by { rand() }
end
# Run the next job we can get an exclusive lock on.
# If no jobs are left we return nil
- def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
+ def self.reserve_and_run_one_job(max_run_time = max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
@@ -165,11 +174,11 @@ def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
- self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
+ self.class.update_all(["locked_at = ?, locked_by = ?, attempts = ?", now, worker, self.attempts += 1], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
- self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
+ self.class.update_all(["locked_at = ?, attempts = ?", now, self.attempts += 1], ["id = ? and locked_by = ?", id, worker])
end
if affected_rows == 1
self.locked_at = now
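
Because the attempts bump rides along in the same UPDATE that claims the lock, it stays consistent under concurrency: update_all returns the number of rows changed, so only the worker whose statement actually matched (and therefore locked) the row persists the increment. Roughly, the "we don't own this job" branch amounts to the following (variable names are placeholders mirroring the diff, not a new API):

    rows = Delayed::Job.update_all(
      ["locked_at = ?, locked_by = ?, attempts = ?", now, worker, job.attempts + 1],
      ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)",
       job.id, now - max_run_time.to_i, now]
    )
    locked = (rows == 1)  # exactly one row changed means this worker won the lock

The added run_at <= ? condition also keeps a worker from claiming a job whose reschedule backoff window has not yet expired.
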
@@ -190,6 +199,13 @@ def unlock
def log_exception(error)
logger.error "* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts"
logger.error(error)
+ begin
+ scout_response = RailzScout.submit_bug(error, nil, nil)
+ logger.error("* [JOB] #{name} posted case #{scout_response[:case_number]} to Fogbugz")
+ rescue => e
+ logger.error("* [JOB] #{name} FogBugz post raised an error: #{e}")
+ logger.error(error.backtrace)
+ end

This looks specific to your site I think.

end
# Do num jobs and return stats on success/failure.
@@ -233,7 +249,7 @@ def deserialize(source)
return handler if handler.respond_to?(:perform)
raise DeserializationError,
- 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
+ 'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
