Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge branch 'backends'

* backends:
  require active_record in the backend
  Move timeout require to appropriate file
  Refactor PerformableMethod so it's easier to extend with multiple backends
  Remove duplicate class definition in specs
  set Delayed::Worker.logger in specs
  Make performable method work with Mongo
  Update priority in mongo
  Added spec for Job#unlock
  Implement Job.clear_locks! for mongo
  Added spec for Job.clear_locks!
  Initial MongoDB backend.
  Initial work in allowing different backends to be used in place of ActiveRecord
  • Loading branch information...
commit fc4f3f153227d494dff54c1848c204263c454d71 2 parents bac3c4d + 07b427e
Brandon Keepers authored January 30, 2010
80  lib/delayed/backend/active_record.rb
... ...
@@ -0,0 +1,80 @@
  1
+require 'active_record'
  2
+
  3
+class ActiveRecord::Base
  4
+  def self.load_for_delayed_job(id)
  5
+    find(id)
  6
+  end
  7
+  
  8
+  def dump_for_delayed_job
  9
+    "#{self.class};#{id}"
  10
+  end
  11
+end
  12
+
  13
+module Delayed
  14
+  module Backend
  15
+    module ActiveRecord
  16
+      # A job object that is persisted to the database.
  17
+      # Contains the work object as a YAML field.
  18
+      class Job < ::ActiveRecord::Base
  19
+        include Delayed::Backend::Base
  20
+        set_table_name :delayed_jobs
  21
+
  22
