Permalink
Browse files

Jobs can now be put into queues, and workers can specify the queue th…

…ey pull from.

Job::enqueue can now take names arguments.
These changes were basically pulled from andrewtimberlake's delayed job fork, but updated for collectiveidea's branch.
  • Loading branch information...
1 parent a00f01e commit 66b39621ca69f478df2367570f75459e5b6af688 @bracken committed Feb 18, 2010
View
@@ -1 +1,2 @@
*.gem
+.idea/*
View
@@ -1,6 +1,8 @@
h1. Delayed::Job
-Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
+Delated_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background.
+
+This fork adds a 'queue' column for each job, and the ability to specify which queue a worker should pull from.
It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks. Amongst those tasks are:
@@ -74,6 +76,9 @@ Notifier.deliver_signup(@user)
# with delayed_job
Notifier.send_later :deliver_signup, @user
+
+# putting it in a specific queue
+Notifier.send_later_with_queue :deliver_signup, "my_queue", @user
</pre>
If a method should always be run in the background, you can call @#handle_asynchronously@ after the method declaration:
@@ -84,6 +89,7 @@ class Device
# long running method
end
handle_asynchronously :deliver
+ #handle_asynchronously_with_queue :deliver, "my_queue"
end
device = Device.new
@@ -118,24 +124,27 @@ class NewsletterJob < Struct.new(:text, :emails)
end
end
-Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.find(:all).collect(&:email))
+Delayed::Job.enqueue(NewsletterJob.new('lorem ipsum...', Customers.find(:all).collect(&:email)), :queue=>"my_queue")
</pre>
h2. Gory Details
The library evolves around a delayed_jobs table which looks as follows:
+<pre>
create_table :delayed_jobs, :force => true do |table|
table.integer :priority, :default => 0 # Allows some jobs to jump to the front of the queue
table.integer :attempts, :default => 0 # Provides for retries, but still fail eventually.
table.text :handler # YAML-encoded string of the object that will do work
- table.text :last_error # reason for last failure (See Note below)
+ table.text :last_error # reason for last failure (See Note below)
+ table.text :queue, :default => nil # The queue that this job is in
table.datetime :run_at # When to run. Could be Time.zone.now for immediately, or sometime in the future.
table.datetime :locked_at # Set when a client is working on this object
table.datetime :failed_at # Set when all retries have failed (actually, by default, the record is deleted instead)
table.string :locked_by # Who is working on this object (if locked)
table.timestamps
end
+</pre>
On failure, the job is scheduled again in 5 seconds + N ** 4, where N is the number of retries.
@@ -156,6 +165,7 @@ Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 5.minutes
+Delayed::Worker.queue = "my_queue"
</pre>
h3. Cleaning up
@@ -5,6 +5,7 @@ def self.up
table.integer :attempts, :default => 0 # Provides for retries, but still fail eventually.
table.text :handler # YAML-encoded string of the object that will do work
table.text :last_error # reason for last failure (See Note below)
+ table.text :queue, :default => nil # The queue that this job is in
table.datetime :run_at # When to run. Could be Time.zone.now for immediately, or sometime in the future.
table.datetime :locked_at # Set when a client is working on this object
table.datetime :failed_at # Set when all retries have failed (actually, by default, the record is deleted instead)
@@ -34,10 +34,12 @@ def self.clear_locks!(worker_name)
end
# Find a few candidate jobs to run (in case some immediately get locked by others).
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time, queue=nil)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
+ scope = scope.scoped(:conditions => ['queue = ?', queue]) if queue
+ scope = scope.scoped(:conditions => ['queue is null']) unless queue
::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
@@ -10,15 +10,21 @@ def self.included(base)
module ClassMethods
# Add a job to the queue
+ # The first argument should be an object that respond_to?(:perform)
+ # The rest should be named arguments, these keys are expected:
+ # :priority, :run_at, :queue
+ # Example: Delayed::Job.enqueue(object, :priority => 0, :run_at => time, :queue => queue)
def enqueue(*args)
object = args.shift
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end
- priority = args.first || 0
- run_at = args[1]
- self.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
+ options = args.first || {}
+ options[:priority] ||= 0
+ options[:payload_object] = object
+
+ self.create(options)
end
end
@@ -40,14 +40,15 @@ def self.db_time_now
MongoMapper.time_class.now.utc
end
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time, queue=nil)
where = "this.run_at <= new Date(#{db_time_now.to_f * 1000}) && (this.locked_at == null || this.locked_at < new Date(#{(db_time_now - max_run_time).to_f * 1000})) || this.locked_by == #{worker_name.to_json}"
# all(:limit => limit, :failed_at => nil, '$where' => where)
conditions = {
'$where' => where,
:limit => limit,
:failed_at => nil,
+ :queue => queue,
:sort => [['priority', 1], ['run_at', 1]]
}
@@ -4,8 +4,16 @@ def send_later(method, *args)
Delayed::Job.enqueue Delayed::PerformableMethod.new(self, method.to_sym, args)
end
+ def send_later_with_queue(method, queue, *args)
+ Delayed::Job.enqueue(Delayed::PerformableMethod.new(self, method.to_sym, args), :queue => queue)
+ end
+
def send_at(time, method, *args)
- Delayed::Job.enqueue(Delayed::PerformableMethod.new(self, method.to_sym, args), 0, time)
+ Delayed::Job.enqueue(Delayed::PerformableMethod.new(self, method.to_sym, args), :priority => 0, :run_at => time)
+ end
+
+ def send_at_with_queue(time, method, queue, *args)
+ Delayed::Job.enqueue(Delayed::PerformableMethod.new(self, method.to_sym, args), :priority => 0, :run_at => time, :queue => queue)
end
module ClassMethods
@@ -17,6 +25,15 @@ def handle_asynchronously(method)
end
alias_method_chain method, :send_later
end
+
+ def handle_asynchronously_with_queue(method, queue)
+ aliased_method, punctuation = method.to_s.sub(/([?!=])$/, ''), $1
+ with_method, without_method = "#{aliased_method}_with_send_later_with_queue#{punctuation}", "#{aliased_method}_without_send_later_with_queue#{punctuation}"
+ define_method(with_method) do |*args|
+ send_later_with_queue(without_method, queue, *args)
+ end
+ alias_method_chain method, :send_later_with_queue
+ end
end
end
end
View
@@ -2,10 +2,11 @@
module Delayed
class Worker
- cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :sleep_delay, :logger
+ cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :sleep_delay, :logger, :queue
self.sleep_delay = 5
self.max_attempts = 25
self.max_run_time = 4.hours
+ self.queue = nil
# By default failed jobs are destroyed after too many attempts. If you want to keep them around
# (perhaps to inspect the reason for the failure), set this to false.
@@ -34,6 +35,7 @@ def self.backend=(backend)
def initialize(options={})
@quiet = options[:quiet]
+ @queue = options[:queue]
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
end
@@ -149,7 +151,7 @@ def reserve_and_run_one_job
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
- job = Delayed::Job.find_available(name, 5, self.class.max_run_time).detect do |job|
+ job = Delayed::Job.find_available(name, 5, self.class.max_run_time, @queue).detect do |job|
if job.lock_exclusively!(self.class.max_run_time, name)
say "* [Worker(#{name})] acquired lock on #{job.name}"
true
@@ -15,10 +15,18 @@
lambda { Object.new.send_later(:to_s) }.should change { Delayed::Job.count }.by(1)
end
+ it "should add a new entry to the job table when send_later_with_queue is called on it" do
+ lambda { Object.new.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.count }.by(1)
+ end
+
it "should add a new entry to the job table when send_later is called on the class" do
lambda { Object.send_later(:to_s) }.should change { Delayed::Job.count }.by(1)
end
+ it "should add a new entry to the job table when send_later_with_queue is called on the class" do
+ lambda { Object.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.count }.by(1)
+ end
+
it "should call send later on methods which are wrapped with handle_asynchronously" do
story = Story.create :text => 'Once upon...'
@@ -34,6 +42,21 @@
job.payload_object.perform.should == 'Once upon...'
end
+ it "should call send later on methods which are wrapped with handle_asynchronously_with_queue" do
+ story = Story.create :text => 'Once upon...'
+
+ Delayed::Job.count.should == 0
+
+ story.whatever_else(1, 5)
+
+ Delayed::Job.count.should == 1
+ job = Delayed::Job.find(:first)
+ job.payload_object.class.should == Delayed::PerformableMethod
+ job.payload_object.method.should == :whatever_else_without_send_later_with_queue
+ job.payload_object.args.should == [1, 5]
+ job.payload_object.perform.should == 'Once upon...'
+ end
+
context "send_at" do
it "should queue a new job" do
lambda do
@@ -56,4 +79,26 @@
end
end
+ context "send_at_with_queue" do
+ it "should queue a new job" do
+ lambda do
+ "string".send_at_with_queue(1.hour.from_now, :length, "testqueue")
+ end.should change { Delayed::Job.count }.by(1)
+ end
+
+ it "should schedule the job in the future" do
+ time = 1.hour.from_now
+ job = "string".send_at_with_queue(time, :length, "testqueue")
+ job.run_at.should == time
+ end
+
+ it "should store payload as PerformableMethod" do
+ job = "string".send_at_with_queue(1.hour.from_now, :count, "testqueue", 'r')
+ job.payload_object.class.should == Delayed::PerformableMethod
+ job.payload_object.method.should == :count
+ job.payload_object.args.should == ['r']
+ job.payload_object.perform.should == 1
+ end
+ end
+
end
@@ -26,13 +26,13 @@ def create_job(opts = {})
end
it "should be able to set priority when enqueuing items" do
- @job = @backend.enqueue SimpleJob.new, 5
+ @job = @backend.enqueue SimpleJob.new, :priority => 5
@job.priority.should == 5
end
it "should be able to set run_at when enqueuing items" do
later = @backend.db_time_now + 5.minutes
- @job = @backend.enqueue SimpleJob.new, 5, later
+ @job = @backend.enqueue SimpleJob.new, :priority => 5, :run_at => later
@job.run_at.should be_close(later, 1)
end
@@ -183,7 +183,7 @@ def create_job(opts = {})
end
it "should fetch jobs ordered by priority" do
- 10.times { @backend.enqueue SimpleJob.new, rand(10) }
+ 10.times { @backend.enqueue SimpleJob.new, :priority => rand(10) }
jobs = @backend.find_available('worker', 10)
jobs.size.should == 10
jobs.each_cons(2) do |a, b|
View
@@ -17,6 +17,7 @@
table.integer :priority, :default => 0
table.integer :attempts, :default => 0
table.text :handler
+ table.text :queue, :default => nil
table.string :last_error
table.datetime :run_at
table.datetime :locked_at
@@ -35,8 +36,10 @@
class Story < ActiveRecord::Base
def tell; text; end
def whatever(n, _); tell*n; end
+ def whatever_else(n, _); tell*n; end
handle_asynchronously :whatever
+ handle_asynchronously_with_queue :whatever_else, "testqueue"
end
require 'sample_jobs'
View
@@ -4,6 +4,9 @@
def job_create(opts = {})
Delayed::Job.create(opts.merge(:payload_object => SimpleJob.new))
end
+ def worker_create(opts = {})
+ Delayed::Worker.new(opts.merge(:max_priority => nil, :min_priority => nil, :quiet => true))
+ end
before(:all) do
Delayed::Worker.send :public, :work_off
@@ -13,7 +16,7 @@ def job_create(opts = {})
# Make sure backend is set to active record
Delayed::Worker.backend = :active_record
- @worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil, :quiet => true)
+ @worker = worker_create
Delayed::Job.delete_all
@@ -173,6 +176,43 @@ def job_create(opts = {})
end
end
-
-
+
+
+ context "Different queue workers" do
+ before :each do
+ job_create(:queue => 'queue1')
+ job_create(:queue => 'queue2')
+ job_create(:queue => nil)
+ end
+
+ it "should only work off jobs assigned to themselves" do
+ worker = worker_create(:queue=>'queue1')
+ SimpleJob.runs.should == 0
+ worker.work_off
+ SimpleJob.runs.should == 1
+
+ SimpleJob.runs = 0
+
+ worker = worker_create(:queue=>'queue2')
+ SimpleJob.runs.should == 0
+ worker.work_off
+ SimpleJob.runs.should == 1
+ end
+
+ it "should not work off jobs not assigned to themselves" do
+ worker = worker_create(:queue=>'queue3')
+
+ SimpleJob.runs.should == 0
+ worker.work_off
+ SimpleJob.runs.should == 0
+ end
+
+ it "should run non-named runner jobs when the runner has no name set" do
+ worker = worker_create(:queue=>nil)
+
+ SimpleJob.runs.should == 0
+ worker.work_off
+ SimpleJob.runs.should == 1
+ end
+ end
end

0 comments on commit 66b3962

Please sign in to comment.