Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

file 112 lines (91 sloc) 3.14 kb
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112
require 'ostruct'

# An in-memory backend suitable only for testing. Tries to behave as if it were an ORM.
module Delayed
  module Backend
    module Test
      class Job
        attr_accessor :id
        attr_accessor :priority
        attr_accessor :attempts
        attr_accessor :handler
        attr_accessor :last_error
        attr_accessor :run_at
        attr_accessor :locked_at
        attr_accessor :locked_by
        attr_accessor :failed_at
        attr_accessor :queue
        
        include Delayed::Backend::Base

        cattr_accessor :id
        self.id = 0
        
        def initialize(hash = {})
          self.attempts = 0
          self.priority = 0
          self.id = (self.class.id += 1)
          hash.each{|k,v| send(:"#{k}=", v)}
        end
        
        @jobs = []
        def self.all
          @jobs
        end
        
        def self.count
          all.size
        end
        
        def self.delete_all
          all.clear
        end
        
        def self.create(attrs = {})
          new(attrs).tap do |o|
            o.save
          end
        end
        
        def self.create!(*args); create(*args); end
        
        def self.clear_locks!(worker_name)
          all.select{|j| j.locked_by == worker_name}.each {|j| j.locked_by = nil; j.locked_at = nil}
        end

        # Find a few candidate jobs to run (in case some immediately get locked by others).
        def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
          jobs = all.select do |j|
            j.run_at <= db_time_now &&
            (j.locked_at.nil? || j.locked_at < db_time_now - max_run_time || j.locked_by == worker_name) &&
            !j.failed?
          end
                    
          jobs = jobs.select{|j| Worker.queues.include?(j.queue)} if Worker.queues.any?
          jobs = jobs.select{|j| j.priority >= Worker.min_priority} if Worker.min_priority
          jobs = jobs.select{|j| j.priority <= Worker.max_priority} if Worker.max_priority
          jobs.sort_by{|j| [j.priority, j.run_at]}[0..limit-1]
        end

        # Lock this job for this worker.
        # Returns true if we have the lock, false otherwise.
        def lock_exclusively!(max_run_time, worker)
          now = self.class.db_time_now
          if locked_by != worker
            # We don't own this job so we will update the locked_by name and the locked_at
            self.locked_at = now
            self.locked_by = worker
          end

          return true
        end

        def self.db_time_now
          Time.current
        end
        
        def update_attributes(attrs = {})
          attrs.each{|k,v| send(:"#{k}=", v)}
          save
        end
        
        def destroy
          self.class.all.delete(self)
        end
        
        def save
          self.run_at ||= Time.current
          
          self.class.all << self unless self.class.all.include?(self)
          true
        end
        
        def save!; save; end
               
        def reload
          reset
          self
        end
      end
    end
  end
end
Something went wrong with that request. Please try again.