<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array"/>
  <modified type="array">
    <modified>
      <diff>@@ -3,6 +3,8 @@ module Delayed
   class DeserializationError &lt; StandardError
   end
 
+  # A job object that is persisted to the database.
+  # Contains the work object as a YAML field.
   class Job &lt; ActiveRecord::Base
     MAX_ATTEMPTS = 25
     MAX_RUN_TIME = 4.hours
@@ -29,9 +31,7 @@ module Delayed
     self.min_priority = nil
     self.max_priority = nil
 
-    class LockError &lt; StandardError
-    end
-
+    # When a worker is exiting, make sure we don't have any locked jobs.
     def self.clear_locks!
       update_all(&quot;locked_by = null, locked_at = null&quot;, [&quot;locked_by = ?&quot;, worker_name])
     end
@@ -60,6 +60,8 @@ module Delayed
       self['handler'] = object.to_yaml
     end
 
+    # Reschedule the job in the future (when a job fails).
+    # Uses an exponential scale depending on the number of failed attempts.
     def reschedule(message, backtrace = [], time = nil)
       if self.attempts &lt; MAX_ATTEMPTS
         time ||= Job.db_time_now + (attempts ** 4) + 5
@@ -75,6 +77,32 @@ module Delayed
       end
     end
 
+
+    # Try to run one job. Returns true/false (work done/work failed) or nil if job can't be locked.
+    def run_with_lock(max_run_time, worker_name)
+      logger.info &quot;* [JOB] aquiring lock on #{name}&quot;
+      unless lock_exclusively!(max_run_time, worker_name)
+        # We did not get the lock, some other worker process must have
+        logger.warn &quot;* [JOB] failed to aquire exclusive lock for #{name}&quot;
+        return nil # no work done
+      end
+
+      begin
+        runtime =  Benchmark.realtime do
+          invoke_job # TODO: raise error if takes longer than max_run_time
+          destroy
+        end
+        # TODO: warn if runtime &gt; max_run_time ?
+        logger.info &quot;* [JOB] #{name} completed after %.4f&quot; % runtime
+        return true  # did work
+      rescue Exception =&gt; e
+        reschedule e.message, e.backtrace
+        log_exception(e)
+        return false  # work failed
+      end
+    end
+
+    # Add a job to the queue
     def self.enqueue(*args, &amp;block)
       object = block_given? ? EvaledJob.new(&amp;block) : args.shift
 
@@ -88,6 +116,8 @@ module Delayed
       Job.create(:payload_object =&gt; object, :priority =&gt; priority.to_i, :run_at =&gt; run_at)
     end
 
+    # Find a few candidate jobs to run (in case some immediately get locked by others).
+    # Return in random order prevent everyone trying to do same head job at once.
     def self.find_available(limit = 5, max_run_time = MAX_RUN_TIME)
 
       time_now = db_time_now
@@ -115,38 +145,22 @@ module Delayed
       records.sort_by { rand() }
     end
 
-    # Get the payload of the next job we can get an exclusive lock on.
+    # Run the next job we can get an exclusive lock on.
     # If no jobs are left we return nil
-    def self.reserve(max_run_time = MAX_RUN_TIME, &amp;block)
+    def self.reserve_and_run_one_job(max_run_time = MAX_RUN_TIME)
 
-      # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
+      # We get up to 5 jobs from the db. In case we cannot get exclusive access to a job we try the next.
       # this leads to a more even distribution of jobs across the worker processes
       find_available(5, max_run_time).each do |job|
-        begin
-          logger.info &quot;* [JOB] aquiring lock on #{job.name}&quot;
-          job.lock_exclusively!(max_run_time, worker_name)
-          runtime =  Benchmark.realtime do
-            invoke_job(job.payload_object, &amp;block)
-            job.destroy
-          end
-          logger.info &quot;* [JOB] #{job.name} completed after %.4f&quot; % runtime
-
-          return job
-        rescue LockError
-          # We did not get the lock, some other worker process must have
-          logger.warn &quot;* [JOB] failed to aquire exclusive lock for #{job.name}&quot;
-        rescue StandardError =&gt; e
-          job.reschedule e.message, e.backtrace
-          log_exception(job, e)
-          return job
-        end
+        t = job.run_with_lock(max_run_time, worker_name)
+        return t unless t == nil  # return if we did work (good or bad)
       end
 
