Skip to content

Commit

Permalink
Added a new meta table to keep track of the last run job, this way th…
Browse files Browse the repository at this point in the history
…e tasks are ensured to respect the interval even if there's no tasks in the queue.
  • Loading branch information
Fred Wu committed Mar 26, 2010
1 parent 0a88139 commit 1e845a3
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 5 deletions.
7 changes: 6 additions & 1 deletion generators/delayed_job/templates/migration.rb
Expand Up @@ -13,9 +13,14 @@ def self.up
end

add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'

create_table :delayed_jobs_meta, :force => true do |table|
table.datetime :last_run_at
end
end

def self.down
drop_table :delayed_jobs
drop_table :delayed_jobs
drop_table :delayed_jobs_meta
end
end
5 changes: 5 additions & 0 deletions lib/delayed/backend/active_record.rb
Expand Up @@ -17,13 +17,18 @@ def dump_for_delayed_job
module Delayed
module Backend
module ActiveRecord
class Meta < ::ActiveRecord::Base
include Delayed::Backend::Base
set_table_name :delayed_jobs_meta
end
# A job object that is persisted to the database.
# Contains the work object as a YAML field.
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base
set_table_name :delayed_jobs

before_save :set_default_run_at
before_save :set_last_run_at_meta

named_scope :ready_to_run, lambda {|worker_name, max_run_time|
{:conditions => ['(run_at <= ? AND (locked_at IS NULL OR locked_at < ?) OR locked_by = ?) AND failed_at IS NULL', db_time_now, db_time_now - max_run_time, worker_name]}
Expand Down
8 changes: 8 additions & 0 deletions lib/delayed/backend/base.rb
Expand Up @@ -92,6 +92,14 @@ def attempt_to_load(klass)
def set_default_run_at
self.run_at ||= self.class.db_time_now
end

def set_last_run_at_meta
if meta = Delayed::Meta.find(:last)
meta.update_attributes(:last_run_at => self.run_at)
else
Delayed::Meta.create(:last_run_at => self.run_at)
end
end

end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/delayed/backend/mongo_mapper.rb
Expand Up @@ -19,6 +19,11 @@ def dump_for_delayed_job
module Delayed
module Backend
module MongoMapper
class Meta
include MongoMapper::Document
include Delayed::Backend::Base
set_collection_name 'delayed_jobs_meta'
end
class Job
include ::MongoMapper::Document
include Delayed::Backend::Base
Expand All @@ -35,6 +40,7 @@ class Job
timestamps!

before_save :set_default_run_at
before_save :set_last_run_at_meta

ensure_index [[:priority, 1], [:run_at, 1]]

Expand Down
20 changes: 16 additions & 4 deletions lib/delayed/worker.rb
Expand Up @@ -28,10 +28,14 @@ class Worker
def self.backend=(backend)
if backend.is_a? Symbol
require "delayed/backend/#{backend}"
backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
backend_job = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
backend_meta = "Delayed::Backend::#{backend.to_s.classify}::Meta".constantize
end
@@backend = backend_job
silence_warnings do
::Delayed.const_set(:Job, backend_job)
::Delayed.const_set(:Meta, backend_meta)
end
@@backend = backend
silence_warnings { ::Delayed.const_set(:Job, backend) }
end

def initialize(options={})
Expand Down Expand Up @@ -119,7 +123,15 @@ def run(job)
end

def self.run_at_set_interval
self.run_interval.seconds.since(Delayed::Job.last.run_at) if Delayed::Job.count > 0
self.run_interval.seconds.since(self.run_time_of_last_job) unless self.run_time_of_last_job.nil?
end

def self.run_time_of_last_job
if Delayed::Job.count > 0
Delayed::Job.last.run_at
else
Delayed::Meta.find(:first).last_run_at
end
end

# Reschedule the job in the future (when a job fails).
Expand Down

0 comments on commit 1e845a3

Please sign in to comment.