Permalink
Browse files

adds 'read_ahead' as configurable option

  • Loading branch information...
1 parent 91523ab commit 666f4422a89e8e9a6aa591c94e7ad0fb5b6a8fa5 @dcuddeback committed Jan 27, 2012
Showing with 25 additions and 2 deletions.
  1. +1 −1 lib/delayed/backend/base.rb
  2. +21 −0 lib/delayed/backend/shared_spec.rb
  3. +3 −1 lib/delayed/worker.rb
@@ -41,7 +41,7 @@ def enqueue(*args)
def reserve(worker, max_run_time = Worker.max_run_time)
# We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
# this leads to a more even distribution of jobs across the worker processes
- find_available(worker.name, 5, max_run_time).detect do |job|
+ find_available(worker.name, worker.read_ahead, max_run_time).detect do |job|
job.lock_exclusively!(max_run_time, worker.name)
end
end
@@ -279,6 +279,27 @@ def create_job(opts = {})
end
end
+ context "worker read-ahead" do
+ before do
+ @read_ahead = Delayed::Worker.read_ahead
+ end
+
+ after do
+ Delayed::Worker.read_ahead = @read_ahead
+ end
+
+ it "should read five jobs" do
+ described_class.should_receive(:find_available).with(anything, 5, anything).and_return([])
+ described_class.reserve(worker)
+ end
+
+ it "should read a configurable number of jobs" do
+ Delayed::Worker.read_ahead = 15
+ described_class.should_receive(:find_available).with(anything, Delayed::Worker.read_ahead, anything).and_return([])
+ described_class.reserve(worker)
+ end
+ end
+
context "clear_locks!" do
before do
@job = create_job(:locked_by => 'worker1', :locked_at => described_class.db_time_now)
View
@@ -7,13 +7,14 @@
module Delayed
class Worker
- cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues
+ cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, :read_ahead
self.sleep_delay = 5
self.max_attempts = 25
self.max_run_time = 4.hours
self.default_priority = 0
self.delay_jobs = true
self.queues = []
+ self.read_ahead = 5
# Add or remove plugins in this list before the worker is instantiated
cattr_accessor :plugins
@@ -82,6 +83,7 @@ def initialize(options={})
self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
+ self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
self.class.queues = options[:queues] if options.has_key?(:queues)
self.plugins.each { |klass| klass.new }

0 comments on commit 666f442

Please sign in to comment.