-      nil
+      nil # we didn't do any work, all 5 were not lockable
     end
 
-    # This method is used internally by reserve method to ensure exclusive access
-    # to the given job. It will rise a LockError if it cannot get this lock.
+    # Lock this job for this worker.
+    # Returns true if we have the lock, false otherwise.
     def lock_exclusively!(max_run_time, worker = worker_name)
       now = self.class.db_time_now
       affected_rows = if locked_by != worker
@@ -157,46 +171,50 @@ module Delayed
         # Simply resume and update the locked_at
         self.class.update_all([&quot;locked_at = ?&quot;, now], [&quot;id = ? and locked_by = ?&quot;, id, worker])
       end
-      raise LockError.new(&quot;Attempted to aquire exclusive lock failed&quot;) unless affected_rows == 1
-
-      self.locked_at    = now
-      self.locked_by    = worker
+      if affected_rows == 1
+        self.locked_at    = now
+        self.locked_by    = worker
+        return true
+      else
+        return false
+      end
     end
 
+    # Unlock this job (note: not saved to DB)
     def unlock
       self.locked_at    = nil
       self.locked_by    = nil
     end
 
     # This is a good hook if you need to report job processing errors in additional or different ways
-    def self.log_exception(job, error)
-      logger.error &quot;* [JOB] #{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts&quot;
+    def log_exception(error)
+      logger.error &quot;* [JOB] #{name} failed with #{error.class.name}: #{error.message} - #{attempts} failed attempts&quot;
       logger.error(error)
     end
 
+    # Do num jobs and return stats on success/failure.
+    # Exit early if interrupted.
     def self.work_off(num = 100)
       success, failure = 0, 0
 
       num.times do
-        job = self.reserve do |j|
-          begin
-            j.perform
+        case self.reserve_and_run_one_job
+        when true
             success += 1
-          rescue
+        when false
             failure += 1
-            raise
-          end
+        else
+          break  # leave if no work could be done
         end
-
-        break if job.nil?
+        break if $exit # leave if we're exiting
       end
 
       return [success, failure]
     end
 
     # Moved into its own method so that new_relic can trace it.
-    def self.invoke_job(job, &amp;block)
-      block.call(job)
+    def invoke_job
+      payload_object.perform
     end
 
   private
@@ -227,6 +245,9 @@ module Delayed
        klass.constantize
     end
 
+    # Get the current time (GMT or local depending on DB)
+    # Note: This does not ping the DB to get the time, so all your clients
+    # must have syncronized clocks.
     def self.db_time_now
       (ActiveRecord::Base.default_timezone == :utc) ? Time.now.utc : Time.now
     end</diff>
      <filename>lib/delayed/job.rb</filename>
    </modified>
    <modified>
      <diff>@@ -78,13 +78,9 @@ describe 'random ruby objects' do
 
     Delayed::Job.count.should == 1
 
-    output = nil
+    Delayed::Job.reserve_and_run_one_job
 
-    Delayed::Job.reserve do |e|
-      output = e.perform
-    end
-
-    output.should == true
+    Delayed::Job.count.should == 0
 
   end
 
@@ -129,4 +125,4 @@ describe 'random ruby objects' do
     job.payload_object.perform.should == 'Once upon...'
   end
 
-end
\ No newline at end of file
+end</diff>
      <filename>spec/delayed_method_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -184,11 +184,11 @@ describe Delayed::Job do
     end
 
     it &quot;should not allow a second worker to get exclusive access&quot; do
-      lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
+      @job.lock_exclusively!(4.hours, 'worker2').should == false
     end
 
     it &quot;should allow a second worker to get exclusive access if the timeout has passed&quot; do
-      lambda { @job.lock_exclusively! 1.minute, 'worker2' }.should_not raise_error(Delayed::Job::LockError)
+      @job.lock_exclusively!(1.minute, 'worker2').should == true
     end      
     
     it &quot;should be able to get access to the task if it was started more then max_age ago&quot; do
@@ -283,7 +283,7 @@ describe Delayed::Job do
 
     it &quot;should leave the queue in a consistent state and not run the job if locking fails&quot; do
       SimpleJob.runs.should == 0     
