Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
rbriank committed May 4, 2010
1 parent 3a11caf commit b5aeeec
Showing 1 changed file with 13 additions and 5 deletions.
18 changes: 13 additions & 5 deletions lib/delayed/backend/couch_mapper.rb
Expand Up @@ -35,10 +35,15 @@ class Job < ::CouchRest::ExtendedDocument
property :last_error
timestamps!

view_by :priority, :run_at
view_by :locked_by
view_by(:locked_at, :run_at,
:map => "function(doc){" +
" if(doc['couchrest-type'] == 'Job' && doc.run_at) {" +
" var locked_at = doc.locked_at || '';" +
" emit([locked_at, doc.run_at], null);}" +
" }")

save_callback :before, :set_default_run_at
set_callback :save, :before, :set_default_run_at

def self.after_fork
::CouchRest.server = CouchRest.new('http://localhost:5984')
Expand All @@ -50,18 +55,21 @@ def self.db_time_now
end

def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)

by_locked_at_and_run_at :start_key => [''], :end_key => ['', db_time_now], :limit => limit
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)

docs = by_locked_by :startkey => [worker_name], :endkey => [worker_name, {}]
docs.each { |doc| doc.locked_by, doc.locked_at = nil, nil; }
database.bulk_save docs
end

# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)

locked_at, locked_by = self.class.db_time_now, worker
save
end

def self.delete_all
Expand Down

0 comments on commit b5aeeec

Please sign in to comment.