Skip to content
Fetching contributors…
Cannot retrieve contributors at this time
100 lines (82 sloc) 3.22 KB
require 'dm-core'
require 'dm-observer'
require 'dm-aggregates'
# Hooks that let delayed_job serialize a DataMapper resource as a
# "ClassName;id" reference and load it back by id.
module DataMapper
  module Resource
    module ClassMethods
      # Looks up the record for +id+ by delegating to +find!+.
      # NOTE(review): +find!+ is not defined in this file; confirm the
      # receiving model actually responds to it.
      def load_for_delayed_job(id)
        find!(id)
      end
    end

    module InstanceMethods
      # Builds the reference string: the class, a semicolon, then the id.
      def dump_for_delayed_job
        [self.class.to_s, id.to_s].join(';')
      end
    end
  end
end
module Delayed
module Backend
module DataMapper
# DataMapper-backed job record stored in the 'delayed_jobs' table.
class Job
  include ::DataMapper::Resource
  include Delayed::Backend::Base

  storage_names[:default] = 'delayed_jobs'

  property :id,         Serial
  property :priority,   Integer, :default => 0
  property :attempts,   Integer, :default => 0
  # Text instead of String: a String property is limited to DataMapper's
  # default length (50 characters). Loaded eagerly (:lazy => false) so that
  # fetching a job does not need a second query for its handler.
  property :handler,    Text, :lazy => false
  property :run_at,     Time
  property :locked_at,  Time
  property :locked_by,  String
  property :failed_at,  Time
  # Text for the same reason as :handler (no 50-character cap).
  property :last_error, Text

  # Current time in UTC; the reference clock for all scheduling comparisons
  # in this class.
  def self.db_time_now
    Time.now.utc
  end

  # Finds jobs that +worker_name+ may pick up.
  #
  # A job qualifies when its run_at is not in the future, it has not failed,
  # and it is either unlocked, locked for longer than +max_run_time+, or
  # already locked by +worker_name+.
  #
  # Returns at most +limit+ jobs ordered by priority, then run_at (both
  # ascending).
  def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
    # Take a single time snapshot so both comparisons use the same instant.
    now = db_time_now

    unlocked = all(:locked_at => nil) | all(:locked_at.lt => now - max_run_time)
    candidates = all(:run_at.lte => now) & (unlocked | all(:locked_by => worker_name))

    candidates.all(:limit => limit,
                   :failed_at => nil,
                   :order => [:priority.asc, :run_at.asc])
  end

  # When a worker is exiting, make sure we don't have any locked jobs.
  def self.clear_locks!(worker_name)
    all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil)
  end

  # Lock this job for this worker.
  # Returns true if we have the lock, false otherwise.
  #
  # NOTE(review): the default for +worker+ calls +worker_name+, which is not
  # defined in this file; presumably it comes from Delayed::Backend::Base.
  def lock_exclusively!(max_run_time, worker = worker_name)
    now = self.class.db_time_now
    overtime = now - max_run_time

    # FIXME: this is a bit gross.
    # DM doesn't give us the number of rows affected by a collection update,
    # so we have to circumvent some niceness in DM::Collection here.
    collection =
      if locked_by != worker
        # Not ours yet: only take it if it is due and unlocked or expired.
        runnable = self.class.all(:id => id, :run_at.lte => now)
        lockable = self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime)
        runnable & lockable
      else
        # Already ours: just refresh the lock.
        self.class.all(:id => id, :locked_by => worker)
      end

    attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
    affected_rows = repository.update(attributes, collection)
    return false unless affected_rows == 1

    # Mirror the stored lock on this in-memory instance.
    self.locked_at = now
    self.locked_by = worker
    true
  end
end
# Observer on Job that fills in a default run_at before a record is saved.
class JobObserver
  include ::DataMapper::Observer

  observe Job

  # An already-set run_at is left untouched; otherwise it is assigned the
  # value of db_time_now from the record's class.
  before(:save) do
    self.run_at = self.class.db_time_now unless self.run_at
  end
end
end
end
end
Something went wrong with that request. Please try again.