-      @job.stub!(:lock_exclusively!).with(any_args).once.and_raise(Delayed::Job::LockError)
+      @job.stub!(:lock_exclusively!).with(any_args).once.and_return(false)
       Delayed::Job.should_receive(:find_available).once.and_return([@job])
       Delayed::Job.work_off(1)
       SimpleJob.runs.should == 0
@@ -291,21 +291,55 @@ describe Delayed::Job do
   
   end
   
-  context &quot;while running alongside other workers with enqueued jobs, it&quot; do 
+  context &quot;while running alongside other workers that locked jobs, it&quot; do
     before(:each) do
       Delayed::Job.worker_name = 'worker1'
-      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; (Delayed::Job.db_time_now - 3.minutes))
-      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker2', :locked_at =&gt; (Delayed::Job.db_time_now - 11.minutes))  
-      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; (Delayed::Job.db_time_now - 2.minutes))
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; (Delayed::Job.db_time_now - 1.minutes))
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker2', :locked_at =&gt; (Delayed::Job.db_time_now - 1.minutes))
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new)
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; (Delayed::Job.db_time_now - 1.minutes))
     end
-    
-    it &quot;should only find jobs if the lock has expired reguardless of the worker&quot; do
-      SimpleJob.runs.should == 0  
-      Delayed::Job.work_off(5)
-      SimpleJob.runs.should == 2 
-      Delayed::Job.find_available(5, 10.minutes).length.should == 1
+
+    it &quot;should ingore locked jobs from other workers&quot; do
+      Delayed::Job.worker_name = 'worker3'
+      SimpleJob.runs.should == 0
+      Delayed::Job.work_off
+      SimpleJob.runs.should == 1 # runs the one open job
     end
-    
+
+    it &quot;should find our own jobs regardless of locks&quot; do
+      Delayed::Job.worker_name = 'worker1'
+      SimpleJob.runs.should == 0
+      Delayed::Job.work_off
+      SimpleJob.runs.should == 3 # runs open job plus worker1 jobs that were already locked
+    end
+  end
+
+  context &quot;while running with locked and expired jobs, it&quot; do
+    before(:each) do
+      Delayed::Job.worker_name = 'worker1'
+      exp_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Job::MAX_RUN_TIME)
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; exp_time)
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker2', :locked_at =&gt; (Delayed::Job.db_time_now - 1.minutes))
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new)
+      Delayed::Job.create(:payload_object =&gt; SimpleJob.new, :locked_by =&gt; 'worker1', :locked_at =&gt; (Delayed::Job.db_time_now - 1.minutes))
+    end
+
+    it &quot;should only find unlocked and expired jobs&quot; do
+      Delayed::Job.worker_name = 'worker3'
+      SimpleJob.runs.should == 0
+      Delayed::Job.work_off
+      SimpleJob.runs.should == 2 # runs the one open job and one expired job
+    end
+
+    it &quot;should ignore locks when finding our own jobs&quot; do
+      Delayed::Job.worker_name = 'worker1'
+      SimpleJob.runs.should == 0
+      Delayed::Job.work_off
+      SimpleJob.runs.should == 3 # runs open job plus worker1 jobs
+      # This is useful in the case of a crash/restart on worker1, but make sure multiple workers on the same host have unique names!
+    end
+
   end
   
 end</diff>
      <filename>spec/job_spec.rb</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>ad27c3e36fd951abb67545e7b711e5ed9521ec79</id>
    </parent>
  </parents>
  <author>
    <name>Dan DeMaggio</name>
    <email>dan@animoto.com</email>
  </author>
  <url>http://github.com/collectiveidea/delayed_job/commit/266fc15c12953a94a6d052281ce10059ddd5ebc1</url>
  <id>266fc15c12953a94a6d052281ce10059ddd5ebc1</id>
  <committed-date>2009-03-12T09:27:01-07:00</committed-date>
  <authored-date>2009-03-12T09:27:01-07:00</authored-date>
  <message>refactor job object

- Move some class methods to instance methods
- Don't use exception to signal lock failure
- Add more explicit test cases for locking with multiple workers</message>
  <tree>bd3ee1b4866271d91d7eb2a2a95521d6021e6b2d</tree>
  <committer>
    <name>Dan DeMaggio</name>
    <email>dan@animoto.com</email>
  </committer>
</commit>
