public
Description: Database based asynchronously priority queue system -- Extracted from Shopify
Homepage: http://www.shopify.com
Clone URL: git://github.com/tobi/delayed_job.git
delayed_job / lib / delayed / job.rb
100644 195 lines (146 sloc) 6.616 kb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
module Delayed
 
  class DeserializationError < StandardError
  end
 
  class Job < ActiveRecord::Base
    set_table_name :delayed_jobs
 
    cattr_accessor :worker_name
    self.worker_name = "pid:#{Process.pid}"
    
    
    NextTaskSQL = '`run_at` <= ? AND (`locked_until` IS NULL OR `locked_until` < ?) OR (`locked_by`=?)'
    NextTaskOrder = 'priority DESC, run_at ASC'
    ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
 
    class LockError < StandardError
    end
 
    def self.clear_locks!
      connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_until`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
    end
      
    def payload_object
      @payload_object ||= deserialize(self['handler'])
    end
  
    def payload_object=(object)
      self['handler'] = object.to_yaml
    end
  
    def reshedule(message, time = nil)
      time ||= Job.db_time_now + (attempts ** 4).seconds + 1
      
      self.attempts += 1
      self.run_at = time
      self.last_error = message
      self.unlock
      save!
    end
    
    
    def self.enqueue(object, priority = 0)
      unless object.respond_to?(:perform)
        raise ArgumentError, 'Cannot enqueue items which do not respond to perform'
      end
    
      Job.create(:payload_object => object, :priority => priority)
    end
    
    def self.find_available(limit = 5)
      time_now = db_time_now
      find(:all, :conditions => [NextTaskSQL, time_now, time_now, worker_name], :order => NextTaskOrder, :limit => 5)
    end
                                                                             
    # Get the payload of the next job we can get an exclusive lock on.
    # If no jobs are left we return nil
    def self.reserve(timeout = 5 * 60)
                    
      # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
      # this leads to a more even distribution of jobs across the worker processes
      find_available(5).each do |job|
        begin
          job.lock_exclusively!(self.db_time_now + timeout, worker_name)
          yield job.payload_object
          job.destroy
          return job
        rescue LockError
          # We did not get the lock, some other worker process must have
          puts "failed to aquire exclusive lock for #{job.id}"
        rescue StandardError => e
          job.reshedule e.message
          return job
        end
      end
 
      nil
    end
 
    # This method is used internally by reserve method to ensure exclusive access
    # to the given job. It will rise a LockError if it cannot get this lock.
    def lock_exclusively!(lock_until, worker = worker_name)
      
      affected_rows = if locked_by != worker
        
        # We don't own this job so we will update the locked_by name and the locked_until
        connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_until`=#{quote_value(lock_until)}, `locked_by`=#{quote_value(worker)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_until`<#{quote_value(self.class.db_time_now)} OR `locked_until` IS NULL)
end_sql
 
      else
        
        # We alrady own this job, this may happen if the job queue crashes.
        # Simply update the lock timeout
        connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
UPDATE #{self.class.table_name}
SET `locked_until`=#{quote_value(lock_until)}
WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
end_sql
 
      end
      
      unless affected_rows == 1
        raise LockError, "Attempted to aquire exclusive lock failed"
      end
      
      self.locked_until = lock_until
      self.locked_by = worker
    end
    
    def unlock
      self.locked_until = nil
      self.locked_by = nil
    end
    
    def self.work_off(num = 100)
      success, failure = 0, 0
      
      num.times do
        
        job = self.reserve do |j|
          begin
            j.perform
            success += 1
          rescue
            failure += 1
            raise
          end
        end
        
        break if job.nil?
      end
      
      return [success, failure]
    end
      
    private
  
    def deserialize(source)
      attempt_to_load_file = true
    
      begin
        handler = YAML.load(source) rescue nil
        return handler if handler.respond_to?(:perform)
      
        if handler.nil?
          if source =~ ParseObjectFromYaml
  
            # Constantize the object so that ActiveSupport can attempt
            # its auto loading magic. Will raise LoadError if not successful.
            attempt_to_load($1)
  
            # If successful, retry the yaml.load
            handler = YAML.load(source)
            return handler if handler.respond_to?(:perform)
          end
        end
      
        if handler.is_a?(YAML::Object)
        
          # Constantize the object so that ActiveSupport can attempt
          # its auto loading magic. Will raise LoadError if not successful.
          attempt_to_load(handler.class)
      
          # If successful, retry the yaml.load
          handler = YAML.load(source)
          return handler if handler.respond_to?(:perform)
        end
                
        raise DeserializationError, 'Job failed to load: Unknown handler. Try to manually require the appropiate file.'
           
      rescue TypeError, LoadError, NameError => e
      
        raise DeserializationError, "Job failed to load: #{e.message}. Try to manually require the required file."
      end
    end
      
    def attempt_to_load(klass)
       klass.constantize
    end
 
    def self.db_time_now
      (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
    end
  
    protected
        
    def before_save
      self.run_at ||= self.class.db_time_now
    end
     
  end
end