public
Description: Database based asynchronously priority queue system -- Extracted from Shopify
Homepage: http://www.shopify.com
Clone URL: git://github.com/tobi/delayed_job.git
Renamed locked_until to locked_at. We now store when we start a given task 
instead of how long it will be locked by the worker. This allows us to get 
a reading on how long a task took to execute.
Tobias Lütke (author)
Sat Jun 21 21:01:56 -0700 2008
commit  59989313afa5f0b431263e37a994d895a3cc669d
tree    c6e0e7a738a991276bb926f8cb137daf087ee5e8
parent  bf90faf65419751d79442db392be02fb382695f1
...
13
14
15
16
 
 
17
18
19
...
13
14
15
 
16
17
18
19
20
0
@@ -13,7 +13,8 @@ It is a direct extraction from Shopify where the job table is responsible for a
0
 * spam checks
0
 
0
 h2. Changes
0
-
0
+
0
+* 1.6 Renamed locked_until to locked_at. We now store when we start a given task instead of how long it will be locked by the worker. This allows us to get a reading on how long a task took to execute.
0
 * 1.5 Job runners can now be run in parallel. Two new database columns are needed: locked_until and locked_by. This allows us to use pessimistic locking, which enables us to run as many worker processes as we need to speed up queue processing.
0
 * 1.0 Initial release
0
             
...
10
11
12
13
 
14
15
16
...
18
19
20
21
 
22
23
24
...
57
58
59
60
 
61
62
63
64
65
66
 
67
68
69
...
81
82
83
84
 
 
85
86
 
87
88
 
 
89
90
91
92
 
 
93
94
95
96
97
98
 
99
100
101
 
102
103
104
...
108
109
110
111
 
112
113
114
115
116
 
117
118
119
...
10
11
12
 
13
14
15
16
...
18
19
20
 
21
22
23
24
...
57
58
59
 
60
61
62
63
64
65
 
66
67
68
69
...
81
82
83
 
84
85
86
 
87
88
 
89
90
91
92
 
 
93
94
95
96
97
98
99
 
100
101
102
 
103
104
105
106
...
110
111
112
 
113
114
115
116
117
 
118
119
120
121
0
@@ -10,7 +10,7 @@ module Delayed
0
     self.worker_name = "pid:#{Process.pid}"
0
     
0
     
0
- NextTaskSQL = '`run_at` <= ? AND (`locked_until` IS NULL OR `locked_until` < ?) OR (`locked_by`=?)'
0
+ NextTaskSQL = '`run_at` <= ? AND (`locked_at` IS NULL OR `locked_at` < ?) OR (`locked_by` = ?)'
0
     NextTaskOrder = 'priority DESC, run_at ASC'
0
     ParseObjectFromYaml = /\!ruby\/\w+\:([^\s]+)/
0
 
0
@@ -18,7 +18,7 @@ module Delayed
0
     end
0
 
0
     def self.clear_locks!
