Permalink
Browse files

Remove .find_available and #lock_exclusively! from ActiveRecord backe…

…nd now that .reserve is doing it in one step.
  • Loading branch information...
1 parent 1ad4453 commit 3121164eab3eff4a75a69857d113128557f6e2f8 @bkeepers bkeepers committed Dec 1, 2010
Showing with 5 additions and 35 deletions.
  1. +5 −35 lib/delayed/backend/active_record.rb
@@ -19,7 +19,7 @@ class Job < ::ActiveRecord::Base
scope :locked_by_worker, lambda{|worker_name, max_run_time|
where(['locked_by = ? AND locked_at > ?', worker_name, db_time_now - max_run_time])
}
-
+
def self.before_fork
::ActiveRecord::Base.clear_all_connections!
end
@@ -32,58 +32,28 @@ def self.after_fork
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end
-
+
def self.jobs_available_to_worker(worker_name, max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
scope.by_priority
end
-
- # Reserve a single job in a single update query. This causes workers to serialize on the
+
+ # Reserve a single job in a single update query. This causes workers to serialize on the
# database and avoids contention.
def self.reserve(worker, max_run_time = Worker.max_run_time)
affected_rows = 0
::ActiveRecord::Base.silence do
affected_rows = jobs_available_to_worker(worker.name, max_run_time).limit(1).update_all(["locked_at = ?, locked_by = ?", db_time_now, worker.name])
end
-
+
if affected_rows == 1
locked_by_worker(worker.name, max_run_time).first
else
nil
end
end
-
- # Find a few candidate jobs to run (in case some immediately get locked by others).
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
- ::ActiveRecord::Base.silence do
- jobs_available_to_worker(worker_name, max_run_time).limit(limit).all
- end
- end
-
- # Lock this job for this worker.
- # Returns true if we have the lock, false otherwise.
- def lock_exclusively!(max_run_time, worker)
- now = self.class.db_time_now
- affected_rows = if locked_by != worker
- # We don't own this job so we will update the locked_by name and the locked_at
- self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
- else
- # We already own this job, this may happen if the job queue crashes.
- # Simply resume and update the locked_at
- self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
- end
- if affected_rows == 1
- self.locked_at = now
- self.locked_by = worker
- self.locked_at_will_change!
- self.locked_by_will_change!
- return true
- else
- return false
- end
- end
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients

0 comments on commit 3121164

Please sign in to comment.