From b5aeeeceac5a8b6c6b73fa83fcfca75951fd11c2 Mon Sep 17 00:00:00 2001 From: Brian Kierstead Date: Tue, 4 May 2010 09:43:43 -0700 Subject: [PATCH] added code from http://gist.github.com/389006 --- lib/delayed/backend/couch_mapper.rb | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/lib/delayed/backend/couch_mapper.rb b/lib/delayed/backend/couch_mapper.rb index e1fae72a4..b3941b803 100644 --- a/lib/delayed/backend/couch_mapper.rb +++ b/lib/delayed/backend/couch_mapper.rb @@ -35,10 +35,15 @@ class Job < ::CouchRest::ExtendedDocument property :last_error timestamps! - view_by :priority, :run_at view_by :locked_by + view_by(:locked_at, :run_at, + :map => "function(doc){" + + " if(doc['couchrest-type'] == 'Job' && doc.run_at) {" + + " var locked_at = doc.locked_at || '';" + + " emit([locked_at, doc.run_at], null);}" + + " }") - save_callback :before, :set_default_run_at + set_callback :save, :before, :set_default_run_at def self.after_fork ::CouchRest.server = CouchRest.new('http://localhost:5984') @@ -50,18 +55,21 @@ def self.db_time_now end def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time) - + by_locked_at_and_run_at :start_key => [''], :end_key => ['', db_time_now], :limit => limit end # When a worker is exiting, make sure we don't have any locked jobs. def self.clear_locks!(worker_name) - + docs = by_locked_by :startkey => [worker_name], :endkey => [worker_name, {}] + docs.each { |doc| doc.locked_by, doc.locked_at = nil, nil; } + database.bulk_save docs end # Lock this job for this worker. # Returns true if we have the lock, false otherwise. def lock_exclusively!(max_run_time, worker = worker_name) - + locked_at, locked_by = self.class.db_time_now, worker + save end def self.delete_all