diff --git a/lib/delayed/backend/active_record.rb b/lib/delayed/backend/active_record.rb index b69fd050..d5e805ee 100644 --- a/lib/delayed/backend/active_record.rb +++ b/lib/delayed/backend/active_record.rb @@ -35,7 +35,7 @@ class Job < ::ActiveRecord::Base :failed_at, :locked_at, :locked_by, :handler end - scope :by_priority, lambda { order("priority ASC, run_at ASC") } + scope :by_priority, lambda { order(:priority, :run_at) } scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority } scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority } scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? } @@ -49,13 +49,14 @@ def self.set_delayed_job_table_name set_delayed_job_table_name - def self.ready_to_run(worker_name, max_run_time) - where( - "(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL", - db_time_now, - db_time_now - max_run_time, - worker_name - ) + def self.ready_to_run(worker_name, max_run_time) # rubocop:disable Metrics/AbcSize + lock_expired = arel_table[:locked_at].lt(db_time_now - max_run_time) + not_locked = arel_table[:locked_at].eq(nil).or(lock_expired) + locked_by_me = arel_table[:locked_by].eq(worker_name) + + where(failed_at: nil) + .where(arel_table[:run_at].lteq(db_time_now)) + .where(not_locked.or(locked_by_me)) end def self.before_fork @@ -123,10 +124,17 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now) # Note: active_record would attempt to generate UPDATE...LIMIT like # SQL for Postgres if we use a .limit() filter, but it would not # use 'FOR UPDATE' and we would have many locking conflicts - quoted_name = connection.quote_table_name(table_name) - subquery = ready_scope.limit(1).lock(true).select("id").to_sql - sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *" - reserved = find_by_sql([sql, now, worker.name]) + quoted_table_name = Delayed::Job.arel_table.alias(:dj).to_sql + subquery_sql = ready_scope.limit(1).lock(true).select("id").to_sql + sql = <<-SQL.squish + WITH sub AS (#{subquery_sql}) + UPDATE #{quoted_table_name} dj + SET locked_at = ?, locked_by = ? + FROM sub + WHERE dj.id = sub.id + RETURNING dj.* + SQL + reserved = find_by_sql([sql, now, worker.name]) reserved[0] end @@ -139,9 +147,10 @@ def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now) now = now.change(usec: 0) # This works on MySQL and possibly some other DBs that support # UPDATE...LIMIT. It uses separate queries to lock and return the job - count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name) + sets = "locked_at = :now, locked_by = :name, id = (SELECT @dj_update_id := id)" + count = ready_scope.limit(1).update_all([sets, now: now, name: worker.name]) return nil if count == 0 - where(locked_at: now, locked_by: worker.name, failed_at: nil).first + find_by("id = @dj_update_id") end def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)