0
- connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_until`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
0
+ connection.execute "UPDATE #{table_name} SET `locked_by`=NULL, `locked_at`=NULL WHERE `locked_by`=#{quote_value(worker_name)}"
0
     end
0
       
0
     def payload_object
0
@@ -57,13 +57,13 @@ module Delayed
0
                                                                              
0
     # Get the payload of the next job we can get an exclusive lock on.
0
     # If no jobs are left we return nil
0
- def self.reserve(timeout = 4 * 60 * 60)
0
+ def self.reserve(max_run_time = 4.hours)
0
                     
0
       # We get up to 5 jobs from the db. In face we cannot get exclusive access to a job we try the next.
0
       # this leads to a more even distribution of jobs across the worker processes
0
       find_available(5).each do |job|
0
         begin
0
- job.lock_exclusively!(self.db_time_now + timeout, worker_name)
0
+ job.lock_exclusively!(max_run_time, worker_name)
0
           yield job.payload_object
0
           job.destroy
0
           return job
0
@@ -81,24 +81,26 @@ module Delayed
0
 
0
     # This method is used internally by reserve method to ensure exclusive access
0
     # to the given job. It will rise a LockError if it cannot get this lock.
0
- def lock_exclusively!(lock_until, worker = worker_name)
0
+ def lock_exclusively!(max_run_time, worker = worker_name)
0
+ now = self.class.db_time_now
0
       
0
- affected_rows = if locked_by != worker
0
+ affected_rows = if locked_by != worker
0
         
0
- # We don't own this job so we will update the locked_by name and the locked_until
0
+
0
+ # We don't own this job so we will update the locked_by name and the locked_at
0
         connection.update(<<-end_sql, "#{self.class.name} Update to aquire exclusive lock")
0
           UPDATE #{self.class.table_name}
0
- SET `locked_until`=#{quote_value(lock_until)}, `locked_by`=#{quote_value(worker)}
0
- WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_until`<#{quote_value(self.class.db_time_now)} OR `locked_until` IS NULL)
0
+ SET `locked_at`=#{quote_value(now)}, `locked_by`=#{quote_value(worker)}
0
+ WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_at` IS NULL OR `locked_at` < #{quote_value(now + max_run_time)})
0
         end_sql
0
 
0
       else
0
         
0
         # We alrady own this job, this may happen if the job queue crashes.
0
- # Simply update the lock timeout
0
+ # Simply resume and update the locked_at
0
         connection.update(<<-end_sql, "#{self.class.name} Update exclusive lock")
0
           UPDATE #{self.class.table_name}
0
- SET `locked_until`=#{quote_value(lock_until)}
0
+ SET `locked_at`=#{quote_value(now)}
0
           WHERE #{self.class.primary_key} = #{quote_value(id)} AND (`locked_by`=#{quote_value(worker)})
0
         end_sql
0
 
0
@@ -108,12 +110,12 @@ module Delayed
0
         raise LockError, "Attempted to aquire exclusive lock failed"
0
       end
0
       
0
- self.locked_until = lock_until
0
+ self.locked_at = now
0
       self.locked_by = worker
0
     end
0
     
0
     def unlock
0
- self.locked_until = nil
0
+ self.locked_at = nil
0
       self.locked_by = nil
0
     end
0
     
...
17
18
19
20
 
21
22
23
...
17
18
19
 
20
21
22
23
0
@@ -17,7 +17,7 @@ def reset_db
0
       table.text :handler
0
       table.string :last_error
0
       table.datetime :run_at
0
- table.datetime :locked_until
0
+ table.datetime :locked_at
0
       table.string :locked_by
0
       table.timestamps
0
     end
...
91
92
93
94
 
95
96
97
98
 
99
100
101
102
103
104
 
 
 
 
 
 
 
 
 
 
 
 
 
105
106
107
108
109
110
 
 
 
111
112
113
...
91
92
93
 
94
95
96
97
 
98
99
100
 
 
 
 
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
 
 
 
 
116
117
118
119
120
121
0
@@ -91,23 +91,31 @@ describe Delayed::Job do
0
   end
0
              
0
   
0
- describe "when two workers are running" do
0
+ describe "when another worker is already performing an task, it" do
0
     
0
     before :each do
0
       Delayed::Job.worker_name = 'worker1'
0
- Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_until => Time.now + 360
0
+ @job = Delayed::Job.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => Time.now.utc
0
     end
0
     
0
- it "should give exclusive access only to a single worker" do
0
- job = Delayed::Job.find_available.first
0
- lambda { job.lock_exclusively! Time.now + 20, 'worker2' }.should raise_error(Delayed::Job::LockError)
0
- end
0
+ it "should not allow a second worker to get exclusive access" do
0
+ lambda { @job.lock_exclusively! 4.hours, 'worker2' }.should raise_error(Delayed::Job::LockError)
0
+ end
0
+
0
+ it "should be able to get access to the task if it was started more then max_age ago" do
0
+ @job.locked_at = 5.hours.ago
0
+ @job.save
0
+
0
+ @job.lock_exclusively! 4.hours, 'worker2'
0
+ @job.reload
0
+ @job.locked_by.should == 'worker2'
0
+ @job.locked_at.should > 1.minute.ago
0
+ end
0
 
0
     it "should be able to get exclusive access again when the worker name is the same" do
0
- job = Delayed::Job.find_available.first
0
- job.lock_exclusively! Time.now + 20, 'worker1'
0
- job.lock_exclusively! Time.now + 21, 'worker1'
0
- job.lock_exclusively! Time.now + 22, 'worker1'
0
+ @job.lock_exclusively! Time.now + 20, 'worker1'
0
+ @job.lock_exclusively! Time.now + 21, 'worker1'
0
+ @job.lock_exclusively! Time.now + 22, 'worker1'
0
     end
0
   end
0
   

Comments

    No one has commented yet.