Skip to content

Commit

Permalink
Optimized lookup for Postgresql and MySQL. Falling back to read-ahead…
Browse files Browse the repository at this point in the history
… for others
  • Loading branch information
albus522 authored and sferik committed Apr 1, 2013
1 parent fffce77 commit 3075c59
Showing 1 changed file with 29 additions and 14 deletions.
43 changes: 29 additions & 14 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -37,29 +37,44 @@ def self.after_fork

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
where(:locked_by => worker_name).update_all(:locked_by => nil, :locked_at => nil)
end

def self.reserve(worker, max_run_time = Worker.max_run_time)
# scope to filter to records that are "ready to run"
readyScope = self.ready_to_run(worker.name, max_run_time)
ready_scope = self.ready_to_run(worker.name, max_run_time)

# scope to filter to the single next eligible job
nextScope = readyScope.scoped
nextScope = nextScope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
nextScope = nextScope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
nextScope = nextScope.scoped(:conditions => ["queue IN (?)", Worker.queues]) if Worker.queues.any?
nextScope = nextScope.scoped.by_priority.limit(1)
ready_scope = ready_scope.where('priority >= ?', Worker.min_priority) if Worker.min_priority
ready_scope = ready_scope.where('priority <= ?', Worker.max_priority) if Worker.max_priority
ready_scope = ready_scope.where(:queue => Worker.queues) if Worker.queues.any?
ready_scope = ready_scope.by_priority

now = self.db_time_now
job = nextScope.first
return unless job
job.with_lock do
job.locked_at = now
job.locked_by = worker.name
job.save!

# Optimizations for faster lookups on some common databases
case self.connection.adapter_name
when "PostgreSQL"
# Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
# This locks the single record 'FOR UPDATE' in the subquery (http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE)
# Note: active_record would attempt to generate UPDATE...LIMIT like sql for postgres if we use a .limit() filter, but it would not use
# 'FOR UPDATE' and we would have many locking conflicts
quoted_table_name = self.connection.quote_table_name(self.table_name)
subquery_sql = ready_scope.limit(1).lock(true).select('id').to_sql
reserved = self.find_by_sql(["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql}) RETURNING *", now, worker.name])
reserved[0]
when "MySQL", "Mysql2"
# This works on MySQL and possibly some other DBs that support UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(:locked_at => now, :locked_by => worker.name)
return nil if count == 0
self.where(:locked_at => now, :locked_by => worker.name).first
else
# This is our old fashion, tried and true, but slower lookup
ready_scope.limit(worker.read_ahead).detect do |job|
count = ready_scope.where(:id => job.id).update_all(:locked_at => now, :locked_by => worker.name)
count == 1 && job.reload
end
end
job
end

# Get the current time (GMT or local depending on DB)
Expand Down

0 comments on commit 3075c59

Please sign in to comment.