Skip to content

Commit

Permalink
Merge e7e2290 into a98dbac
Browse files Browse the repository at this point in the history
  • Loading branch information
albus522 committed Feb 5, 2018
2 parents a98dbac + e7e2290 commit db32c1e
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -35,7 +35,7 @@ class Job < ::ActiveRecord::Base
:failed_at, :locked_at, :locked_by, :handler
end

scope :by_priority, lambda { order("priority ASC, run_at ASC") }
scope :by_priority, lambda { order(:priority, :run_at) }
scope :min_priority, lambda { where("priority >= ?", Worker.min_priority) if Worker.min_priority }
scope :max_priority, lambda { where("priority <= ?", Worker.max_priority) if Worker.max_priority }
scope :for_queues, lambda { |queues = Worker.queues| where(queue: queues) if Array(queues).any? }
Expand All @@ -49,13 +49,14 @@ def self.set_delayed_job_table_name

set_delayed_job_table_name

def self.ready_to_run(worker_name, max_run_time)
where(
"(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL",
db_time_now,
db_time_now - max_run_time,
worker_name
)
def self.ready_to_run(worker_name, max_run_time) # rubocop:disable Metrics/AbcSize
lock_expired = arel_table[:locked_at].lt(db_time_now - max_run_time)
not_locked = arel_table[:locked_at].eq(nil).or(lock_expired)
locked_by_me = arel_table[:locked_by].eq(worker_name)

where(failed_at: nil)
.where(arel_table[:run_at].lteq(db_time_now))
.where(not_locked.or(locked_by_me))
end

def self.before_fork
Expand Down Expand Up @@ -123,10 +124,17 @@ def self.reserve_with_scope_using_optimized_postgres(ready_scope, worker, now)
# Note: active_record would attempt to generate UPDATE...LIMIT like
# SQL for Postgres if we use a .limit() filter, but it would not
# use 'FOR UPDATE' and we would have many locking conflicts
quoted_name = connection.quote_table_name(table_name)
subquery = ready_scope.limit(1).lock(true).select("id").to_sql
sql = "UPDATE #{quoted_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery}) RETURNING *"
reserved = find_by_sql([sql, now, worker.name])
quoted_table_name = Delayed::Job.arel_table.alias(:dj).to_sql
subquery_sql = ready_scope.limit(1).lock(true).select("id").to_sql
sql = <<-SQL.squish
WITH sub AS (#{subquery_sql})
UPDATE #{quoted_table_name} dj
SET locked_at = ?, locked_by = ?
FROM sub
WHERE dj.id = sub.id
RETURNING dj.*
SQL
reserved = find_by_sql([sql, now, worker.name])
reserved[0]
end

Expand All @@ -139,9 +147,10 @@ def self.reserve_with_scope_using_optimized_mysql(ready_scope, worker, now)
now = now.change(usec: 0)
# This works on MySQL and possibly some other DBs that support
# UPDATE...LIMIT. It uses separate queries to lock and return the job
count = ready_scope.limit(1).update_all(locked_at: now, locked_by: worker.name)
sets = "locked_at = :now, locked_by = :name, id = (SELECT @dj_update_id := id)"
count = ready_scope.limit(1).update_all([sets, now: now, name: worker.name])
return nil if count == 0
where(locked_at: now, locked_by: worker.name, failed_at: nil).first
find_by("id = @dj_update_id")
end

def self.reserve_with_scope_using_optimized_mssql(ready_scope, worker, now)
Expand Down

0 comments on commit db32c1e

Please sign in to comment.