Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.
Download ZIP

Comparing changes

Choose two branches to see what's changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: collectiveidea/delayed_job
...
head fork: ttilley/delayed_job
compare: master
Checking mergeability… Don't worry, you can still create the pull request.
  • 3 commits
  • 7 files changed
  • 0 commit comments
  • 1 contributor
Commits on Nov 17, 2009
@ttilley ttilley worker model, based off of work by Rick Olson eea8b69
@ttilley ttilley basic new relic reporting 88a0f52
Commits on Nov 21, 2009
@ttilley ttilley Work on including Delayed::Job support in RPM proper has made this unnecessary.

Revert "basic new relic reporting"

This reverts commit 88a0f52.
1d055c0
View
13 generators/delayed_job/templates/migration.rb
@@ -11,6 +11,19 @@ def self.up
table.string :locked_by # Who is working on this object (if locked)
table.timestamps
end
+
+ create_table :delayed_workers, :force => true do |table|
+ table.string :name
+ table.integer :job_id
+ table.string :job_name
+ table.integer :job_attempt
+ table.integer :job_priority
+ table.integer :completed_jobs, :default => 0
+ table.integer :failed_jobs, :default => 0
+ table.integer :longest_job, :default => 0
+ table.datetime :job_started_at
+ table.timestamps
+ end
end
View
5 lib/delayed/command.rb
@@ -63,9 +63,8 @@ def run(worker_name = nil)
Delayed::Worker.logger = Rails.logger
ActiveRecord::Base.connection.reconnect!
- worker = Delayed::Worker.new(@options)
- worker.name_prefix = "#{worker_name} "
- worker.start
+ @options[:name_prefix] = "#{worker_name} "
+ Delayed::Worker.start(@options)
rescue => e
Rails.logger.fatal e
STDERR.puts e.message
View
17 lib/delayed/job.rb
@@ -31,6 +31,10 @@ class Job < ActiveRecord::Base
cattr_accessor :min_priority, :max_priority
self.min_priority = nil
self.max_priority = nil
+
+ def worker
+ Delayed::Worker.instance
+ end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
@@ -93,17 +97,26 @@ def run_with_lock(max_run_time, worker_name)
# Try to run job. Returns true/false (work done/work failed)
def run(max_run_time)
+ worker.start_job(self) if worker
+
runtime = Benchmark.realtime do
Timeout.timeout(max_run_time.to_i) { invoke_job }
destroy
end
+
# TODO: warn if runtime > max_run_time ?
logger.info "* [JOB] #{name} completed after %.4f" % runtime
- return true # did work
+
+ worker.end_job(self, runtime) if worker
+
+ return true
rescue Exception => e
+ worker.fail_job(self) if worker
+
reschedule e.message, e.backtrace
log_exception(e)
- return false # work failed
+
+ return false
end
# Add a job to the queue
View
2  lib/delayed/tasks.rb
@@ -10,6 +10,6 @@
desc "Start a delayed_job worker."
task :work => [:merb_env, :environment] do
- Delayed::Worker.new(:min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY']).start
+ Delayed::Worker.start(:min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY'])
end
end
View
100 lib/delayed/worker.rb
@@ -1,8 +1,23 @@
module Delayed
- class Worker
+ class Worker < ActiveRecord::Base
+ set_table_name :delayed_workers
+
+ # Every worker has a unique name which by default is the pid of the process.
+  # There are some advantages to overriding this with something which survives worker restarts:
+ # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
+ def self.default_name
+ "host:#{Socket.gethostname} pid:#{Process.pid}" rescue "pid:#{Process.pid}"
+ end
+
+ def self.start(options={})
+ create!(options).start
+ end
+
@@sleep_delay = 5
cattr_accessor :sleep_delay
+
+ cattr_accessor :instance
cattr_accessor :logger
self.logger = if defined?(Merb::Logger)
@@ -10,36 +25,27 @@ class Worker
elsif defined?(RAILS_DEFAULT_LOGGER)
RAILS_DEFAULT_LOGGER
end
-
- # name_prefix is ignored if name is set directly
- attr_accessor :name_prefix
+
+ after_save :set_worker_instance
def job_max_run_time
Delayed::Job.max_run_time
end
- # Every worker has a unique name which by default is the pid of the process.
- # There are some advantages to overriding this with something which survives worker retarts:
- # Workers can safely resume working on tasks which are locked by themselves. The worker will assume that it crashed before.
- def name
- return @name unless @name.nil?
- "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
- end
-
- # Sets the name of the worker.
- # Setting the name to nil will reset the default worker name
- def name=(val)
- @name = val
- end
-
def initialize(options={})
- @quiet = options[:quiet]
- Delayed::Job.min_priority = options[:min_priority] if options.has_key?(:min_priority)
- Delayed::Job.max_priority = options[:max_priority] if options.has_key?(:max_priority)
+ @quiet = options.delete(:quiet)
+
+ Delayed::Job.min_priority = options.delete(:min_priority) if options.has_key?(:min_priority)
+ Delayed::Job.max_priority = options.delete(:max_priority) if options.has_key?(:max_priority)
+
+ options[:name] ||= self.class.default_name
+ options[:name] = "#{options.delete(:name_prefix)}#{options[:name]}" if options.has_key?(:name_prefix)
+
+ super(options)
end
def start
- say "*** Starting job worker #{name}"
+ say "*** Starting job worker #{self.name}"
trap('TERM') { say 'Exiting...'; $exit = true }
trap('INT') { say 'Exiting...'; $exit = true }
@@ -65,13 +71,52 @@ def start
end
ensure
- Delayed::Job.clear_locks!(name)
+ Delayed::Job.clear_locks!(self.name)
end
def say(text, level = Logger::INFO)
puts text unless @quiet
logger.add level, text if logger
end
+
+ def start_job(job)
+ return if new_record?
+ self.job_id = job.id
+ self.job_name = job.name
+ self.job_attempt = job.attempts
+ self.job_priority = job.priority
+ self.job_started_at = Delayed::Job.db_time_now
+ save!
+ end
+
+ def fail_job(job)
+ return if new_record?
+ self.failed_jobs += 1
+ set_longest_job
+ clear_state_fields
+ save!
+ end
+
+ def end_job(job, runtime = nil)
+ return if new_record?
+ self.completed_jobs += 1
+ set_longest_job(runtime)
+ clear_state_fields
+ save!
+ end
+
+ def clear_state_fields
+ self.job_id = nil
+ self.job_name = nil
+ self.job_attempt = nil
+ self.job_priority = nil
+ self.job_started_at = nil
+ end
+
+ def set_longest_job(duration = nil)
+ duration ||= job_started_at ? Delayed::Job.db_time_now - job_started_at : 0
+ self.longest_job = duration if duration > self.longest_job
+ end
protected
@@ -83,10 +128,10 @@ def reserve_and_run_one_job(max_run_time = job_max_run_time)
# this leads to a more even distribution of jobs across the worker processes
job = Delayed::Job.find_available(name, 5, max_run_time).detect do |job|
if job.lock_exclusively!(max_run_time, name)
- say "* [Worker(#{name})] acquired lock on #{job.name}"
+ say "* [Worker(#{self.name})] acquired lock on #{job.name}"
true
else
- say "* [Worker(#{name})] failed to acquire exclusive lock for #{job.name}", Logger::WARN
+ say "* [Worker(#{self.name})] failed to acquire exclusive lock for #{job.name}", Logger::WARN
false
end
end
@@ -117,5 +162,10 @@ def work_off(num = 100)
return [success, failure]
end
+
+ def set_worker_instance
+ self.class.instance = self
+ end
+
end
end
View
13 spec/database.rb
@@ -26,6 +26,19 @@
table.datetime :failed_at
table.timestamps
end
+
+ create_table :delayed_workers, :force => true do |table|
+ table.string :name
+ table.integer :job_id
+ table.string :job_name
+ table.integer :job_attempt
+ table.integer :job_priority
+ table.integer :completed_jobs, :default => 0
+ table.integer :failed_jobs, :default => 0
+ table.integer :longest_job, :default => 0
+ table.datetime :job_started_at
+ table.timestamps
+ end
create_table :stories, :force => true do |table|
table.string :text
View
89 spec/worker_spec.rb
@@ -8,19 +8,108 @@ def job_create(opts = {})
before do
Delayed::Worker.class_eval('public :work_off')
+ Delayed::Worker.delete_all
end
before(:each) do
@worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil)
+ @worker.save!
Delayed::Job.delete_all
SimpleJob.runs = 0
end
+
+ context "starting up" do
+ it "sets the worker instance" do
+ Delayed::Worker.instance.should == @worker
+ end
+
+ it "sets name" do
+ @worker.name.should == Delayed::Worker.default_name
+ end
+
+ it "sets created_at" do
+ @worker.created_at.should_not == nil
+ end
+ end
+
+ context "starting a job" do
+ before(:each) do
+ @worker = Delayed::Worker.new
+ @worker.save!
+ @job = job_create
+ @worker.start_job(@job)
+ end
+
+ it "sets job_id" do
+ @worker.job_id.should == @job.id
+ end
+
+ it "sets job_name" do
+ @worker.job_name.should == @job.name
+ end
+
+ it "sets job_attempt" do
+ @worker.job_attempt.should == @job.attempts
+ end
+
+ it "sets job_priority" do
+ @worker.job_priority.should == @job.priority
+ end
+
+ it "sets job_started_at" do
+ @worker.job_started_at.should_not == nil
+ end
+ end
+
+ context "ending a job" do
+ before(:each) do
+ @worker = Delayed::Worker.create!({})
+ @job = job_create
+ @worker.start_job(@job)
+ @worker.update_attribute :job_started_at, 5.minutes.ago
+ @worker.end_job(@job)
+ end
+
+ it "increments completed jobs" do
+ @worker.completed_jobs.should == 1
+ end
+
+ it "records longest running job" do
+ @worker.longest_job.should == 300
+ end
+
+ it "unsets job_id" do
+ @worker.job_id.should == nil
+ end
+
+ it "unsets job_name" do
+ @worker.job_name.should == nil
+ end
+
+ it "unsets job_started_at" do
+ @worker.job_started_at.should == nil
+ end
+ end
+
+ context "failing a job" do
+ before(:each) do
+ @worker = Delayed::Worker.create!({})
+ @job = job_create
+ @worker.start_job(@job)
+ @worker.fail_job(@job)
+ end
+
+ it "increments failed jobs" do
+ @worker.failed_jobs.should == 1
+ end
+ end
context "worker prioritization" do
before(:each) do
@worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5)
+ @worker.save!
end
it "should only work_off jobs that are >= min_priority" do

No commit comments for this range

Something went wrong with that request. Please try again.