Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Revert "Remove .find_available and #lock_exclusively! from ActiveReco…

…rd backend now that .reserve is doing it in one step."

This reverts commit 3121164.
  • Loading branch information...
commit d3acec0bf82af8ecf3eb928e7002e39b08ea56c9 1 parent a91f4c3
@betamatt betamatt authored
Showing with 35 additions and 5 deletions.
  1. +35 −5 lib/delayed/backend/active_record.rb
View
40 lib/delayed/backend/active_record.rb
@@ -19,7 +19,7 @@ class Job < ::ActiveRecord::Base
scope :locked_by_worker, lambda{|worker_name, max_run_time|
where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time])
}
-
+
def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
@@ -32,28 +32,58 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
-
+
def self.jobs_available_to_worker(worker_name, max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
scope.by_priority
end
-
- # Reserve a single job in a single update query. This causes workers to serialize on the
+
+ # Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = jobs_available_to_worker(worker.name, max_run_time).limit(1).update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name])
end
-
+
if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end
+
+ # Find a few candidate jobs to run (in case some immediately get locked by others).
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ ::ActiveRecord::Base.silence do
+ jobs_available_to_worker(worker_name, max_run_time).limit(limit).all
+ end
+ end
+
+ # Lock this job for this worker.
+ # Returns true if we have the lock, false otherwise.
+ def lock_exclusively!(max_run_time, worker)
+ now = self.class.db_time_now
+ affected_rows = if locked_by != worker
+ # We don't own this job so we will update the locked_by name and the locked_at
+ self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
+ else
+ # We already own this job, this may happen if the job queue crashes.
+ # Simply resume and update the locked_at
+ self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
+ end
+ if affected_rows == 1
+ self.locked_at = now
+ self.locked_by = worker
+ self.locked_at_will_change!
+ self.locked_by_will_change!
+ return true
+ else
+ return false
+ end
+ end
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
Please sign in to comment.
Something went wrong with that request. Please try again.