Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Made a gem out of delayed_job so I can use it in my services. Had to …

…modify Worker to not infer DJ is running in a Rails instance.
  • Loading branch information...
commit 9ba6b0872375b55eadf9979b8773efe9132bda2c 1 parent bcf8d1d
Justin Knowlden authored
View
1  .gitignore
@@ -0,0 +1 @@
+*.gem
View
2  HISTORY.txt
@@ -0,0 +1,2 @@
+== 0.1.0 / 2008-11-28
+ * First of many versions
View
39 delayed_job.gemspec
@@ -0,0 +1,39 @@
+Gem::Specification.new do |s|
+ s.name = "delayed_job"
+ s.version = "0.1.0"
+ s.date = "2008-11-28"
+ s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
+ s.email = "tobi@leetsoft.com"
+ s.homepage = "http://github.com/tobi/delayed_job/tree/master"
+ s.description = "Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks."
+ s.authors = ["Tobias Lütke", "Justin Knowlden"]
+
+ # s.bindir = "bin"
+ # s.executables = ["delayed_job"]
+ # s.default_executable = "delayed_job"
+
+ s.has_rdoc = false
+ s.rdoc_options = ["--main", "README.textile"]
+ s.extra_rdoc_files = ["HISTORY.txt", "README.textile"]
+
+ # run git ls-files to get an updated list
+ s.files = %w[
+ HISTORY.txt
+ MIT-LICENSE
+ README.textile
+ delayed_job.gemspec
+ init.rb
+ lib/delayed/job.rb
+ lib/delayed/message_sending.rb
+ lib/delayed/performable_method.rb
+ lib/delayed/worker.rb
+ lib/delayed_job.rb
+ tasks/jobs.rake
+ ]
+ s.test_files = %w[
+ spec/database.rb
+ spec/delayed_method_spec.rb
+ spec/job_spec.rb
+ spec/story_spec.rb
+ ]
+end
View
6 init.rb
@@ -1,5 +1 @@
-require File.dirname(__FILE__) + '/lib/delayed/message_sending'
-require File.dirname(__FILE__) + '/lib/delayed/performable_method'
-require File.dirname(__FILE__) + '/lib/delayed/job'
-
-Object.send(:include, Delayed::MessageSending)
+require File.dirname(__FILE__) + '/lib/delayed_job'
View
46 lib/delayed/job.rb
@@ -15,8 +15,8 @@ class Job < ActiveRecord::Base
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
- # Every worker has a unique name which by default is the pid of the process.
- # There are some advantages to overriding this with something which survives worker retarts:
+ # Every worker has a unique name which by default is the pid of the process.
+ # There are some advantages to overriding this with something which survives worker retarts:
# Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
cattr_accessor :worker_name
self.worker_name = "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
@@ -25,10 +25,10 @@ class Job < ActiveRecord::Base
NextTaskOrder = 'priority DESC, run_at ASC'
ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
-
+
cattr_accessor :min_priority, :max_priority
self.min_priority = nil
- self.max_priority = nil
+ self.max_priority = nil
class LockError < StandardError
end
@@ -45,8 +45,8 @@ def failed?
def payload_object
@payload_object ||= deserialize(self['handler'])
end
-
- def name
+
+ def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
@@ -80,13 +80,13 @@ def self.enqueue(*args, &block)
if block_given?
priority = args.first || 0
run_at = args.second
-
+
Job.create(:payload_object => EvaledJob.new(&block), :priority => priority.to_i, :run_at => run_at)
else
object = args.first
priority = args.second || 0
run_at = args.third
-
+
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
@@ -96,32 +96,32 @@ def self.enqueue(*args, &block)
end
def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
-
- time_now = db_time_now
-
+
+ time_now = db_time_now
+
sql = NextTaskSQL.dup
conditions = [time_now, time_now - max_run_time, worker_name]
-
+
if self.min_priority
sql << ' AND (priority >= ?)'
conditions << min_priority
end
-
+
if self.max_priority
sql << ' AND (priority <= ?)'
- conditions << max_priority
+ conditions << max_priority
end
- conditions.unshift(sql)
-
+ conditions.unshift(sql)
+
records = ActiveRecord::Base.silence do
find(:all, :conditions => conditions, :order => NextTaskOrder, :limit => limit)
end
-
+
records.sort { rand() }
- end
-
+ end
+
# Get the payload of the next job we can get an exclusive lock on.
# If no jobs are left we return nil
def self.reserve(max_run_time = MAX_RUN_TIME, &block)
@@ -142,7 +142,7 @@ def self.reserve(max_run_time = MAX_RUN_TIME, &block)
rescue LockError
# We did not get the lock, some other worker process must have
logger.warn "* [JOB] failed to aquire exclusive lock for #{job.name}"
- rescue StandardError => e
+ rescue StandardError => e
job.reschedule e.message, e.backtrace
log_exception(job, e)
return job
@@ -160,16 +160,16 @@ def lock_exclusively!(max_run_time, worker = worker_name)
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?)", id, (now - max_run_time.to_i)])
else
- # We already own this job, this may happen if the job queue crashes.
+ # We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
raise LockError.new("Attempted to aquire exclusive lock failed") unless affected_rows == 1
-
+
self.locked_at = now
self.locked_by = worker
end
-
+
def unlock
self.locked_at = nil
self.locked_by = nil
View
18 lib/delayed/worker.rb
@@ -2,19 +2,21 @@ module Delayed
class Worker
SLEEP = 5
+ cattr_accessor :logger
+ self.logger = RAILS_DEFAULT_LOGGER if const_defined?(:RAILS_DEFAULT_LOGGER)
+
def initialize(options={})
- @quiet = options[:quiet]
+ @quiet = options[:quiet]
Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
- end
+ end
def start
say "*** Starting job worker #{Delayed::Job.worker_name}"
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
-
-
+
loop do
result = nil
@@ -33,15 +35,15 @@ def start
end
break if $exit
- end
-
+ end
+
ensure
Delayed::Job.clear_locks!
end
-
+
def say(text)
puts text unless @quiet
- RAILS_DEFAULT_LOGGER.info text
+ logger.info text if logger
end
end
View
6 lib/delayed_job.rb
@@ -0,0 +1,6 @@
+require File.dirname(__FILE__) + '/delayed/message_sending'
+require File.dirname(__FILE__) + '/delayed/performable_method'
+require File.dirname(__FILE__) + '/delayed/job'
+require File.dirname(__FILE__) + '/delayed/worker'
+
+Object.send(:include, Delayed::MessageSending)
Please sign in to comment.
Something went wrong with that request. Please try again.