+        named_scope :ready_to_run, lambda {|worker_name, max_run_time|
  23
+          {:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
  24
+        }
  25
+        named_scope :by_priority, :order => 'priority ASC, run_at ASC'
  26
+
  27
+        # When a worker is exiting, make sure we don't have any locked jobs.
  28
+        def self.clear_locks!(worker_name)
  29
+          update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
  30
+        end
  31
+
  32
+        # Find a few candidate jobs to run (in case some immediately get locked by others).
  33
+        def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  34
+          scope = self.ready_to_run(worker_name, max_run_time)
  35
+          scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
  36
+          scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
  37
+      
  38
+          ::ActiveRecord::Base.silence do
  39
+            scope.by_priority.all(:limit => limit)
  40
+          end
  41
+        end
  42
+
  43
+        # Lock this job for this worker.
  44
+        # Returns true if we have the lock, false otherwise.
  45
+        def lock_exclusively!(max_run_time, worker)
  46
+          now = self.class.db_time_now
  47
+          affected_rows = if locked_by != worker
  48
+            # We don't own this job so we will update the locked_by name and the locked_at
  49
+            self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
  50
+          else
  51
+            # We already own this job, this may happen if the job queue crashes.
  52
+            # Simply resume and update the locked_at
  53
+            self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
  54
+          end
  55
+          if affected_rows == 1
  56
+            self.locked_at    = now
  57
+            self.locked_by    = worker
  58
+            return true
  59
+          else
  60
+            return false
  61
+          end
  62
+        end
  63
+
  64
+        # Get the current time (GMT or local depending on DB)
  65
+        # Note: This does not ping the DB to get the time, so all your clients
  66
+        # must have syncronized clocks.
  67
+        def self.db_time_now
  68
+          if Time.zone
  69
+            Time.zone.now
  70
+          elsif ::ActiveRecord::Base.default_timezone == :utc
  71
+            Time.now.utc
  72
+          else
  73
+            Time.now
  74
+          end
  75
+        end
  76
+
  77
+      end
  78
+    end
  79
+  end
  80
+end
98  lib/delayed/backend/base.rb
... ...
@@ -0,0 +1,98 @@
  1
+module Delayed
  2
+  module Backend
  3
+    class DeserializationError < StandardError
  4
+    end
  5
+
  6
+    module Base
  7
+      def self.included(base)
  8
+        base.extend ClassMethods
  9
+      end
  10
+      
  11
+      module ClassMethods
  12
+        # Add a job to the queue
  13
+        def enqueue(*args)
  14
+          object = args.shift
  15
+          unless object.respond_to?(:perform)
  16
+            raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
  17
+          end
  18
+    
  19
+          priority = args.first || 0
  20
+          run_at   = args[1]
  21
+          self.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
  22
+        end
  23
+      end
  24
+      
  25
+      ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
  26
+
  27
+      def failed?
  28
+        failed_at
  29
+      end
  30
+      alias_method :failed, :failed?
  31
+
  32
+      def payload_object
  33
+        @payload_object ||= deserialize(self['handler'])
  34
+      end
  35
+
  36
+      def name
  37
+        @name ||= begin
  38
+          payload = payload_object
  39
+          if payload.respond_to?(:display_name)
  40
+            payload.display_name
  41
+          else
  42
+            payload.class.name
  43
+          end
  44
+        end
  45
+      end
  46
+
  47
+      def payload_object=(object)
  48
+        self['handler'] = object.to_yaml
  49
+      end
  50
+      
  51
+      # Moved into its own method so that new_relic can trace it.
  52
+      def invoke_job
  53
+        payload_object.perform
  54
+      end
  55
+      
  56
+      # Unlock this job (note: not saved to DB)
  57
+      def unlock
  58
+        self.locked_at    = nil
  59
+        self.locked_by    = nil
  60
+      end
  61
+      
  62
+    private
  63
+
  64
+      def deserialize(source)
  65
+        handler = YAML.load(source) rescue nil
  66
+
  67
+        unless handler.respond_to?(:perform)
  68
+          if handler.nil? && source =~ ParseObjectFromYaml
  69
+            handler_class = $1
  70
+          end
  71
+          attempt_to_load(handler_class || handler.class)
  72
+          handler = YAML.load(source)
  73
+        end
  74
+
  75
+        return handler if handler.respond_to?(:perform)
  76
+
  77
+        raise DeserializationError,
  78
+          'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
  79
+      rescue TypeError, LoadError, NameError => e
  80
+        raise DeserializationError,
  81
+          "Job failed to load: #{e.message}. Try to manually require the required file."
  82
+      end
  83
+
  84
+      # Constantize the object so that ActiveSupport can attempt
  85
+      # its auto loading magic. Will raise LoadError if not successful.
  86
+      def attempt_to_load(klass)
  87
+         klass.constantize
  88
+      end
  89
+
  90
+    protected
  91
+
  92
+      def before_save
  93
+        self.run_at ||= self.class.db_time_now
  94
+      end
  95
+    
  96
+    end
  97
+  end
  98
+end
110  lib/delayed/backend/mongo.rb
... ...
@@ -0,0 +1,110 @@
  1
+require 'mongo_mapper'
  2
+
  3
+module MongoMapper
  4
+  module Document
  5
+    module ClassMethods
  6
+      def load_for_delayed_job(id)
  7
+        find!(id)
  8
+      end
  9
+    end
  10
+
  11
+    module InstanceMethods
  12
+      def dump_for_delayed_job
  13
+        "#{self.class};#{id}"
  14
+      end
  15
+    end
  16
+  end
  17
+end
  18
+
  19
+module Delayed
  20
+  module Backend
  21
+    module Mongo
  22
+      class Job
  23
+        include MongoMapper::Document
  24
+        include Delayed::Backend::Base
  25
+        set_collection_name 'delayed_jobs'
  26
+        
  27
+        key :priority,    Integer, :default => 0
  28
+        key :attempts,    Integer, :default => 0
  29
+        key :handler,     String
  30
+        key :run_at,      Time
  31
+        key :locked_at,   Time
  32
+        key :locked_by,   String
  33
+        key :failed_at,   Time
  34
+        key :last_error,  String
  35
+        timestamps!
  36
+        
  37
+        before_save :set_default_run_at
  38
+        
  39
+        def self.db_time_now
  40
+          MongoMapper.time_class.now.utc
  41
+        end
  42
+        
  43
+        def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
  44
+          where = "this.run_at <= new Date(#{db_time_now.to_i * 1000}) && (this.locked_at == null || this.locked_at < new Date(#{(db_time_now - max_run_time).to_i * 1000})) || this.locked_by == #{worker_name.to_json}"
  45
+          # all(:limit => limit, :failed_at => nil, '$where' => where)
  46
+          
  47
+          conditions = {
  48
+            '$where' => where,
  49
+            :limit => limit,
  50
+            :failed_at => nil,
  51
+            :sort => [['priority', 1], ['run_at', 1]]
  52
+          }
  53
+          
  54
+          # (conditions[:priority] ||= {})['$gte'] = Worker.min_priority if Worker.min_priority
  55
+          # (conditions[:priority] ||= {})['$lte'] = Worker.max_priority if Worker.max_priority
  56
+
  57
+          all(conditions)
  58
+        end
  59
+        
  60
+        # When a worker is exiting, make sure we don't have any locked jobs.
  61
+        def self.clear_locks!(worker_name)
  62
+          collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
  63
+        end
  64
+        
  65
+        # Lock this job for this worker.
  66
+        # Returns true if we have the lock, false otherwise.
  67
+        def lock_exclusively!(max_run_time, worker = worker_name)
  68
+          now = self.class.db_time_now
  69
+          overtime = make_date(now - max_run_time.to_i)
  70
+          
  71
+          query = "this._id == #{id.to_json} && this.run_at <= #{make_date(now)} && (this.locked_at == null || this.locked_at < #{overtime} || this.locked_by == #{worker.to_json})"
  72
+
  73
+          conditions = {"$where" => make_query(query)}
  74
+          collection.update(conditions, {"$set" => {:locked_at => now, :locked_by => worker}}, :multi => true)
  75
+          affected_rows = collection.find({:_id => id, :locked_by => worker}).count
  76
+          if affected_rows == 1
  77
+            self.locked_at = now
  78
+            self.locked_by = worker
  79
+            return true
  80
+          else
  81
+            return false
  82
+          end
  83
+        end
  84
+        
  85
+      private
  86
+      
  87
+        def self.make_date(date)
  88
+          "new Date(#{date.to_f * 1000})"
  89
+        end
  90
+
  91
+        def make_date(date)
  92
+          self.class.make_date(date)
  93
+        end
  94
+        
  95
+        def self.make_query(string)
  96
+          "function() { return (#{string}); }"
  97
+        end
  98
+
  99
+        def make_query(string)
  100
+          self.class.make_query(string)
  101
+        end
  102
+      
  103
+      
  104
+        def set_default_run_at
  105
+          self.run_at ||= self.class.db_time_now
  106
+        end
  107
+      end
  108
+    end
  109
+  end
  110
+end
153  lib/delayed/job.rb
... ...
@@ -1,153 +0,0 @@
1  
-require 'timeout'
2  
-
3  
-module Delayed
4  
-
5  
-  class DeserializationError < StandardError
6  
-  end
7  
-
8  
-  # A job object that is persisted to the database.
9  
-  # Contains the work object as a YAML field.
10  
-  class Job < ActiveRecord::Base
11  
-    set_table_name :delayed_jobs
12  
-
13  
-    named_scope :ready_to_run, lambda {|worker_name, max_run_time|
14  
-      {:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
15  
-    }
16  
-    named_scope :by_priority, :order => 'priority ASC, run_at ASC'
17  
-    
18  
-    ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
19  
-
20  
-    # When a worker is exiting, make sure we don't have any locked jobs.
21  
-    def self.clear_locks!(worker_name)
22  
-      update_all("locked_by = null, locked_at = null", ["locked_by = ?", worker_name])
23  
-    end
24  
-
25  
-    def failed?
26  
-      failed_at
27  
-    end
28  
-    alias_method :failed, :failed?
29  
-
30  
-    def payload_object
31  
-      @payload_object ||= deserialize(self['handler'])
32  
-    end
33  
-
34  
-    def name
35  
-      @name ||= begin
36  
-        payload = payload_object
37  
-        if payload.respond_to?(:display_name)
38  
-          payload.display_name
39  
-        else
40  
-          payload.class.name
41  
-        end
42  
-      end
43  
-    end
44  
-
45  
-    def payload_object=(object)
46  
-      self['handler'] = object.to_yaml
47  
-    end
48  
-
49  
-    # Add a job to the queue
50  
-    def self.enqueue(*args)
51  
-      object = args.shift
52  
-      unless object.respond_to?(:perform)
53  
-        raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
54  
-      end
55  
-    
56  
-      priority = args.first || 0
57  
-      run_at   = args[1]
58  
-
59  
-      Job.create(:payload_object => object, :priority => priority.to_i, :run_at => run_at)
60  
-    end
61  
-
62  
-    # Find a few candidate jobs to run (in case some immediately get locked by others).
63  
-    def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
64  
-      scope = self.ready_to_run(worker_name, max_run_time)
65  
-      scope = scope.scoped(:conditions => ['priority >= ?', Worker.min_priority]) if Worker.min_priority
66  
-      scope = scope.scoped(:conditions => ['priority <= ?', Worker.max_priority]) if Worker.max_priority
67  
-      
68  
-      ActiveRecord::Base.silence do
69  
-        scope.by_priority.all(:limit => limit)
70  
-      end
71  
-    end
72  
-
73  
-    # Lock this job for this worker.
74  
-    # Returns true if we have the lock, false otherwise.
75  
-    def lock_exclusively!(max_run_time, worker)
76  
-      now = self.class.db_time_now
77  
-      affected_rows = if locked_by != worker
78  
-        # We don't own this job so we will update the locked_by name and the locked_at
79  
-        self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and (locked_at is null or locked_at < ?) and (run_at <= ?)", id, (now - max_run_time.to_i), now])
80  
-      else
81  
-        # We already own this job, this may happen if the job queue crashes.
82  
-        # Simply resume and update the locked_at
83  
-        self.class.update_all(["locked_at = ?", now], ["id = ? and locked_by = ?", id, worker])
84  
-      end
85  
-      if affected_rows == 1
86  
-        self.locked_at    = now
87  
-        self.locked_by    = worker
88  
-        return true
89  
-      else
90  
-        return false
91  
-      end
92  
-    end
93  
-
94  
-    # Unlock this job (note: not saved to DB)
95  
-    def unlock
96  
-      self.locked_at    = nil
97  
-      self.locked_by    = nil
98  
-    end
99  
-
100  
-    # Moved into its own method so that new_relic can trace it.
101  
-    def invoke_job
102  
-      payload_object.perform
103  
-    end
104  
-
105  
-  private
106  
-
107  
-    def deserialize(source)
108  
-      handler = YAML.load(source) rescue nil
109  
-
110  
-      unless handler.respond_to?(:perform)
111  
-        if handler.nil? && source =~ ParseObjectFromYaml
112  
-          handler_class = $1
113  
-        end
114  
-        attempt_to_load(handler_class || handler.class)
115  
-        handler = YAML.load(source)
116  
-      end
117  
-
118  
-      return handler if handler.respond_to?(:perform)
119  
-
120  
-      raise DeserializationError,
121  
-        'Job failed to load: Unknown handler. Try to manually require the appropriate file.'
122  
-    rescue TypeError, LoadError, NameError => e
123  
-      raise DeserializationError,
124  
-        "Job failed to load: #{e.message}. Try to manually require the required file."
125  
-    end
126  
-
127  
-    # Constantize the object so that ActiveSupport can attempt
128  
-    # its auto loading magic. Will raise LoadError if not successful.
129  
-    def attempt_to_load(klass)
130  
-       klass.constantize
131  
-    end
132  
-
133  
-    # Get the current time (GMT or local depending on DB)
134  
-    # Note: This does not ping the DB to get the time, so all your clients
135  
-    # must have syncronized clocks.
136  
-    def self.db_time_now
137  
-      if Time.zone
138  
-        Time.zone.now
139  
-      elsif ActiveRecord::Base.default_timezone == :utc
140  
-        Time.now.utc
141  
-      else
142  
-        Time.now
143  
-      end
144  
-    end
145  
-
146  
-  protected
147  
-
148  
-    def before_save
149  
-      self.run_at ||= self.class.db_time_now
150  
-    end
151  
-
152  
-  end
153  
-end
67  lib/delayed/performable_method.rb
... ...
@@ -1,7 +1,19 @@
  1
+class Class
  2
+  def load_for_delayed_job(arg)
  3
+    self
  4
+  end
  5
+  
  6
+  def dump_for_delayed_job
  7
+    name
  8
+  end
  9
+end
  10
+
1 11
 module Delayed
2 12
   class PerformableMethod < Struct.new(:object, :method, :args)
3  
-    CLASS_STRING_FORMAT = /^CLASS\:([A-Z][\w\:]+)$/
4  
-    AR_STRING_FORMAT    = /^AR\:([A-Z][\w\:]+)\:(\d+)$/
  13
+    STRING_FORMAT = /^LOAD\;([A-Z][\w\:]+)(?:\;(\w+))?$/
  14
+    
  15
+    class LoadError < StandardError
  16
+    end
5 17
 
6 18
     def initialize(object, method, args)
7 19
       raise NoMethodError, "undefined method `#{method}' for #{self.inspect}" unless object.respond_to?(method)
@@ -11,45 +23,40 @@ def initialize(object, method, args)
11 23
       self.method = method.to_sym
12 24
     end
13 25
     
14  
-    def display_name  
15  
-      case self.object
16  
-      when CLASS_STRING_FORMAT then "#{$1}.#{method}"
17  
-      when AR_STRING_FORMAT    then "#{$1}##{method}"
18  
-      else "Unknown##{method}"
19  
-      end      
20  
-    end    
21  
-
  26
+    def display_name
  27
+      if STRING_FORMAT === object
  28
+        "#{$1}#{$2 ? '#' : '.'}#{method}"
  29
+      else
  30
+        "#{object.class}##{method}"
  31
+      end
  32
+    end
  33
+    
22 34
     def perform
23 35
       load(object).send(method, *args.map{|a| load(a)})
24  
-    rescue ActiveRecord::RecordNotFound
25  
-      # We cannot do anything about objects which were deleted in the meantime
  36
+    rescue PerformableMethod::LoadError
  37
+      # We cannot do anything about objects that can't be loaded
26 38
       true
27 39
     end
28 40
 
29 41
     private
30 42
 
31  
-    def load(arg)
32  
-      case arg
33  
-      when CLASS_STRING_FORMAT then $1.constantize
34  
-      when AR_STRING_FORMAT    then $1.constantize.find($2)
35  
-      else arg
  43
+    def load(obj)
  44
+      if STRING_FORMAT === obj
  45
+        $1.constantize.load_for_delayed_job($2)
  46
+      else
  47
+        obj
36 48
       end
  49
+    rescue => e
  50
+      Delayed::Worker.logger.warn "Could not load object for job: #{e.message}"
  51
+      raise PerformableMethod::LoadError
37 52
     end
38 53
 
39  
-    def dump(arg)
40  
-      case arg
41  
-      when Class              then class_to_string(arg)
42  
-      when ActiveRecord::Base then ar_to_string(arg)
43  
-      else arg
  54
+    def dump(obj)
  55
+      if obj.respond_to?(:dump_for_delayed_job)
  56
+        "LOAD;#{obj.dump_for_delayed_job}"
  57
+      else
  58
+        obj
44 59
       end
45 60
     end
46  
-
47  
-    def ar_to_string(obj)
48  
-      "AR:#{obj.class}:#{obj.id}"
49  
-    end
50  
-
51  
-    def class_to_string(obj)
52  
-      "CLASS:#{obj.name}"
53  
-    end
54 61
   end
55 62
 end
13  lib/delayed/worker.rb
... ...
@@ -1,3 +1,5 @@
  1
+require 'timeout'
  2
+
1 3
 module Delayed
2 4
   class Worker
3 5
     cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :sleep_delay, :logger
@@ -18,6 +20,17 @@ class Worker
18 20
 
19 21
     # name_prefix is ignored if name is set directly
20 22
     attr_accessor :name_prefix
  23
+    
  24
+    cattr_reader :backend
  25
+    
  26
+    def self.backend=(backend)
  27
+      if backend.is_a? Symbol
  28
+        require "delayed/backend/#{backend}"
  29
+        backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
  30
+      end
  31
+      @@backend = backend
  32
+      silence_warnings { ::Delayed.const_set(:Job, backend) }
  33
+    end
21 34
 
22 35
     def initialize(options={})
23 36
       @quiet = options[:quiet]
6  lib/delayed_job.rb
... ...
@@ -1,13 +1,13 @@
1  
-autoload :ActiveRecord, 'activerecord'
2  
-
3 1
 require File.dirname(__FILE__) + '/delayed/message_sending'
4 2
 require File.dirname(__FILE__) + '/delayed/performable_method'
5  
-require File.dirname(__FILE__) + '/delayed/job'
  3
+require File.dirname(__FILE__) + '/delayed/backend/base'
6 4
 require File.dirname(__FILE__) + '/delayed/worker'
7 5
 
8 6
 Object.send(:include, Delayed::MessageSending)   
9 7
 Module.send(:include, Delayed::MessageSending::ClassMethods)
10 8
 
  9
+Delayed::Worker.backend = :active_record
  10
+
11 11
 if defined?(Merb::Plugins)
12 12
   Merb::Plugins.add_rakefiles File.dirname(__FILE__) / 'delayed' / 'tasks'
13 13
 end
25  spec/delayed_method_spec.rb
@@ -11,23 +11,6 @@ def say_hello
11 11
   end
12 12
 end
13 13
 
14  
-class ErrorObject
15  
-
16  
-  def throw
17  
-    raise ActiveRecord::RecordNotFound, '...'
18  
-    false
19  
-  end
20  
-
21  
-end
22  
-
23  
-class StoryReader
24  
-
25  
-  def read(story)
26  
-    "Epilog: #{story.tell}"
27  
-  end
28  
-
29  
-end
30  
-
31 14
 class StoryReader
32 15
 
33 16
   def read(story)
@@ -73,7 +56,9 @@ def read(story)
73 56
   end
74 57
 
75 58
   it "should ignore ActiveRecord::RecordNotFound errors because they are permanent" do
76  
-    job = ErrorObject.new.send_later(:throw)
  59
+    story = Story.create :text => 'Once upon...'
  60
+    job = story.send_later(:tell)
  61
+    story.destroy
77 62
     lambda { job.invoke_job }.should_not raise_error
78 63
   end
79 64
 
@@ -83,7 +68,7 @@ def read(story)
83 68
 
84 69
     job =  Delayed::Job.find(:first)
85 70
     job.payload_object.class.should   == Delayed::PerformableMethod
86  
-    job.payload_object.object.should  == "AR:Story:#{story.id}"
  71
+    job.payload_object.object.should  == "LOAD;Story;#{story.id}"
87 72
     job.payload_object.method.should  == :tell
88 73
     job.payload_object.args.should    == []
89 74
     job.payload_object.perform.should == 'Once upon...'
@@ -99,7 +84,7 @@ def read(story)
99 84
     job =  Delayed::Job.find(:first)
100 85
     job.payload_object.class.should   == Delayed::PerformableMethod
101 86
     job.payload_object.method.should  == :read
102  
-    job.payload_object.args.should    == ["AR:Story:#{story.id}"]
  87
+    job.payload_object.args.should    == ["LOAD;Story;#{story.id}"]
103 88
     job.payload_object.perform.should == 'Epilog: Once upon...'
104 89
   end                 
105 90
   
191  spec/job_spec.rb
... ...
@@ -1,198 +1,19 @@
1 1
 require 'spec_helper'
2 2
 
3 3
 describe Delayed::Job do
4  
-  before  do               
5  
-    Delayed::Worker.max_priority = nil
6  
-    Delayed::Worker.min_priority = nil
7  
-    
8  
-    Delayed::Job.delete_all
  4
+  before(:all) do
  5
+    @backend = Delayed::Job
9 6
   end
10 7
   
11 8
   before(:each) do
  9
+    Delayed::Worker.max_priority = nil
  10
+    Delayed::Worker.min_priority = nil
  11
+    Delayed::Job.delete_all
12 12
     SimpleJob.runs = 0
13 13
   end
14  
-
15  
-  it "should set run_at automatically if not set" do
16  
-    Delayed::Job.create(:payload_object => ErrorJob.new ).run_at.should_not == nil
17  
-  end
18  
-
19  
-  it "should not set run_at automatically if already set" do
20  
-    later = 5.minutes.from_now
21  
-    Delayed::Job.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should == later
22  
-  end
23  
-
24  
-  it "should raise ArgumentError when handler doesn't respond_to :perform" do
25  
-    lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
26  
-  end
27  
-
28  
-  it "should increase count after enqueuing items" do
29  
-    Delayed::Job.enqueue SimpleJob.new
30  
-    Delayed::Job.count.should == 1
31  
-  end
32  
-
33  
-  it "should be able to set priority when enqueuing items" do
34  
-    Delayed::Job.enqueue SimpleJob.new, 5
35  
-    Delayed::Job.first.priority.should == 5
36  
-  end
37  
-
38  
-  it "should be able to set run_at when enqueuing items" do
39  
-    later = (Delayed::Job.db_time_now+5.minutes)
40  
-    Delayed::Job.enqueue SimpleJob.new, 5, later
41  
-
42  
-    # use be close rather than equal to because millisecond values cn be lost in DB round trip
43  
-    Delayed::Job.first.run_at.should be_close(later, 1)
44  
-  end
45  
-
46  
-  it "should work with jobs in modules" do
47  
-    job = Delayed::Job.enqueue M::ModuleJob.new
48  
-    lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
49  
-  end
50  
-                   
51  
-  it "should raise an DeserializationError when the job class is totally unknown" do
52  
-
53  
-    job = Delayed::Job.new
54  
-    job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
55  
-
56  
-    lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
57  
-  end
58  
-
59  
-  it "should try to load the class when it is unknown at the time of the deserialization" do
60  
-    job = Delayed::Job.new
61  
-    job['handler'] = "--- !ruby/object:JobThatDoesNotExist {}"
62  
-
63  
-    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
64  
-
65  
-    lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
66  
-  end
67  
-
68  
-  it "should try include the namespace when loading unknown objects" do
69  
-    job = Delayed::Job.new
70  
-    job['handler'] = "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
71  
-    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
72  
-    lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
73  
-  end
74  
-
75  
-  it "should also try to load structs when they are unknown (raises TypeError)" do
76  
-    job = Delayed::Job.new
77  
-    job['handler'] = "--- !ruby/struct:JobThatDoesNotExist {}"
78  
-
79  
-    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
80  
-
81  
-    lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
82  
-  end
83  
-
84  
-  it "should try include the namespace when loading unknown structs" do
85  
-    job = Delayed::Job.new
86  
-    job['handler'] = "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
87  
-
88  
-    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
89  
-    lambda { job.payload_object.perform }.should raise_error(Delayed::DeserializationError)
90  
-  end
91 14
   
92  
-  it "should never find failed jobs" do
93  
-    @job = Delayed::Job.create :payload_object => SimpleJob.new, :attempts => 50, :failed_at => Delayed::Job.db_time_now
94  
-    Delayed::Job.find_available('worker', 1).length.should == 0
95  
-  end
96  
-
97  
-  context "when another worker is already performing an task, it" do
98  
-
99  
-    before :each do
100  
-      @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Delayed::Job.db_time_now - 5.minutes
101  
-    end
102  
-
103  
-    it "should not allow a second worker to get exclusive access" do
104  
-      @job.lock_exclusively!(4.hours, 'worker2').should == false
105  
-    end
106  
-
107  
-    it "should allow a second worker to get exclusive access if the timeout has passed" do
108  
-      @job.lock_exclusively!(1.minute, 'worker2').should == true
109  
-    end      
110  
-    
111  
-    it "should be able to get access to the task if it was started more then max_age ago" do
112  
-      @job.locked_at = 5.hours.ago
113  
-      @job.save
114  
-
115  
-      @job.lock_exclusively! 4.hours, 'worker2'
116  
-      @job.reload
117  
-      @job.locked_by.should == 'worker2'
118  
-      @job.locked_at.should > 1.minute.ago
119  
-    end
120  
-
121  
-    it "should not be found by another worker" do
122  
-      Delayed::Job.find_available('worker2', 1, 6.minutes).length.should == 0
123  
-    end
124  
-
125  
-    it "should be found by another worker if the time has expired" do
126  
-      Delayed::Job.find_available('worker2', 1, 4.minutes).length.should == 1
127  
-    end
128  
-
129  
-    it "should be able to get exclusive access again when the worker name is the same" do
130  
-      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
131  
-      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
132  
-      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
133  
-    end                                        
134  
-  end
135  
-  
136  
-  context "when another worker has worked on a task since the job was found to be available, it" do
137  
-
138  
-    before :each do
139  
-      @job = Delayed::Job.create :payload_object => SimpleJob.new
140  
-      @job_copy_for_worker_2 = Delayed::Job.find(@job.id)
141  
-    end
  15
+  it_should_behave_like 'a backend'
142 16
 
143  
-    it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
144  
-      @job.delete
145  
-      @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
146  
-    end
147  
-
148  
-    it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
149  
-      @job.update_attributes(:attempts => 1, :run_at => 1.day.from_now)
150  
-      @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
151  
-    end
152  
-  end
153  
-
154  
-  context "#name" do
155  
-    it "should be the class name of the job that was enqueued" do
156  
-      Delayed::Job.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
157  
-    end
158  
-
159  
-    it "should be the method that will be called if its a performable method object" do
160  
-      Delayed::Job.send_later(:clear_locks!)
161  
-      Delayed::Job.last.name.should == 'Delayed::Job.clear_locks!'
162  
-
163  
-    end
164  
-    it "should be the instance method that will be called if its a performable method object" do
165  
-      story = Story.create :text => "..."                 
166  
-      
167  
-      story.send_later(:save)
168  
-      
169  
-      Delayed::Job.last.name.should == 'Story#save'
170  
-    end
171  
-  end
172  
-  
173  
-  context "worker prioritization" do
174  
-    
175  
-    before(:each) do
176  
-      Delayed::Worker.max_priority = nil
177  
-      Delayed::Worker.min_priority = nil
178  
-    end
179  
-
180  
-    it "should fetch jobs ordered by priority" do
181  
-      number_of_jobs = 10
182  
-      number_of_jobs.times { Delayed::Job.enqueue SimpleJob.new, rand(10) }
183  
-      jobs = Delayed::Job.find_available('worker', 10)
184  
-      ordered = true
185  
-      jobs[1..-1].each_index{ |i| 
186  
-        if (jobs[i].priority > jobs[i+1].priority)
187  
-          ordered = false
188  
-          break
189  
-        end
190  
-      }
191  
-      ordered.should == true
192  
-    end
193  
-   
194  
-  end
195  
-  
196 17
   context "db_time_now" do
197 18
     it "should return time in current time zone if set" do
198 19
       Time.zone = 'Eastern Time (US & Canada)'
62  spec/mongo_job_spec.rb
... ...
@@ -0,0 +1,62 @@
  1
+require 'spec_helper'
  2
+
  3
+require 'delayed/backend/mongo'
  4
+
  5
+MongoMapper.connection = Mongo::Connection.new nil, nil, :logger => ActiveRecord::Base.logger
  6
+MongoMapper.database = 'delayed_job'
  7
+
  8
+describe Delayed::Backend::Mongo::Job do
  9
+  before(:all) do
  10
+    @backend = Delayed::Backend::Mongo::Job
  11
+  end
  12
+  
  13
+  before(:each) do
  14
+    MongoMapper.database.collections.each(&:remove)
  15
+  end
  16
+  
  17
+  it_should_behave_like 'a backend'
  18
+  
  19
+  describe "delayed method" do
  20
+    class MongoStoryReader
  21
+      def read(story)
  22
+        "Epilog: #{story.tell}"
  23
+      end
  24
+    end
  25
+    
  26
+    class MongoStory
  27
+      include MongoMapper::Document
  28
+      key :text, String
  29
+      
  30
+      def tell
  31
+        text
  32
+      end
  33
+    end
  34
+    
  35
+    it "should ignore not found errors because they are permanent" do
  36
+      story = MongoStory.create :text => 'Once upon a time…'
  37
+      job = story.send_later(:tell)
  38
+      story.destroy
  39
+      lambda { job.invoke_job }.should_not raise_error
  40
+    end
  41
+
  42
+    it "should store the object as string" do
  43
+      story = MongoStory.create :text => 'Once upon a time…'
  44
+      job = story.send_later(:tell)
  45
+
  46
+      job.payload_object.class.should   == Delayed::PerformableMethod
  47
+      job.payload_object.object.should  == "LOAD;MongoStory;#{story.id}"
  48
+      job.payload_object.method.should  == :tell
  49
+      job.payload_object.args.should    == []
  50
+      job.payload_object.perform.should == 'Once upon a time…'
  51
+    end
  52
+
  53
+    it "should store arguments as string" do
  54
+      story = MongoStory.create :text => 'Once upon a time…'
  55
+      job = MongoStoryReader.new.send_later(:read, story)
  56
+      job.payload_object.class.should   == Delayed::PerformableMethod
  57
+      job.payload_object.method.should  == :read
  58
+      job.payload_object.args.should    == ["LOAD;MongoStory;#{story.id}"]
  59
+      job.payload_object.perform.should == 'Epilog: Once upon a time…'
  60
+    end
  61
+  end
  62
+end
228  spec/shared_backend_spec.rb
... ...
@@ -0,0 +1,228 @@
  1
+shared_examples_for 'a backend' do
  2
+  def create_job(opts = {})
  3
+    @backend.create(opts.merge(:payload_object => SimpleJob.new))
  4
+  end
  5
+
  6
+  before do
  7
+    SimpleJob.runs = 0
  8
+  end
  9
+  
  10
+  it "should set run_at automatically if not set" do
  11
+    @backend.create(:payload_object => ErrorJob.new ).run_at.should_not be_nil
  12
+  end
  13
+
  14
+  it "should not set run_at automatically if already set" do
  15
+    later = @backend.db_time_now + 5.minutes
  16
+    @backend.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should be_close(later, 1)
  17
+  end
  18
+
  19
+  it "should raise ArgumentError when handler doesn't respond_to :perform" do
  20
+    lambda { @backend.enqueue(Object.new) }.should raise_error(ArgumentError)
  21
+  end
  22
+
  23
+  it "should increase count after enqueuing items" do
  24
+    @backend.enqueue SimpleJob.new
  25
+    @backend.count.should == 1
  26
+  end
  27
+  
  28
+  it "should be able to set priority when enqueuing items" do
  29
+    @job = @backend.enqueue SimpleJob.new, 5
  30
+    @job.priority.should == 5
  31
+  end
  32
+
  33
+  it "should be able to set run_at when enqueuing items" do
  34
+    later = @backend.db_time_now + 5.minutes
  35
+    @job = @backend.enqueue SimpleJob.new, 5, later
  36
+    @job.run_at.should be_close(later, 1)
  37
+  end
  38
+
  39
+  it "should work with jobs in modules" do
  40
+    M::ModuleJob.runs = 0
  41
+    job = @backend.enqueue M::ModuleJob.new
  42
+    lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
  43
+  end
  44
+                   
  45
+  it "should raise an DeserializationError when the job class is totally unknown" do
  46
+    job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
  47
+    lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
  48
+  end
  49
+
  50
+  it "should try to load the class when it is unknown at the time of the deserialization" do
  51
+    job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
  52
+    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
  53
+    lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
  54
+  end
  55
+
  56
+  it "should try include the namespace when loading unknown objects" do
  57
+    job = @backend.new :handler => "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
  58
+    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
  59
+    lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
  60
+  end
  61
+
  62
+  it "should also try to load structs when they are unknown (raises TypeError)" do
  63
+    job = @backend.new :handler => "--- !ruby/struct:JobThatDoesNotExist {}"
  64
+    job.should_receive(:attempt_to_load).with('JobThatDoesNotExist').and_return(true)
  65
+    lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
  66
+  end
  67
+
  68
+  it "should try include the namespace when loading unknown structs" do
  69
+    job = @backend.new :handler => "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
  70
+    job.should_receive(:attempt_to_load).with('Delayed::JobThatDoesNotExist').and_return(true)
  71
+    lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
  72
+  end
  73
+  
  74
+  describe "find_available" do
  75
+    it "should not find failed jobs" do
  76
+      @job = create_job :attempts => 50, :failed_at => @backend.db_time_now
  77
+      @backend.find_available('worker', 5, 1.second).should_not include(@job)
  78
+    end
  79
+    
  80
+    it "should not find jobs scheduled for the future" do
  81
+      @job = create_job :run_at => (@backend.db_time_now + 1.minute)
  82
+      @backend.find_available('worker', 5, 4.hours).should_not include(@job)
  83
+    end
  84
+    
  85
+    it "should not find jobs locked by another worker" do
  86
+      @job = create_job(:locked_by => 'other_worker', :locked_at => @backend.db_time_now - 1.minute)
  87
+      @backend.find_available('worker', 5, 4.hours).should_not include(@job)
  88
+    end
  89
+    
  90
+    it "should find open jobs" do
  91
+      @job = create_job
  92
+      @backend.find_available('worker', 5, 4.hours).should include(@job)
  93
+    end
  94
+    
  95
+    it "should find expired jobs" do
  96
+      @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now - 2.minutes)
  97
+      @backend.find_available('worker', 5, 1.minute).should include(@job)
  98
+    end
  99
+    
  100
+    it "should find own jobs" do
  101
+      @job = create_job(:locked_by => 'worker', :locked_at => (@backend.db_time_now - 1.minutes))
  102
+      @backend.find_available('worker', 5, 4.hours).should include(@job)
  103
+    end
  104
+  end
  105
+  
  106
+  context "when another worker is already performing an task, it" do
  107
+
  108
+    before :each do
  109
+      @job = @backend.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => @backend.db_time_now - 5.minutes
  110
+    end
  111
+
  112
+    it "should not allow a second worker to get exclusive access" do
  113
+      @job.lock_exclusively!(4.hours, 'worker2').should == false
  114
+    end
  115
+
  116
+    it "should allow a second worker to get exclusive access if the timeout has passed" do
  117
+      @job.lock_exclusively!(1.minute, 'worker2').should == true
  118
+    end      
  119
+    
  120
+    it "should be able to get access to the task if it was started more then max_age ago" do
  121
+      @job.locked_at = 5.hours.ago
  122
+      @job.save
  123
+
  124
+      @job.lock_exclusively! 4.hours, 'worker2'
  125
+      @job.reload
  126
+      @job.locked_by.should == 'worker2'
  127
+      @job.locked_at.should > 1.minute.ago
  128
+    end
  129
+
  130
+    it "should not be found by another worker" do
  131
+      @backend.find_available('worker2', 1, 6.minutes).length.should == 0
  132
+    end
  133
+
  134
+    it "should be found by another worker if the time has expired" do
  135
+      @backend.find_available('worker2', 1, 4.minutes).length.should == 1
  136
+    end
  137
+
  138
+    it "should be able to get exclusive access again when the worker name is the same" do
  139
+      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
  140
+      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
  141
+      @job.lock_exclusively!(5.minutes, 'worker1').should be_true
  142
+    end                                        
  143
+  end
  144
+  
  145
+  context "when another worker has worked on a task since the job was found to be available, it" do
  146
+
  147
+    before :each do
  148
+      @job = @backend.create :payload_object => SimpleJob.new
  149
+      @job_copy_for_worker_2 = @backend.find(@job.id)
  150
+    end
  151
+
  152
+    it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
  153
+      @job.destroy
  154
+      @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
  155
+    end
  156
+
  157
+    it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
  158
+      @job.update_attributes(:attempts => 1, :run_at => 1.day.from_now)
  159
+      @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
  160
+    end
  161
+  end
  162
+
  163
+  context "#name" do
  164
+    it "should be the class name of the job that was enqueued" do
  165
+      @backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
  166
+    end
  167
+
  168
+    it "should be the method that will be called if its a performable method object" do
  169
+      @job = Story.send_later(:create)
  170
+      @job.name.should == "Story.create"
  171
+    end
  172
+
  173
+    it "should be the instance method that will be called if its a performable method object" do
  174
+      @job = Story.create(:text => "...").send_later(:save)
  175
+      @job.name.should == 'Story#save'
  176
+    end
  177
+  end
  178
+  
  179
+  context "worker prioritization" do
  180
+    before(:each) do
  181
+      Delayed::Worker.max_priority = nil
  182
+      Delayed::Worker.min_priority = nil
  183
+    end
  184
+
  185
+    it "should fetch jobs ordered by priority" do
  186
+      number_of_jobs = 10
  187
+      number_of_jobs.times { @backend.enqueue SimpleJob.new, rand(10) }
  188
+      jobs = @backend.find_available('worker', 10)
  189
+      ordered = true
  190
+      jobs[1..-1].each_index{ |i| 
  191
+        if (jobs[i].priority > jobs[i+1].priority)
  192
+          ordered = false
  193
+          break
  194
+        end
  195
+      }
  196
+      ordered.should == true
  197
+    end
  198
+  end
  199
+  
  200
+  context "clear_locks!" do
  201
+    before do
  202
+      @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
  203
+    end
  204
+    
  205
+    it "should clear locks for the given worker" do
  206
+      @backend.clear_locks!('worker')
  207
+      @backend.find_available('worker2', 5, 1.minute).should include(@job)
  208
+    end
  209
+    
  210
+    it "should not clear locks for other workers" do
  211
+      @backend.clear_locks!('worker1')
  212
+      @backend.find_available('worker1', 5, 1.minute).should_not include(@job)
  213
+    end
  214
+  end
  215
+  
  216
+  context "unlock" do
  217
+    before do
  218
+      @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
  219
+    end
  220
+
  221
+    it "should clear locks" do
  222
+      @job.unlock
  223
+      @job.locked_by.should be_nil
  224
+      @job.locked_at.should be_nil
  225
+    end
  226
+  end
  227
+  
  228
+end
7  spec/spec_helper.rb
@@ -4,8 +4,10 @@
4 4
 require 'spec'
5 5
 require 'active_record'
6 6
 require 'delayed_job'
7  
-  
8  
-ActiveRecord::Base.logger = Logger.new('/tmp/dj.log')
  7
+
  8
+logger = Logger.new('/tmp/dj.log')
  9
+ActiveRecord::Base.logger = logger
  10
+Delayed::Worker.logger = logger
9 11
 ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => ':memory:')
10 12
 ActiveRecord::Migration.verbose = false
11 13
 
@@ -38,3 +40,4 @@ def whatever(n, _); tell*n; end
38 40
 end
39 41
 
40 42
 require 'sample_jobs'
  43
+require 'shared_backend_spec'
17  spec/worker_spec.rb
@@ -10,6 +10,9 @@ def job_create(opts = {})
10 10
   end
11 11
 
12 12
   before(:each) do