Skip to content

Commit

Permalink
Initial work in allowing different backends to be used in place of Ac…
Browse files Browse the repository at this point in the history
…tiveRecord
  • Loading branch information
bkeepers authored and iansheridan committed Jun 16, 2010
1 parent 9ecfc3c commit e2e7290
Show file tree
Hide file tree
Showing 9 changed files with 408 additions and 339 deletions.
76 changes: 76 additions & 0 deletions lib/delayed/backend/active_record.rb
@@ -0,0 +1,76 @@
require 'timeout'

module Delayed
module Backend
module ActiveRecord
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base
set_table_name :delayed_jobs

named_scope :ready_to_run, lambda {|worker_name, max_run_time|
{:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
}
named_scope :by_priority, :order => 'priority DESC, run_at ASC'

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
end

# Find a few candidate jobs to run (in case some immediately get locked by others).
def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
scope = self.ready_to_run(worker_name, max_run_time)
scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority

::ActiveRecord::Base.silence do
scope.by_priority.all(:limit => limit)
end
end

# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker)
now = self.class.db_time_now
affected_rows = if locked_by != worker
# We don't own this job so we will update the locked_by name and the locked_at
self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
else
# We already own this job, this may happen if the job queue crashes.
# Simply resume and update the locked_at
self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
end
if affected_rows == 1
self.locked_at = now
self.locked_by = worker
return true
else
return false
end
end

# Unlock this job (note: not saved to DB)
def unlock
self.locked_at = nil
self.locked_by = nil
end

# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
def self.db_time_now
if Time.zone
Time.zone.now
elsif ::ActiveRecord::Base.default_timezone == :utc
Time.now.utc
else
Time.now
end
end

end
end
end
end
92 changes: 92 additions & 0 deletions lib/delayed/backend/base.rb
@@ -0,0 +1,92 @@
module Delayed
module Backend
class DeserializationError < StandardError
end

module Base
def self.included(base)
base.extend ClassMethods
end

module ClassMethods
# Add a job to the queue
def enqueue(*args)
object = args.shift
unless object.respond_to?(:perform)
raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
end

priority = args.first || 0
run_at = args[1]
self.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
end
end

ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/

def failed?
failed_at
end
alias_method :failed, :failed?

def payload_object
@payload_object ||= deserialize(self['handler'])
end

def name
@name ||= begin
payload = payload_object
if payload.respond_to?(:display_name)
payload.display_name
else
payload.class.name
end
end
end

def payload_object=(object)
self['handler'] = object.to_yaml
end

# Moved into its own method so that new_relic can trace it.
def invoke_job
payload_object.perform
end

private

def deserialize(source)
handler = YAML.load(source) rescue nil

unless handler.respond_to?(:perform)
if handler.nil? && source =~ ParseObjectFromYaml
handler_class = $1
end
attempt_to_load(handler_class || handler.class)
handler = YAML.load(source)
end

return handler if handler.respond_to?(:perform)

raise DeserializationError,
'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
rescue TypeError, LoadError, NameError => e
raise DeserializationError,
"Job failed to load: #{e.message}. Try to manually require the required file."
end

# Constantize the object so that ActiveSupport can attempt
# its auto loading magic. Will raise LoadError if not successful.
def attempt_to_load(klass)
klass.constantize
end

protected

def before_save
self.run_at ||= self.class.db_time_now
end

end
end
end
153 changes: 0 additions & 153 deletions lib/delayed/job.rb

This file was deleted.

11 changes: 11 additions & 0 deletions lib/delayed/worker.rb
Expand Up @@ -18,6 +18,17 @@ class Worker

# name_prefix is ignored if name is set directly
attr_accessor :name_prefix

cattr_reader :backend

def self.backend=(backend)
if backend.is_a? Symbol
require "delayed/backend/#{backend}"
backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
end
@@backend = backend
silence_warnings { ::Delayed.const_set(:Job, backend) }
end

def initialize(options={})
@quiet = options[:quiet]
Expand Down
4 changes: 3 additions & 1 deletion lib/delayed_job.rb
Expand Up @@ -2,12 +2,14 @@

require File.dirname(__FILE__) + '/delayed/message_sending'
require File.dirname(__FILE__) + '/delayed/performable_method'
require File.dirname(__FILE__) + '/delayed/job'
require File.dirname(__FILE__) + '/delayed/backend/base'
require File.dirname(__FILE__) + '/delayed/worker'

Object.send(:include, Delayed::MessageSending)
Module.send(:include, Delayed::MessageSending::ClassMethods)

Delayed::Worker.backend = :active_record

if defined?(Merb::Plugins)
Merb::Plugins.add_rakefiles File.dirname(__FILE__) / 'delayed' / 'tasks'
end

0 comments on commit e2e7290

Please sign in to comment.