Skip to content

Commit

Permalink
When using the activerecord backend, reserve jobs with an update quer…
Browse files Browse the repository at this point in the history
…y instead of trying to lock a batch of available jobs. This causes workers to serialize on the database and avoids contention between them.
  • Loading branch information
betamatt committed Nov 30, 2010
1 parent 8ea6cb4 commit deb329b
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
32 changes: 27 additions & 5 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -16,6 +16,10 @@ class Job < ::ActiveRecord::Base
}
scope :by_priority, order('priority ASC, run_at ASC')

scope :locked_by_worker, lambda{|worker_name, max_run_time|
where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time])
}

def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
Expand All @@ -28,15 +32,33 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)

def self.jobs_available_to_worker(worker_name, max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority

scope.by_priority
end

# Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name], jobs_available_to_worker(worker.name, max_run_time).scope(:find)[:conditions], :limit => 1)
end

if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
jobs_available_to_worker(worker_name, max_run_time).all(:limit => limit)
end
end

Expand Down
1 change: 1 addition & 0 deletions lib/generators/delayed_job/templates/migration.rb
Expand Up @@ -13,6 +13,7 @@ def self.up
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
add_index :delayed_jobs, :locked_by, :name => 'delayed_jobs_locked_by'
end

def self.down
Expand Down

0 comments on commit deb329b

Please sign in to comment.