Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Removed the global lock. Jobs can now processed in parallel by runnin…

…g multiple job runners on the same machine or across your server farm.

This requires two new columns in the job table: locked_by and locked_until
  • Loading branch information...
commit 8ec934eef96cd5622713eb7c621441cbafabe55d 1 parent 2309a94
@tobi tobi authored
View
71 README
@@ -12,6 +12,12 @@ It is a direct extraction from Shopify where the job table is responsible for a
* updating solr, our search server, after product changes
* batch imports
* spam checks
+
+== Changes ==
+
+1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us
+ to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
+1.0 Initial release
== Setup ==
@@ -22,7 +28,9 @@ The library evolves around a delayed_jobs table which looks as follows:
table.integer :attempts, :default => 0
table.text :handler
table.string :last_error
- table.datetime :run_at
+ table.datetime :run_at
+ table.datetime :locked_until
+ table.string :locked_by
table.timestamps
end
@@ -58,46 +66,39 @@ At Shopify we run the the tasks from a simple script/job_runner which is being i
#!/usr/bin/env ruby
require File.dirname(__FILE__) + '/../config/environment'
- SLEEP = 15
- RESTART_AFTER = 1000
+ SLEEP = 5
trap('TERM') { puts 'Exiting...'; $exit = true }
trap('INT') { puts 'Exiting...'; $exit = true }
-
- # this script dies after several runs to prevent memory leaks.
- # runnit will immediately start it again.
- count, runs_left = 0, RESTART_AFTER
- loop do
-
- count = 0
-
- # this requires the locking plugin, also from jadedPixel
- ActiveRecord::base.aquire_lock("jobs table worker", 10) do
- puts 'got lock'
-
- realtime = Benchmark.realtime do
- count = Delayed::Job.work_off
- end
- end
+ puts "*** Staring job worker #{Delayed::Job.worker_name}"
+
+ begin
+
+ loop do
+ result = nil
- runs_left -= 1
+ realtime = Benchmark.realtime do
+ result = Delayed::Job.work_off
+ end
- break if $exit
+ count = result.sum
+
+ break if $exit
- if count.zero?
- sleep(SLEEP)
- else
- status = "#{count} jobs completed at %.2f j/s ..." % [count / realtime]
- RAILS_DEFAULT_LOGGER.info status
- puts status
- end
+ if count.zero?
+ sleep(SLEEP)
+ puts 'Waiting for more jobs...'
+ else
+ status = "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
+ RAILS_DEFAULT_LOGGER.info status
+ puts status
+ end
- if $exit or runs_left <= 0
- break
+ break if $exit
end
- end
-
-== Todo ==
-
-Work out a locking mechanism which would allow several job runners to run at the same time, spreading the load between them.
+
+ ensure
+ Delayed::Job.clear_locks!
+ end
+
View
228 lib/delayed/job.rb
@@ -4,142 +4,192 @@ class DeserializationError < StandardError
end
class Job < ActiveRecord::Base
- ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
-
set_table_name :delayed_jobs
-
- class Runner
- attr_accessor :logger, :jobs
- attr_accessor :runs, :success, :failure
+
+ cattr_accessor :worker_name
+ self.worker_name = "pid:#{Process.pid}"
- def initialize(jobs, logger = nil)
- @jobs = jobs
- @logger = logger
- self.runs = self.success = self.failure = 0
- end
-
- def run
-
- ActiveRecord::Base.cache do
- ActiveRecord::Base.transaction do
- @jobs.each do |job|
- self.runs += 1
- begin
- time = Benchmark.measure do
- job.perform
- ActiveRecord::Base.uncached { job.destroy }
- self.success += 1
- end
- logger.debug "Executed job in #{time.real}"
- rescue DeserializationError, StandardError, RuntimeError => e
- if logger
- logger.error "Job #{job.id}: #{e.class} #{e.message}"
- logger.error e.backtrace.join("\n")
- end
- ActiveRecord::Base.uncached { job.reshedule e.message }
- self.failure += 1
- end
- end
- end
- end
-
- self
- end
- end
-
- def self.enqueue(object, priority = 0)
- raise ArgumentError, 'Cannot enqueue items which do not respond to perform' unless object.respond_to?(:perform)
- Job.create(:handler => object, :priority => priority)
- end
-
- def handler=(object)
- self['handler'] = object.to_yaml
+ NextTaskSQL = '`run_at` <= ? AND (`locked_until` IS NULL OR `locked_until` < ?) OR (`locked_by`=?)'
+ NextTaskOrder = 'priority DESC, run_at ASC'
+ ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
+
+ class LockError < StandardError
+ end
+
+ def self.clear_locks!
+ connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_until`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
end
-
- def handler
- @handler ||= deserialize(self['handler'])
+
+ def payload_object
+ @payload_object ||= deserialize(self['handler'])
end
- def perform
- handler.perform
+ def payload_object=(object)
+ self['handler'] = object.to_yaml
end
- def reshedule(message)
- self.attempts += 1
- self.run_at = self.class.time_now + (attempts ** 4).seconds
- self.last_error = message
+ def reshedule(message, time = nil)
+ time ||= Job.db_time_now + (attempts ** 4).seconds + 1
+
+ self.attempts += 1
+ self.run_at = time
+ self.last_error = message
+ self.unlock
save!
- end
-
- def self.peek(limit = 1)
- if limit == 1
- find(:first, :order => "priority DESC, run_at ASC", :conditions => ['run_at <= ?', time_now])
- else
- find(:all, :order => "priority DESC, run_at ASC", :limit => limit, :conditions => ['run_at <= ?', time_now])
+ end
+
+
+ def self.enqueue(object, priority = 0)
+ unless object.respond_to?(:perform)
+ raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
- end
- def self.work_off(limit = 100)
- jobs = Job.find(:all, :conditions => ['run_at <= ?', time_now], :order => "priority DESC, run_at ASC", :limit => limit)
+ Job.create(:payload_object => object, :priority => priority)
+ end
- Job::Runner.new(jobs, logger).run
+ def self.find_available(limit = 5)
+ time_now = db_time_now
+ find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => 5)
end
-
- protected
+
+ # Get the payload of the next job we can get an exclusive lock on.
+ # If no jobs are left we return nil
+ def self.reserve(timeout = 5 * 60)
+
+ # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
+ # this leads to a more even distribution of jobs across the worker processes
+ find_available(5).each do |job|
+ begin
+ job.lock_exclusively!(self.db_time_now + timeout, worker_name)
+ yield job.payload_object
+ job.destroy
+ return job
+ rescue LockError
+ # We did not get the lock, some other worker process must have
+ puts "failed to aquire exclusive lock for #{job.id}"
+ rescue StandardError => e
+ job.reshedule e.message
+ return job
+ end
+ end
+
+ nil
+ end
+
+ # This method is used internally by reserve method to ensure exclusive access
+ # to the given job. It will rise a LockError if it cannot get this lock.
+ def lock_exclusively!(lock_until, worker = worker_name)
+
+ affected_rows = if locked_by != worker
+
+ # We don't own this job so we will update the locked_by name and the locked_until
+ connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
+ UPDATE #{self.class.table_name}
+ SET `locked_until`=#{quote_value(lock_until)}, `locked_by`=#{quote_value(worker)}
+ WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_until`<#{quote_value(self.class.db_time_now)} OR `locked_until` IS NULL)
+ end_sql
+
+ else
+
+ # We alrady own this job, this may happen if the job queue crashes.
+ # Simply update the lock timeout
+ connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
+ UPDATE #{self.class.table_name}
+ SET `locked_until`=#{quote_value(lock_until)}
+ WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
+ end_sql
+
+ end
+
+ unless affected_rows == 1
+ raise LockError, "Attempted to aquire exclusive lock failed"
+ end
+
+ self.locked_until = lock_until
+ self.locked_by = worker
+ end
- def self.time_now
- (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ def unlock
+ self.locked_until = nil
+ self.locked_by = nil
end
- def before_save
- self.run_at ||= self.class.time_now
- end
-
+ def self.work_off(num = 100)
+ success, failure = 0, 0
+
+ num.times do
+
+ job = self.reserve do |j|
+ begin
+ j.perform
+ success += 1
+ rescue
+ failure += 1
+ raise
+ end
+ end
+
+ break if job.nil?
+ end
+
+ return [success, failure]
+ end
+
private
-
+
def deserialize(source)
attempt_to_load_file = true
-
+
begin
handler = YAML.load(source) rescue nil
return handler if handler.respond_to?(:perform)
-
+
if handler.nil?
if source =~ ParseObjectFromYaml
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load($1)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
end
-
+
if handler.is_a?(YAML::Object)
-
+
# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
attempt_to_load(handler.class)
-
+
# If successful, retry the yaml.load
handler = YAML.load(source)
return handler if handler.respond_to?(:perform)
end
-
+
raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
-
+
rescue TypeError, LoadError, NameError => e
-
+
raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
end
- end
-
+ end
+
def attempt_to_load(klass)
klass.constantize
end
+
+ def self.db_time_now
+ (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
+ end
+ protected
+
+ def before_save
+ self.run_at ||= self.class.db_time_now
+ end
+
end
end
View
3  lib/delayed/performable_method.rb
@@ -12,6 +12,9 @@ def initialize(object, method, args)
def perform
load(object).send(method, *args.map{|a| load(a)})
+ rescue ActiveRecord::RecordNotFound
+ # We cannot do anything about objects which were deleted in the meantime
+ true
end
private
View
10 spec/database.rb
@@ -12,11 +12,13 @@ def reset_db
ActiveRecord::Schema.define do
create_table :delayed_jobs, :force => true do |table|
- table.integer :priority, :default => 0
- table.integer :attempts, :default => 0
- table.text :handler
- table.string :last_error
+ table.integer :priority, :default => 0
+ table.integer :attempts, :default => 0
+ table.text :handler
+ table.string :last_error
table.datetime :run_at
+ table.datetime :locked_until
+ table.string :locked_by
table.timestamps
end
View
26 spec/delayed_method_spec.rb
@@ -48,20 +48,18 @@ def read(story)
RandomRubyObject.new.send_later(:say_hello)
Delayed::Job.count.should == 1
- Delayed::Job.peek.perform.should == 'hello'
end
- it "should store the object as string if its an active record" do
-
+ it "should store the object as string if its an active record" do
story = Story.create :text => 'Once upon...'
story.send_later(:tell)
- job = Delayed::Job.peek
- job.handler.class.should == Delayed::PerformableMethod
- job.handler.object.should == 'AR:Story:1'
- job.handler.method.should == :tell
- job.handler.args.should == []
- job.perform.should == 'Once upon...'
+ job = Delayed::Job.find(:first)
+ job.payload_object.class.should == Delayed::PerformableMethod
+ job.payload_object.object.should == 'AR:Story:1'
+ job.payload_object.method.should == :tell
+ job.payload_object.args.should == []
+ job.payload_object.perform.should == 'Once upon...'
end
it "should store arguments as string if they an active record" do
@@ -71,11 +69,11 @@ def read(story)
reader = StoryReader.new
reader.send_later(:read, story)
- job = Delayed::Job.peek
- job.handler.class.should == Delayed::PerformableMethod
- job.handler.method.should == :read
- job.handler.args.should == ['AR:Story:1']
- job.perform.should == 'Epilog: Once upon...'
+ job = Delayed::Job.find(:first)
+ job.payload_object.class.should == Delayed::PerformableMethod
+ job.payload_object.method.should == :read
+ job.payload_object.args.should == ['AR:Story:1']
+ job.payload_object.perform.should == 'Epilog: Once upon...'
end
end
View
52 spec/job_spec.rb
@@ -1,6 +1,5 @@
require File.dirname(__FILE__) + '/database'
-
class SimpleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
@@ -30,25 +29,11 @@ def perform; raise 'did not work'; end
Delayed::Job.count.should == 1
end
- it "should return nil when peeking on empty table" do
- Delayed::Job.peek.should == nil
- end
-
- it "should return a job when peeking a table with jobs in it" do
- Delayed::Job.enqueue SimpleJob.new
- Delayed::Job.peek.class.should == Delayed::Job
- end
-
- it "should return an array of jobs when peek is called with a count larger than zero" do
- Delayed::Job.enqueue SimpleJob.new
- Delayed::Job.peek(2).class.should == Array
- end
-
it "should call perform on jobs when running work_off" do
SimpleJob.runs.should == 0
Delayed::Job.enqueue SimpleJob.new
- Delayed::Job.work_off(1)
+ Delayed::Job.work_off
SimpleJob.runs.should == 1
end
@@ -56,9 +41,7 @@ def perform; raise 'did not work'; end
it "should re-schedule by about 1 second at first and increment this more and more minutes when it fails to execute properly" do
Delayed::Job.enqueue ErrorJob.new
runner = Delayed::Job.work_off(1)
- runner.success.should == 0
- runner.failure.should == 1
-
+
job = Delayed::Job.find(:first)
job.last_error.should == 'did not work'
job.attempts.should == 1
@@ -71,7 +54,7 @@ def perform; raise 'did not work'; end
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
- lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should try to load the class when it is unknown at the time of the deserialization" do
@@ -80,14 +63,14 @@ def perform; raise 'did not work'; end
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
- lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should try include the namespace when loading unknown objects" do
job = Delayed::Job.new
job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
@@ -97,15 +80,36 @@ def perform; raise 'did not work'; end
job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
- lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
it "should try include the namespace when loading unknown structs" do
job = Delayed::Job.new
job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
- lambda { job.perform }.should raise_error(Delayed::DeserializationError)
+ lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
end
+
+
+ describe "when two workers are running" do
+
+ before :each do
+ Delayed::Job.worker_name = 'worker1'
+ Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_until => Time.now + 360
+ end
+
+ it "should give exclusive access only to a single worker" do
+ job = Delayed::Job.find_available.first
+ lambda { job.lock_exclusively! Time.now + 20, 'worker2' }.should raise_error(Delayed::Job::LockError)
+ end
+
+ it "should be able to get exclusive access again when the worker name is the same" do
+ job = Delayed::Job.find_available.first
+ job.lock_exclusively! Time.now + 20, 'worker1'
+ job.lock_exclusively! Time.now + 21, 'worker1'
+ job.lock_exclusively! Time.now + 22, 'worker1'
+ end
+ end
end
Please sign in to comment.
Something went wrong with that request. Please try again.