Skip to content
Browse files

Merge remote branch 'collectiveidea/master' into delayed_job_daemon

Conflicts:
	lib/delayed/tasks.rb
  • Loading branch information...
2 parents 7221810 + 40fabf3 commit 4c3e1c9e7ddd2c2d0a1f2e1c64b7b3c55a4c9749 guns committed Sep 5, 2010
View
10 Gemfile
@@ -0,0 +1,10 @@
+source 'http://rubygems.org'
+gem 'activesupport', '~>3'
+gem 'daemons'
+
+group :development do
+ gem 'rspec'
+ gem 'rake'
+ gem 'activerecord', '~>3'
+ gem 'sqlite3-ruby'
+end
View
33 Gemfile.lock
@@ -0,0 +1,33 @@
+GEM
+ remote: http://rubygems.org/
+ specs:
+ activemodel (3.0.0)
+ activesupport (= 3.0.0)
+ builder (~> 2.1.2)
+ i18n (~> 0.4.1)
+ activerecord (3.0.0)
+ activemodel (= 3.0.0)
+ activesupport (= 3.0.0)
+ arel (~> 1.0.0)
+ tzinfo (~> 0.3.23)
+ activesupport (3.0.0)
+ arel (1.0.1)
+ activesupport (~> 3.0.0)
+ builder (2.1.2)
+ daemons (1.1.0)
+ i18n (0.4.1)
+ rake (0.8.7)
+ rspec (1.3.0)
+ sqlite3-ruby (1.3.1)
+ tzinfo (0.3.23)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ activerecord (~> 3)
+ activesupport (~> 3)
+ daemons
+ rake
+ rspec
+ sqlite3-ruby
View
19 README.textile
@@ -51,25 +51,6 @@ $ script/generate delayed_job
$ rake db:migrate
</pre>
-h3. MongoMapper
-
-You must use @MongoMapper.setup@ in the initializer:
-
-<pre>
-config = YAML::load(File.read(Rails.root.join('config/mongo.yml')))
-MongoMapper.setup(config, Rails.env)
-
-Delayed::Worker.backend = :mongo_mapper
-</pre>
-
-h3. DataMapper
-
-<pre>
-# config/initializers/delayed_job.rb
-Delayed::Worker.backend = :data_mapper
-Delayed::Worker.backend.auto_upgrade!
-</pre>
-
h2. Queuing Jobs
Call @.delay.method(params)@ on any object and it will be processed in the background.
View
46 Rakefile
@@ -1,53 +1,13 @@
# -*- encoding: utf-8 -*-
-begin
- require 'jeweler'
-rescue LoadError
- puts "Jeweler not available. Install it with: sudo gem install jeweler"
- exit 1
-end
-
-Jeweler::Tasks.new do |s|
- s.name = "delayed_job"
- s.summary = "Database-backed asynchronous priority queue system -- Extracted from Shopify"
- s.email = "tobi@leetsoft.com"
- s.homepage = "http://github.com/collectiveidea/delayed_job"
- s.description = "Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.\n\nThis gem is collectiveidea's fork (http://github.com/collectiveidea/delayed_job)."
- s.authors = ["Brandon Keepers", "Tobias Lütke"]
-
- s.has_rdoc = true
- s.rdoc_options = ["--main", "README.textile", "--inline-source", "--line-numbers"]
- s.extra_rdoc_files = ["README.textile"]
-
- s.test_files = Dir['spec/*_spec.rb']
-
- s.add_dependency "daemons"
- s.add_development_dependency "rspec"
- s.add_development_dependency "sqlite3-ruby"
- s.add_development_dependency "activerecord"
- s.add_development_dependency "mongo_mapper"
- s.add_development_dependency "dm-core"
- s.add_development_dependency "dm-observer"
- s.add_development_dependency "dm-aggregates"
- s.add_development_dependency "dm-validations"
- s.add_development_dependency "do_sqlite3"
- s.add_development_dependency "couchrest"
-end
+require 'rubygems'
+require 'bundler/setup'
require 'spec/rake/spectask'
-
-
-task :default do
- %w(2.3.5 3.0.0.beta3).each do |version|
- puts "Running specs with Rails #{version}"
- system("RAILS_VERSION=#{version} rake -s spec;")
- end
-end
-
desc 'Run the specs'
Spec::Rake::SpecTask.new(:spec) do |t|
t.libs << 'lib'
t.pattern = 'spec/*_spec.rb'
t.verbose = true
end
-task :spec => :check_dependencies
+task :default => :spec
View
1 VERSION
@@ -1 +0,0 @@
-2.1.0.pre
View
139 delayed_job.gemspec
@@ -1,125 +1,26 @@
-# Generated by jeweler
-# DO NOT EDIT THIS FILE DIRECTLY
-# Instead, edit Jeweler::Tasks in Rakefile, and run the gemspec command
# -*- encoding: utf-8 -*-
Gem::Specification.new do |s|
- s.name = %q{delayed_job}
- s.version = "2.1.0.pre"
+ s.name = 'delayed_job'
+ s.version = '2.1.0.pre'
+ s.authors = ["Brandon Keepers", "Tobias L\303\274tke"]
+ s.summary = 'Database-backed asynchronous priority queue system -- Extracted from Shopify'
+ s.description = "Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.
- s.required_rubygems_version = Gem::Requirement.new("> 1.3.1") if s.respond_to? :required_rubygems_version=
- s.authors = ["Brandon Keepers", "Tobias L\303\274tke"]
- s.date = %q{2010-05-21}
- s.description = %q{Delayed_job (or DJ) encapsulates the common pattern of asynchronously executing longer tasks in the background. It is a direct extraction from Shopify where the job table is responsible for a multitude of core tasks.
-
-This gem is collectiveidea's fork (http://github.com/collectiveidea/delayed_job).}
- s.email = %q{tobi@leetsoft.com}
- s.extra_rdoc_files = [
- "README.textile"
- ]
- s.files = [
- ".gitignore",
- "MIT-LICENSE",
- "README.textile",
- "Rakefile",
- "VERSION",
- "benchmarks.rb",
- "contrib/delayed_job.monitrc",
- "contrib/delayed_job_multiple.monitrc",
- "delayed_job.gemspec",
- "generators/delayed_job/delayed_job_generator.rb",
- "generators/delayed_job/templates/migration.rb",
- "generators/delayed_job/templates/script",
- "init.rb",
- "lib/delayed/backend/active_record.rb",
- "lib/delayed/backend/base.rb",
- "lib/delayed/backend/couch_rest.rb",
- "lib/delayed/backend/data_mapper.rb",
- "lib/delayed/backend/mongo_mapper.rb",
- "lib/delayed/command.rb",
- "lib/delayed/message_sending.rb",
- "lib/delayed/performable_method.rb",
- "lib/delayed/railtie.rb",
- "lib/delayed/recipes.rb",
- "lib/delayed/tasks.rb",
- "lib/delayed/worker.rb",
- "lib/delayed/yaml_ext.rb",
- "lib/delayed_job.rb",
- "lib/generators/delayed_job/delayed_job_generator.rb",
- "lib/generators/delayed_job/templates/migration.rb",
- "lib/generators/delayed_job/templates/script",
- "rails/init.rb",
- "recipes/delayed_job.rb",
- "spec/autoloaded/clazz.rb",
- "spec/autoloaded/struct.rb",
- "spec/backend/active_record_job_spec.rb",
- "spec/backend/couch_rest_job_spec.rb",
- "spec/backend/data_mapper_job_spec.rb",
- "spec/backend/mongo_mapper_job_spec.rb",
- "spec/backend/shared_backend_spec.rb",
- "spec/message_sending_spec.rb",
- "spec/performable_method_spec.rb",
- "spec/sample_jobs.rb",
- "spec/setup/active_record.rb",
- "spec/setup/couch_rest.rb",
- "spec/setup/data_mapper.rb",
- "spec/setup/mongo_mapper.rb",
- "spec/spec_helper.rb",
- "spec/worker_spec.rb",
- "tasks/jobs.rake"
- ]
- s.homepage = %q{http://github.com/collectiveidea/delayed_job}
- s.rdoc_options = ["--main", "README.textile", "--inline-source", "--line-numbers"]
- s.require_paths = ["lib"]
- s.rubygems_version = %q{1.3.6}
- s.summary = %q{Database-backed asynchronous priority queue system -- Extracted from Shopify}
- s.test_files = [
- "spec/message_sending_spec.rb",
- "spec/performable_method_spec.rb",
- "spec/worker_spec.rb"
- ]
-
- if s.respond_to? :specification_version then
- current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
- s.specification_version = 3
-
- if Gem::Version.new(Gem::RubyGemsVersion) >= Gem::Version.new('1.2.0') then
- s.add_runtime_dependency(%q<daemons>, [">= 0"])
- s.add_development_dependency(%q<rspec>, [">= 0"])
- s.add_development_dependency(%q<sqlite3-ruby>, [">= 0"])
- s.add_development_dependency(%q<activerecord>, [">= 0"])
- s.add_development_dependency(%q<mongo_mapper>, [">= 0"])
- s.add_development_dependency(%q<dm-core>, [">= 0"])
- s.add_development_dependency(%q<dm-observer>, [">= 0"])
- s.add_development_dependency(%q<dm-aggregates>, [">= 0"])
- s.add_development_dependency(%q<dm-validations>, [">= 0"])
- s.add_development_dependency(%q<do_sqlite3>, [">= 0"])
- s.add_development_dependency(%q<couchrest>, [">= 0"])
- else
- s.add_dependency(%q<daemons>, [">= 0"])
- s.add_dependency(%q<rspec>, [">= 0"])
- s.add_dependency(%q<sqlite3-ruby>, [">= 0"])
- s.add_dependency(%q<activerecord>, [">= 0"])
- s.add_dependency(%q<mongo_mapper>, [">= 0"])
- s.add_dependency(%q<dm-core>, [">= 0"])
- s.add_dependency(%q<dm-observer>, [">= 0"])
- s.add_dependency(%q<dm-aggregates>, [">= 0"])
- s.add_dependency(%q<dm-validations>, [">= 0"])
- s.add_dependency(%q<do_sqlite3>, [">= 0"])
- s.add_dependency(%q<couchrest>, [">= 0"])
- end
- else
- s.add_dependency(%q<daemons>, [">= 0"])
- s.add_dependency(%q<rspec>, [">= 0"])
- s.add_dependency(%q<sqlite3-ruby>, [">= 0"])
- s.add_dependency(%q<activerecord>, [">= 0"])
- s.add_dependency(%q<mongo_mapper>, [">= 0"])
- s.add_dependency(%q<dm-core>, [">= 0"])
- s.add_dependency(%q<dm-observer>, [">= 0"])
- s.add_dependency(%q<dm-aggregates>, [">= 0"])
- s.add_dependency(%q<dm-validations>, [">= 0"])
- s.add_dependency(%q<do_sqlite3>, [">= 0"])
- s.add_dependency(%q<couchrest>, [">= 0"])
- end
+This gem is collectiveidea's fork (http://github.com/collectiveidea/delayed_job)."
+ s.email = 'brandon@collectiveidea.com'
+ s.extra_rdoc_files = 'README.textile'
+ s.files = Dir.glob('{contrib,generators,lib,rails,recipes,spec,tasks}/**/*') +
+ %w(init.rb MIT-LICENSE README.textile)
+ s.homepage = 'http://github.com/collectiveidea/delayed_job'
+ s.rdoc_options = ["--main", "README.textile", "--inline-source", "--line-numbers"]
+ s.require_paths = ["lib"]
+ s.test_files = Dir.glob('spec/**/*')
+
+ s.add_runtime_dependency 'daemons'
+ s.add_runtime_dependency 'activesupport' '~>3'
+ s.add_development_dependency 'rspec'
+ s.add_development_dependency 'sqlite3-ruby'
+ s.add_development_dependency 'activerecord'
end
View
11 lib/delayed/backend/base.rb
@@ -62,7 +62,16 @@ def payload_object
# Moved into its own method so that new_relic can trace it.
def invoke_job
- payload_object.perform
+ payload_object.before(self) if payload_object.respond_to?(:before)
+ begin
+ payload_object.perform
+ payload_object.success(self) if payload_object.respond_to?(:success)
+ rescue Exception => e
+ payload_object.failure(self, e) if payload_object.respond_to?(:failure)
+ raise e
+ ensure
+ payload_object.after(self) if payload_object.respond_to?(:after)
+ end
end
# Unlock this job (note: not saved to DB)
View
109 lib/delayed/backend/couch_rest.rb
@@ -1,109 +0,0 @@
-require 'couchrest'
-
-#extent couchrest to handle delayed_job serialization.
-class CouchRest::ExtendedDocument
- yaml_as "tag:ruby.yaml.org,2002:CouchRest"
-
- def reload
- job = self.class.get self['_id']
- job.each {|k,v| self[k] = v}
- end
- def self.find(id)
- get id
- end
- def self.yaml_new(klass, tag, val)
- klass.get(val['_id'])
- end
- def ==(other)
- if other.is_a? ::CouchRest::ExtendedDocument
- self['_id'] == other['_id']
- else
- super
- end
- end
-end
-
-#couchrest adapter
-module Delayed
- module Backend
- module CouchRest
- class Job < ::CouchRest::ExtendedDocument
- include Delayed::Backend::Base
- use_database ::CouchRest::Server.new.database('delayed_job')
-
- property :handler
- property :last_error
- property :locked_by
- property :priority, :default => 0
- property :attempts, :default => 0
- property :run_at, :cast_as => 'Time'
- property :locked_at, :cast_as => 'Time'
- property :failed_at, :cast_as => 'Time'
- timestamps!
-
- set_callback :save, :before, :set_default_run_at
-
- view_by(:failed_at, :locked_by, :run_at,
- :map => "function(doc){" +
- " if(doc['couchrest-type'] == 'Delayed::Backend::CouchRest::Job') {" +
- " emit([doc.failed_at || null, doc.locked_by || null, doc.run_at || null], null);}" +
- " }")
- view_by(:failed_at, :locked_at, :run_at,
- :map => "function(doc){" +
- " if(doc['couchrest-type'] == 'Delayed::Backend::CouchRest::Job') {" +
- " emit([doc.failed_at || null, doc.locked_at || null, doc.run_at || null], null);}" +
- " }")
-
- def self.db_time_now; Time.now; end
- def self.find_available(worker_name, limit = 5, max_run_time = ::Delayed::Worker.max_run_time)
- ready = ready_jobs
- mine = my_jobs worker_name
- expire = expired_jobs max_run_time
- jobs = (ready + mine + expire)[0..limit-1].sort_by { |j| j.priority }
- jobs = jobs.find_all { |j| j.priority >= Worker.min_priority } if Worker.min_priority
- jobs = jobs.find_all { |j| j.priority <= Worker.max_priority } if Worker.max_priority
- jobs
- end
- def self.clear_locks!(worker_name)
- jobs = my_jobs worker_name
- jobs.each { |j| j.locked_by, j.locked_at = nil, nil; }
- database.bulk_save jobs
- end
- def self.delete_all
- database.bulk_save all.each { |doc| doc['_deleted'] = true }
- end
-
- def lock_exclusively!(max_run_time, worker = worker_name)
- return false if locked_by_other?(worker) and not expired?(max_run_time)
- case
- when locked_by_me?(worker)
- self.locked_at = self.class.db_time_now
- when (unlocked? or (locked_by_other?(worker) and expired?(max_run_time)))
- self.locked_at, self.locked_by = self.class.db_time_now, worker
- end
- save
- rescue RestClient::Conflict
- false
- end
-
- private
- def self.ready_jobs
- options = {:startkey => [nil, nil], :endkey => [nil, nil, db_time_now]}
- by_failed_at_and_locked_by_and_run_at options
- end
- def self.my_jobs(worker_name)
- options = {:startkey => [nil, worker_name], :endkey => [nil, worker_name, {}]}
- by_failed_at_and_locked_by_and_run_at options
- end
- def self.expired_jobs(max_run_time)
- options = {:startkey => [nil,'0'], :endkey => [nil, db_time_now - max_run_time, db_time_now]}
- by_failed_at_and_locked_at_and_run_at options
- end
- def unlocked?; locked_by.nil?; end
- def expired?(time); locked_at < self.class.db_time_now - time; end
- def locked_by_me?(worker); not locked_by.nil? and locked_by == worker; end
- def locked_by_other?(worker); not locked_by.nil? and locked_by != worker; end
- end
- end
- end
-end
View
121 lib/delayed/backend/data_mapper.rb
@@ -1,121 +0,0 @@
-require 'dm-core'
-require 'dm-observer'
-require 'dm-aggregates'
-
-DataMapper::Resource.class_eval do
- yaml_as "tag:ruby.yaml.org,2002:DataMapper"
-
- def self.yaml_new(klass, tag, val)
- klass.find(val['id'])
- end
-
- def to_yaml_properties
- ['@id']
- end
-end
-
-module Delayed
- module Backend
- module DataMapper
- class Job
- include ::DataMapper::Resource
- include Delayed::Backend::Base
-
- storage_names[:default] = 'delayed_jobs'
-
- property :id, Serial
- property :priority, Integer, :default => 0, :index => :run_at_priority
- property :attempts, Integer, :default => 0
- property :handler, Text, :lazy => false
- property :run_at, Time, :index => :run_at_priority
- property :locked_at, Time, :index => true
- property :locked_by, String
- property :failed_at, Time
- property :last_error, Text
-
- def self.db_time_now
- Time.now
- end
-
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
-
- simple_conditions = { :run_at.lte => db_time_now, :limit => limit, :failed_at => nil, :order => [:priority.asc, :run_at.asc] }
-
- # respect priorities
- simple_conditions[:priority.gte] = Worker.min_priority if Worker.min_priority
- simple_conditions[:priority.lte] = Worker.max_priority if Worker.max_priority
-
- # lockable
- lockable = (
- # not locked or past the max time
- ( all(:locked_at => nil ) | all(:locked_at.lt => db_time_now - max_run_time)) |
-
- # OR locked by our worker
- all(:locked_by => worker_name))
-
- # plus some other boring junk
- (lockable).all( simple_conditions )
- end
-
- # When a worker is exiting, make sure we don't have any locked jobs.
- def self.clear_locks!(worker_name)
- all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil)
- end
-
- # Lock this job for this worker.
- # Returns true if we have the lock, false otherwise.
- def lock_exclusively!(max_run_time, worker = worker_name)
-
- now = self.class.db_time_now
- overtime = now - max_run_time
-
- # FIXME - this is a bit gross
- # DM doesn't give us the number of rows affected by a collection update
- # so we have to circumvent some niceness in DM::Collection here
- collection = locked_by != worker ?
- (self.class.all(:id => id, :run_at.lte => now) & ( self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime) ) ) :
- self.class.all(:id => id, :locked_by => worker)
-
- attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
- affected_rows = self.repository.update(attributes, collection)
-
- if affected_rows == 1
- self.locked_at = now
- self.locked_by = worker
- return true
- else
- return false
- end
- end
-
- # these are common to the other backends, so we provide an implementation
- def self.delete_all
- Delayed::Job.auto_migrate!
- end
-
- def self.find id
- get id
- end
-
- def update_attributes(attributes)
- attributes.each do |k,v|
- self[k] = v
- end
- self.save
- end
-
-
- end
-
- class JobObserver
- include ::DataMapper::Observer
-
- observe Job
-
- before :save do
- self.run_at ||= self.class.db_time_now
- end
- end
- end
- end
-end
View
106 lib/delayed/backend/mongo_mapper.rb
@@ -1,106 +0,0 @@
-require 'mongo_mapper'
-
-MongoMapper::Document.class_eval do
- yaml_as "tag:ruby.yaml.org,2002:MongoMapper"
-
- def self.yaml_new(klass, tag, val)
- klass.find(val['_id'])
- end
-
- def to_yaml_properties
- ['@_id']
- end
-end
-
-module Delayed
- module Backend
- module MongoMapper
- class Job
- include ::MongoMapper::Document
- include Delayed::Backend::Base
- set_collection_name 'delayed_jobs'
-
- key :priority, Integer, :default => 0
- key :attempts, Integer, :default => 0
- key :handler, String
- key :run_at, Time
- key :locked_at, Time
- key :locked_by, String, :index => true
- key :failed_at, Time
- key :last_error, String
- timestamps!
-
- before_save :set_default_run_at
-
- ensure_index [[:priority, 1], [:run_at, 1]]
-
- def self.before_fork
- ::MongoMapper.connection.close
- end
-
- def self.after_fork
- ::MongoMapper.connect(RAILS_ENV)
- end
-
- def self.db_time_now
- Time.now.utc
- end
-
- def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
- right_now = db_time_now
-
- conditions = {
- :run_at => {"$lte" => right_now},
- :limit => -limit, # In mongo, positive limits are 'soft' and negative are 'hard'
- :failed_at => nil,
- :sort => [['priority', 1], ['run_at', 1]]
- }
-
- where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"
-
- (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
- (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority
-
- results = all(conditions.merge(:locked_by => worker_name))
- results += all(conditions.merge('$where' => where)) if results.size < limit
- results
- end
-
- # When a worker is exiting, make sure we don't have any locked jobs.
- def self.clear_locks!(worker_name)
- collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
- end
-
- # Lock this job for this worker.
- # Returns true if we have the lock, false otherwise.
- def lock_exclusively!(max_run_time, worker = worker_name)
- right_now = self.class.db_time_now
- overtime = right_now - max_run_time.to_i
-
- query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}"
- conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query}
-
- collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}})
- affected_rows = collection.find({:_id => id, :locked_by => worker}).count
- if affected_rows == 1
- self.locked_at = right_now
- self.locked_by = worker
- return true
- else
- return false
- end
- end
-
- private
-
- def self.make_date(date_or_seconds)
- "new Date(#{date_or_seconds.to_f * 1000})"
- end
-
- def make_date(date)
- self.class.make_date(date)
- end
- end
- end
- end
-end
View
472 lib/delayed/backend/shared_spec.rb
@@ -0,0 +1,472 @@
+require File.expand_path('../../../../spec/sample_jobs', __FILE__)
+
+shared_examples_for 'a delayed_job backend' do
+ def create_job(opts = {})
+ described_class.create(opts.merge(:payload_object => SimpleJob.new))
+ end
+
+ before do
+ Delayed::Worker.max_priority = nil
+ Delayed::Worker.min_priority = nil
+ Delayed::Worker.default_priority = 99
+ SimpleJob.runs = 0
+ described_class.delete_all
+ end
+
+ it "should set run_at automatically if not set" do
+ described_class.create(:payload_object => ErrorJob.new ).run_at.should_not be_nil
+ end
+
+ it "should not set run_at automatically if already set" do
+ later = described_class.db_time_now + 5.minutes
+ described_class.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should be_close(later, 1)
+ end
+
+ describe "enqueue" do
+ it "should raise ArgumentError when handler doesn't respond_to :perform" do
+ lambda { described_class.enqueue(Object.new) }.should raise_error(ArgumentError)
+ end
+
+ it "should increase count after enqueuing items" do
+ described_class.enqueue SimpleJob.new
+ described_class.count.should == 1
+ end
+
+ it "should be able to set priority" do
+ @job = described_class.enqueue SimpleJob.new, 5
+ @job.priority.should == 5
+ end
+
+ it "should use default priority when it is not set" do
+ @job = described_class.enqueue SimpleJob.new
+ @job.priority.should == 99
+ end
+
+ it "should be able to set run_at" do
+ later = described_class.db_time_now + 5.minutes
+ @job = described_class.enqueue SimpleJob.new, 5, later
+ @job.run_at.should be_close(later, 1)
+ end
+
+ it "should work with jobs in modules" do
+ M::ModuleJob.runs = 0
+ job = described_class.enqueue M::ModuleJob.new
+ lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
+ end
+ end
+
+ describe "callbacks" do
+ before(:each) do
+ SuccessfulCallbackJob.messages = []
+ FailureCallbackJob.messages = []
+ end
+
+ it "should call before and after callbacks" do
+ job = described_class.enqueue(SuccessfulCallbackJob.new)
+ job.invoke_job
+ SuccessfulCallbackJob.messages.should == ["before perform", "perform", "success!", "after perform"]
+ end
+
+ it "should call the after callback with an error" do
+ job = described_class.enqueue(FailureCallbackJob.new)
+ lambda {job.invoke_job}.should raise_error
+ FailureCallbackJob.messages.should == ["before perform", "error: RuntimeError", "after perform"]
+ end
+
+ end
+
+ describe "payload_object" do
+ it "should raise a DeserializationError when the job class is totally unknown" do
+ job = described_class.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
+ lambda { job.payload_object }.should raise_error(Delayed::Backend::DeserializationError)
+ end
+
+ it "should raise a DeserializationError when the job struct is totally unknown" do
+ job = described_class.new :handler => "--- !ruby/struct:StructThatDoesNotExist {}"
+ lambda { job.payload_object }.should raise_error(Delayed::Backend::DeserializationError)
+ end
+ end
+
+ describe "find_available" do
+ it "should not find failed jobs" do
+ @job = create_job :attempts => 50, :failed_at => described_class.db_time_now
+ described_class.find_available('worker', 5, 1.second).should_not include(@job)
+ end
+
+ it "should not find jobs scheduled for the future" do
+ @job = create_job :run_at => (described_class.db_time_now + 1.minute)
+ described_class.find_available('worker', 5, 4.hours).should_not include(@job)
+ end
+
+ it "should not find jobs locked by another worker" do
+ @job = create_job(:locked_by => 'other_worker', :locked_at => described_class.db_time_now - 1.minute)
+ described_class.find_available('worker', 5, 4.hours).should_not include(@job)
+ end
+
+ it "should find open jobs" do
+ @job = create_job
+ described_class.find_available('worker', 5, 4.hours).should include(@job)
+ end
+
+ it "should find expired jobs" do
+ @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now - 2.minutes)
+ described_class.find_available('worker', 5, 1.minute).should include(@job)
+ end
+
+ it "should find own jobs" do
+ @job = create_job(:locked_by => 'worker', :locked_at => (described_class.db_time_now - 1.minutes))
+ described_class.find_available('worker', 5, 4.hours).should include(@job)
+ end
+
+ it "should find only the right amount of jobs" do
+ 10.times { create_job }
+ described_class.find_available('worker', 7, 4.hours).should have(7).jobs
+ end
+ end
+
+ context "when another worker is already performing an task, it" do
+ before :each do
+ @job = described_class.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => described_class.db_time_now - 5.minutes
+ end
+
+ it "should not allow a second worker to get exclusive access" do
+ @job.lock_exclusively!(4.hours, 'worker2').should == false
+ end
+
+ it "should allow a second worker to get exclusive access if the timeout has passed" do
+ @job.lock_exclusively!(1.minute, 'worker2').should == true
+ end
+
+ it "should be able to get access to the task if it was started more then max_age ago" do
+ @job.locked_at = described_class.db_time_now - 5.hours
+ @job.save
+
+ @job.lock_exclusively! 4.hours, 'worker2'
+ @job.reload
+ @job.locked_by.should == 'worker2'
+ @job.locked_at.should > (described_class.db_time_now - 1.minute)
+ end
+
+ it "should not be found by another worker" do
+ described_class.find_available('worker2', 1, 6.minutes).length.should == 0
+ end
+
+ it "should be found by another worker if the time has expired" do
+ described_class.find_available('worker2', 1, 4.minutes).length.should == 1
+ end
+
+ it "should be able to get exclusive access again when the worker name is the same" do
+ @job.lock_exclusively!(5.minutes, 'worker1').should be_true
+ @job.lock_exclusively!(5.minutes, 'worker1').should be_true
+ @job.lock_exclusively!(5.minutes, 'worker1').should be_true
+ end
+ end
+
+ context "when another worker has worked on a task since the job was found to be available, it" do
+
+ before :each do
+ @job = described_class.create :payload_object => SimpleJob.new
+ @job_copy_for_worker_2 = described_class.find(@job.id)
+ end
+
+ it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
+ @job.destroy
+ @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
+ end
+
+ it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
+ @job.update_attributes(:attempts => 1, :run_at => described_class.db_time_now + 1.day)
+ @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
+ end
+ end
+
+ context "#name" do
+ it "should be the class name of the job that was enqueued" do
+ described_class.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
+ end
+
+ it "should be the method that will be called if its a performable method object" do
+ job = described_class.new(:payload_object => NamedJob.new)
+ job.name.should == 'named_job'
+ end
+
+ it "should be the instance method that will be called if its a performable method object" do
+ @job = Story.create(:text => "...").delay.save
+ @job.name.should == 'Story#save'
+ end
+ end
+
+ context "worker prioritization" do
+ before(:each) do
+ Delayed::Worker.max_priority = nil
+ Delayed::Worker.min_priority = nil
+ end
+
+ it "should fetch jobs ordered by priority" do
+ 10.times { described_class.enqueue SimpleJob.new, rand(10) }
+ jobs = described_class.find_available('worker', 10)
+ jobs.size.should == 10
+ jobs.each_cons(2) do |a, b|
+ a.priority.should <= b.priority
+ end
+ end
+
+ it "should only find jobs greater than or equal to min priority" do
+ min = 5
+ Delayed::Worker.min_priority = min
+ 10.times {|i| described_class.enqueue SimpleJob.new, i }
+ jobs = described_class.find_available('worker', 10)
+ jobs.each {|job| job.priority.should >= min}
+ end
+
+ it "should only find jobs less than or equal to max priority" do
+ max = 5
+ Delayed::Worker.max_priority = max
+ 10.times {|i| described_class.enqueue SimpleJob.new, i }
+ jobs = described_class.find_available('worker', 10)
+ jobs.each {|job| job.priority.should <= max}
+ end
+ end
+
+ context "clear_locks!" do
+ before do
+ @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now)
+ end
+
+ it "should clear locks for the given worker" do
+ described_class.clear_locks!('worker')
+ described_class.find_available('worker2', 5, 1.minute).should include(@job)
+ end
+
+ it "should not clear locks for other workers" do
+ described_class.clear_locks!('worker1')
+ described_class.find_available('worker1', 5, 1.minute).should_not include(@job)
+ end
+ end
+
+ context "unlock" do
+ before do
+ @job = create_job(:locked_by => 'worker', :locked_at => described_class.db_time_now)
+ end
+
+ it "should clear locks" do
+ @job.unlock
+ @job.locked_by.should be_nil
+ @job.locked_at.should be_nil
+ end
+ end
+
+ context "large handler" do
+ before do
+ text = "Lorem ipsum dolor sit amet. " * 1000
+ @job = described_class.enqueue Delayed::PerformableMethod.new(text, :length, {})
+ end
+
+ it "should have an id" do
+ @job.id.should_not be_nil
+ end
+ end
+
+ describe "yaml serialization" do
+ it "should reload changed attributes" do
+ job = described_class.enqueue SimpleJob.new
+ yaml = job.to_yaml
+ job.priority = 99
+ job.save
+ YAML.load(yaml).priority.should == 99
+ end
+
+ it "should ignore destroyed records" do
+ job = described_class.enqueue SimpleJob.new
+ yaml = job.to_yaml
+ job.destroy
+ lambda { YAML.load(yaml).should be_nil }.should_not raise_error
+ end
+ end
+
+ describe "worker integration" do
+ before do
+ Delayed::Job.delete_all
+
+ @worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil, :quiet => true)
+
+ SimpleJob.runs = 0
+ end
+
+ describe "running a job" do
+ it "should fail after Worker.max_run_time" do
+ begin
+ old_max_run_time = Delayed::Worker.max_run_time
+ Delayed::Worker.max_run_time = 1.second
+ @job = Delayed::Job.create :payload_object => LongRunningJob.new
+ @worker.run(@job)
+ @job.reload.last_error.should =~ /expired/
+ @job.attempts.should == 1
+ ensure
+ Delayed::Worker.max_run_time = old_max_run_time
+ end
+ end
+ end
+
+ context "worker prioritization" do
+ before(:each) do
+ @worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true)
+ end
+
+ it "should only work_off jobs that are >= min_priority" do
+ create_job(:priority => -10)
+ create_job(:priority => 0)
+ @worker.work_off
+
+ SimpleJob.runs.should == 1
+ end
+
+ it "should only work_off jobs that are <= max_priority" do
+ create_job(:priority => 10)
+ create_job(:priority => 0)
+
+ @worker.work_off
+
+ SimpleJob.runs.should == 1
+ end
+ end
+
+ context "while running with locked and expired jobs" do
+ before(:each) do
+ @worker.name = 'worker1'
+ end
+
+ it "should not run jobs locked by another worker" do
+ create_job(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ lambda { @worker.work_off }.should_not change { SimpleJob.runs }
+ end
+
+ it "should run open jobs" do
+ create_job
+ lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
+ end
+
+ it "should run expired jobs" do
+ expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
+ create_job(:locked_by => 'other_worker', :locked_at => expired_time)
+ lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
+ end
+
+ it "should run own jobs" do
+ create_job(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes))
+ lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
+ end
+ end
+
+ describe "failed jobs" do
+ before do
+ # reset defaults
+ Delayed::Worker.destroy_failed_jobs = true
+ Delayed::Worker.max_attempts = 25
+
+ @job = Delayed::Job.enqueue ErrorJob.new
+ end
+
+ it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
+ Delayed::Worker.destroy_failed_jobs = false
+ Delayed::Worker.max_attempts = 1
+ @worker.run(@job)
+ @job.reload
+ @job.last_error.should =~ /did not work/
+ @job.attempts.should == 1
+ @job.failed_at.should_not be_nil
+ end
+
+ it "should re-schedule jobs after failing" do
+ @worker.run(@job)
+ @job.reload
+ @job.last_error.should =~ /did not work/
+ @job.last_error.should =~ /sample_jobs.rb:\d+:in `perform'/
+ @job.attempts.should == 1
+ @job.run_at.should > Delayed::Job.db_time_now - 10.minutes
+ @job.run_at.should < Delayed::Job.db_time_now + 10.minutes
+ end
+ end
+
+ context "reschedule" do
+ before do
+ @job = Delayed::Job.create :payload_object => SimpleJob.new
+ end
+
+ share_examples_for "any failure more than Worker.max_attempts times" do
+ context "when the job's payload has an #on_permanent_failure hook" do
+ before do
+ @job = Delayed::Job.create :payload_object => OnPermanentFailureJob.new
+ @job.payload_object.should respond_to :on_permanent_failure
+ end
+
+ it "should run that hook" do
+ @job.payload_object.should_receive :on_permanent_failure
+ Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+ end
+ end
+
+ context "when the job's payload has no #on_permanent_failure hook" do
+ # It's a little tricky to test this in a straightforward way,
+ # because putting a should_not_receive expectation on
+ # @job.payload_object.on_permanent_failure makes that object
+ # incorrectly return true to
+ # payload_object.respond_to? :on_permanent_failure, which is what
+ # reschedule uses to decide whether to call on_permanent_failure.
+ # So instead, we just make sure that the payload_object as it
+ # already stands doesn't respond_to? on_permanent_failure, then
+ # shove it through the iterated reschedule loop and make sure we
+ # don't get a NoMethodError (caused by calling that nonexistent
+ # on_permanent_failure method).
+
+ before do
+ @job.payload_object.should_not respond_to(:on_permanent_failure)
+ end
+
+ it "should not try to run that hook" do
+ lambda do
+ Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+ end.should_not raise_exception(NoMethodError)
+ end
+ end
+ end
+
+ context "and we want to destroy jobs" do
+ before do
+ Delayed::Worker.destroy_failed_jobs = true
+ end
+
+ it_should_behave_like "any failure more than Worker.max_attempts times"
+
+ it "should be destroyed if it failed more than Worker.max_attempts times" do
+ @job.should_receive(:destroy)
+ Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+ end
+
+ it "should not be destroyed if failed fewer than Worker.max_attempts times" do
+ @job.should_not_receive(:destroy)
+ (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
+ end
+ end
+
+ context "and we don't want to destroy jobs" do
+ before do
+ Delayed::Worker.destroy_failed_jobs = false
+ end
+
+ it_should_behave_like "any failure more than Worker.max_attempts times"
+
+ it "should be failed if it failed more than Worker.max_attempts times" do
+ @job.reload.failed_at.should == nil
+ Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
+ @job.reload.failed_at.should_not == nil
+ end
+
+ it "should not be failed if it failed fewer than Worker.max_attempts times" do
+ (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
+ @job.reload.failed_at.should == nil
+ end
+ end
+ end
+ end
+end
View
1 lib/delayed/message_sending.rb
@@ -1,4 +1,5 @@
require 'active_support/basic_object'
+require 'active_support/core_ext/module/aliasing'
module Delayed
class DelayProxy < ActiveSupport::BasicObject
View
8 lib/delayed/tasks.rb
@@ -1,17 +1,13 @@
require File.join(File.dirname(__FILE__), 'daemon_tasks')
-# Re-definitions are appended to existing tasks
-task :environment
-task :merb_env
-
namespace :jobs do
desc "Clear the delayed_job queue."
- task :clear => [:merb_env, :environment] do
+ task :clear => :environment do
Delayed::Job.delete_all
end
desc "Start a delayed_job worker."
- task :work => [:merb_env, :environment] do
+ task :work => :environment do
Delayed::Worker.new(:min_priority => ENV['MIN_PRIORITY'], :max_priority => ENV['MAX_PRIORITY']).start
end
end
View
14 lib/delayed/worker.rb
@@ -2,6 +2,7 @@
require 'active_support/core_ext/numeric/time'
require 'active_support/core_ext/class/attribute_accessors'
require 'active_support/core_ext/kernel'
+require 'logger'
module Delayed
class Worker
@@ -16,8 +17,8 @@ class Worker
cattr_accessor :destroy_failed_jobs
self.destroy_failed_jobs = true
- self.logger = if defined?(Merb::Logger)
- Merb.logger
+ self.logger = if defined?(Rails)
+ Rails.logger
elsif defined?(RAILS_DEFAULT_LOGGER)
RAILS_DEFAULT_LOGGER
end
@@ -37,14 +38,7 @@ def self.backend=(backend)
end
def self.guess_backend
- self.backend ||= if defined?(ActiveRecord)
- :active_record
- elsif defined?(MongoMapper)
- :mongo_mapper
- else
- logger.warn "Could not decide on a backend, defaulting to active_record"
- :active_record
- end
+ self.backend = :active_record if defined?(ActiveRecord)
end
def initialize(options={})
View
4 lib/delayed_job.rb
@@ -9,7 +9,3 @@
Object.send(:include, Delayed::MessageSending)
Module.send(:include, Delayed::MessageSending::ClassMethods)
-
-if defined?(Merb::Plugins)
- Merb::Plugins.add_rakefiles File.dirname(__FILE__) / 'delayed' / 'tasks'
-end
View
12 spec/backend/active_record_job_spec.rb → spec/active_record_job_spec.rb
@@ -1,22 +1,12 @@
require 'spec_helper'
-require 'backend/shared_backend_spec'
require 'delayed/backend/active_record'
describe Delayed::Backend::ActiveRecord::Job do
- before(:all) do
- @backend = Delayed::Backend::ActiveRecord::Job
- end
-
- before(:each) do
- Delayed::Backend::ActiveRecord::Job.delete_all
- SimpleJob.runs = 0
- end
-
after do
Time.zone = nil
end
- it_should_behave_like 'a backend'
+ it_should_behave_like 'a delayed_job backend'
context "db_time_now" do
it "should return time in current time zone if set" do
View
15 spec/backend/couch_rest_job_spec.rb
@@ -1,15 +0,0 @@
-require 'spec_helper'
-require 'backend/shared_backend_spec'
-require 'delayed/backend/couch_rest'
-
-describe Delayed::Backend::CouchRest::Job do
- before(:all) do
- @backend = Delayed::Backend::CouchRest::Job
- end
-
- before(:each) do
- @backend.delete_all
- end
-
- it_should_behave_like 'a backend'
-end
View
16 spec/backend/data_mapper_job_spec.rb
@@ -1,16 +0,0 @@
-require 'spec_helper'
-require 'backend/shared_backend_spec'
-require 'delayed/backend/data_mapper'
-
-describe Delayed::Backend::DataMapper::Job do
- before(:all) do
- @backend = Delayed::Backend::DataMapper::Job
- end
-
- before(:each) do
- # reset database before each example is run
- DataMapper.auto_migrate!
- end
-
- it_should_behave_like 'a backend'
-end
View
94 spec/backend/mongo_mapper_job_spec.rb
@@ -1,94 +0,0 @@
-require 'spec_helper'
-require 'backend/shared_backend_spec'
-require 'delayed/backend/mongo_mapper'
-
-describe Delayed::Backend::MongoMapper::Job do
- before(:all) do
- @backend = Delayed::Backend::MongoMapper::Job
- end
-
- before(:each) do
- MongoMapper.database.collections.each(&:remove)
- end
-
- it_should_behave_like 'a backend'
-
- describe "indexes" do
- it "should have combo index on priority and run_at" do
- @backend.collection.index_information.detect { |index| index[0] == 'priority_1_run_at_1' }.should_not be_nil
- end
-
- it "should have index on locked_by" do
- @backend.collection.index_information.detect { |index| index[0] == 'locked_by_1' }.should_not be_nil
- end
- end
-
- describe "delayed method" do
- class MongoStoryReader
- def read(story)
- "Epilog: #{story.tell}"
- end
- end
-
- class MongoStory
- include ::MongoMapper::Document
- key :text, String
-
- def tell
- text
- end
- end
-
- it "should ignore not found errors because they are permanent" do
- story = MongoStory.create :text => 'Once upon a time...'
- job = story.delay.tell
- story.destroy
- lambda { job.invoke_job }.should_not raise_error
- end
-
- it "should store the object as string" do
- story = MongoStory.create :text => 'Once upon a time...'
- job = story.delay.tell
-
- job.payload_object.class.should == Delayed::PerformableMethod
- job.payload_object.object.should == story
- job.payload_object.method.should == :tell
- job.payload_object.args.should == []
- job.payload_object.perform.should == 'Once upon a time...'
- end
-
- it "should store arguments as string" do
- story = MongoStory.create :text => 'Once upon a time...'
- job = MongoStoryReader.new.delay.read(story)
- job.payload_object.class.should == Delayed::PerformableMethod
- job.payload_object.method.should == :read
- job.payload_object.args.should == [story]
- job.payload_object.perform.should == 'Epilog: Once upon a time...'
- end
- end
-
- describe "before_fork" do
- after do
- MongoMapper.connection.connect_to_master
- end
-
- it "should disconnect" do
- lambda do
- Delayed::Backend::MongoMapper::Job.before_fork
- end.should change { !!MongoMapper.connection.connected? }.from(true).to(false)
- end
- end
-
- describe "after_fork" do
- before do
- MongoMapper.connection.close
- end
-
- it "should call reconnect" do
- lambda do
- Delayed::Backend::MongoMapper::Job.after_fork
- end.should change { !!MongoMapper.connection.connected? }.from(false).to(true)
- end
- end
-
-end
View
279 spec/backend/shared_backend_spec.rb
@@ -1,279 +0,0 @@
-class NamedJob < Struct.new(:perform)
- def display_name
- 'named_job'
- end
-end
-
-shared_examples_for 'a backend' do
- def create_job(opts = {})
- @backend.create(opts.merge(:payload_object => SimpleJob.new))
- end
-
- before do
- Delayed::Worker.max_priority = nil
- Delayed::Worker.min_priority = nil
- Delayed::Worker.default_priority = 99
- SimpleJob.runs = 0
- end
-
- it "should set run_at automatically if not set" do
- @backend.create(:payload_object => ErrorJob.new ).run_at.should_not be_nil
- end
-
- it "should not set run_at automatically if already set" do
- later = @backend.db_time_now + 5.minutes
- @backend.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should be_close(later, 1)
- end
-
- it "should raise ArgumentError when handler doesn't respond_to :perform" do
- lambda { @backend.enqueue(Object.new) }.should raise_error(ArgumentError)
- end
-
- it "should increase count after enqueuing items" do
- @backend.enqueue SimpleJob.new
- @backend.count.should == 1
- end
-
- it "should be able to set priority when enqueuing items" do
- @job = @backend.enqueue SimpleJob.new, 5
- @job.priority.should == 5
- end
-
- it "should use default priority when it is not set" do
- @job = @backend.enqueue SimpleJob.new
- @job.priority.should == 99
- end
-
- it "should be able to set run_at when enqueuing items" do
- later = @backend.db_time_now + 5.minutes
- @job = @backend.enqueue SimpleJob.new, 5, later
- @job.run_at.should be_close(later, 1)
- end
-
- it "should work with jobs in modules" do
- M::ModuleJob.runs = 0
- job = @backend.enqueue M::ModuleJob.new
- lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
- end
-
- describe "payload_object" do
- it "should raise a DeserializationError when the job class is totally unknown" do
- job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
- lambda { job.payload_object }.should raise_error(Delayed::Backend::DeserializationError)
- end
-
- it "should raise a DeserializationError when the job struct is totally unknown" do
- job = @backend.new :handler => "--- !ruby/struct:StructThatDoesNotExist {}"
- lambda { job.payload_object }.should raise_error(Delayed::Backend::DeserializationError)
- end
-
- it "should autoload classes that are unknown at runtime" do
- job = @backend.new :handler => "--- !ruby/object:Autoloaded::Clazz {}"
- lambda { job.payload_object }.should_not raise_error(Delayed::Backend::DeserializationError)
- end
-
- it "should autoload structs that are unknown at runtime" do
- job = @backend.new :handler => "--- !ruby/struct:Autoloaded::Struct {}"
- lambda { job.payload_object }.should_not raise_error(Delayed::Backend::DeserializationError)
- end
- end
-
- describe "find_available" do
- it "should not find failed jobs" do
- @job = create_job :attempts => 50, :failed_at => @backend.db_time_now
- @backend.find_available('worker', 5, 1.second).should_not include(@job)
- end
-
- it "should not find jobs scheduled for the future" do
- @job = create_job :run_at => (@backend.db_time_now + 1.minute)
- @backend.find_available('worker', 5, 4.hours).should_not include(@job)
- end
-
- it "should not find jobs locked by another worker" do
- @job = create_job(:locked_by => 'other_worker', :locked_at => @backend.db_time_now - 1.minute)
- @backend.find_available('worker', 5, 4.hours).should_not include(@job)
- end
-
- it "should find open jobs" do
- @job = create_job
- @backend.find_available('worker', 5, 4.hours).should include(@job)
- end
-
- it "should find expired jobs" do
- @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now - 2.minutes)
- @backend.find_available('worker', 5, 1.minute).should include(@job)
- end
-
- it "should find own jobs" do
- @job = create_job(:locked_by => 'worker', :locked_at => (@backend.db_time_now - 1.minutes))
- @backend.find_available('worker', 5, 4.hours).should include(@job)
- end
-
- it "should find only the right amount of jobs" do
- 10.times { create_job }
- @backend.find_available('worker', 7, 4.hours).should have(7).jobs
- end
- end
-
- context "when another worker is already performing an task, it" do
-
- before :each do
- @job = @backend.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => @backend.db_time_now - 5.minutes
- end
-
- it "should not allow a second worker to get exclusive access" do
- @job.lock_exclusively!(4.hours, 'worker2').should == false
- end
-
- it "should allow a second worker to get exclusive access if the timeout has passed" do
- @job.lock_exclusively!(1.minute, 'worker2').should == true
- end
-
- it "should be able to get access to the task if it was started more then max_age ago" do
- @job.locked_at = 5.hours.ago
- @job.save
-
- @job.lock_exclusively! 4.hours, 'worker2'
- @job.reload
- @job.locked_by.should == 'worker2'
- @job.locked_at.should > 1.minute.ago
- end
-
- it "should not be found by another worker" do
- @backend.find_available('worker2', 1, 6.minutes).length.should == 0
- end
-
- it "should be found by another worker if the time has expired" do
- @backend.find_available('worker2', 1, 4.minutes).length.should == 1
- end
-
- it "should be able to get exclusive access again when the worker name is the same" do
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
- @job.lock_exclusively!(5.minutes, 'worker1').should be_true
- end
- end
-
- context "when another worker has worked on a task since the job was found to be available, it" do
-
- before :each do
- @job = @backend.create :payload_object => SimpleJob.new
- @job_copy_for_worker_2 = @backend.find(@job.id)
- end
-
- it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
- @job.destroy
- @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
- end
-
- it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
- @job.update_attributes(:attempts => 1, :run_at => 1.day.from_now)
- @job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
- end
- end
-
- context "#name" do
- it "should be the class name of the job that was enqueued" do
- @backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
- end
-
- it "should be the method that will be called if its a performable method object" do
- job = @backend.new(:payload_object => NamedJob.new)
- job.name.should == 'named_job'
- end
-
- it "should be the instance method that will be called if its a performable method object" do
- @job = Story.create(:text => "...").delay.save
- @job.name.should == 'Story#save'
- end
- end
-
- context "worker prioritization" do
- before(:each) do
- Delayed::Worker.max_priority = nil
- Delayed::Worker.min_priority = nil
- end
-
- it "should fetch jobs ordered by priority" do
- 10.times { @backend.enqueue SimpleJob.new, rand(10) }
- jobs = @backend.find_available('worker', 10)
- jobs.size.should == 10
- jobs.each_cons(2) do |a, b|
- a.priority.should <= b.priority
- end
- end
-
- it "should only find jobs greater than or equal to min priority" do
- min = 5
- Delayed::Worker.min_priority = min
- 10.times {|i| @backend.enqueue SimpleJob.new, i }
- jobs = @backend.find_available('worker', 10)
- jobs.each {|job| job.priority.should >= min}
- end
-
- it "should only find jobs less than or equal to max priority" do
- max = 5
- Delayed::Worker.max_priority = max
- 10.times {|i| @backend.enqueue SimpleJob.new, i }
- jobs = @backend.find_available('worker', 10)
- jobs.each {|job| job.priority.should <= max}
- end
- end
-
- context "clear_locks!" do
- before do
- @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
- end
-
- it "should clear locks for the given worker" do
- @backend.clear_locks!('worker')
- @backend.find_available('worker2', 5, 1.minute).should include(@job)
- end
-
- it "should not clear locks for other workers" do
- @backend.clear_locks!('worker1')
- @backend.find_available('worker1', 5, 1.minute).should_not include(@job)
- end
- end
-
- context "unlock" do
- before do
- @job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
- end
-
- it "should clear locks" do
- @job.unlock
- @job.locked_by.should be_nil
- @job.locked_at.should be_nil
- end
- end
-
- context "large handler" do
- before do
- text = "Lorem ipsum dolor sit amet. " * 1000
- @job = @backend.enqueue Delayed::PerformableMethod.new(text, :length, {})
- end
-
- it "should have an id" do
- @job.id.should_not be_nil
- end
- end
-
- describe "yaml serialization" do
- it "should reload changed attributes" do
- job = @backend.enqueue SimpleJob.new
- yaml = job.to_yaml
- job.priority = 99
- job.save
- YAML.load(yaml).priority.should == 99
- end
-
- it "should ignore destroyed records" do
- job = @backend.enqueue SimpleJob.new
- yaml = job.to_yaml
- job.destroy
- lambda { YAML.load(yaml).should be_nil }.should_not raise_error
- end
- end
-
-end
View
36 spec/sample_jobs.rb
@@ -1,3 +1,9 @@
+class NamedJob < Struct.new(:perform)
+ def display_name
+ 'named_job'
+ end
+end
+
class SimpleJob
cattr_accessor :runs; self.runs = 0
def perform; @@runs += 1; end
@@ -23,3 +29,33 @@ class ModuleJob
def perform; @@runs += 1; end
end
end
+
+class SuccessfulCallbackJob
+ cattr_accessor :messages
+
+ def before(job)
+ SuccessfulCallbackJob.messages << 'before perform'
+ end
+
+ def perform
+ SuccessfulCallbackJob.messages << 'perform'
+ end
+
+ def after(job, error = nil)
+ SuccessfulCallbackJob.messages << 'after perform'
+ end
+
+ def success(job)
+ SuccessfulCallbackJob.messages << 'success!'
+ end
+
+ def failure(job, error)
+ SuccessfulCallbackJob.messages << "error: #{error.class}"
+ end
+end
+
+class FailureCallbackJob < SuccessfulCallbackJob
+ def perform
+ raise "failure job"
+ end
+end
View
33 spec/setup/active_record.rb
@@ -1,33 +0,0 @@
-require 'active_record'
-
-ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => ':memory:')
-ActiveRecord::Base.logger = Delayed::Worker.logger
-ActiveRecord::Migration.verbose = false
-
-ActiveRecord::Schema.define do
- create_table :delayed_jobs, :force => true do |table|
- table.integer :priority, :default => 0
- table.integer :attempts, :default => 0
- table.text :handler
- table.text :last_error
- table.datetime :run_at
- table.datetime :locked_at
- table.datetime :failed_at
- table.string :locked_by
- table.timestamps
- end
-
- add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
-
- create_table :stories, :force => true do |table|
- table.string :text
- end
-end
-
-# Purely useful for test cases...
-class Story < ActiveRecord::Base
- def tell; text; end
- def whatever(n, _); tell*n; end
-
- handle_asynchronously :whatever
-end
View
7 spec/setup/couch_rest.rb
@@ -1,7 +0,0 @@
-require 'couchrest'
-require 'delayed/backend/couch_rest'
-
-Delayed::Backend::CouchRest::Job.use_database CouchRest::Server.new.database!('delayed_job_spec')
-
-# try to perform a query to check that we can connect
-Delayed::Backend::CouchRest::Job.all
View
8 spec/setup/data_mapper.rb
@@ -1,8 +0,0 @@
-require 'dm-core'
-require 'dm-validations'
-
-require 'delayed/backend/data_mapper'
-
-DataMapper.logger = Delayed::Worker.logger
-DataMapper.setup(:default, "sqlite3::memory:")
-DataMapper.auto_migrate!
View
17 spec/setup/mongo_mapper.rb
@@ -1,17 +0,0 @@
-require 'mongo_mapper'
-
-MongoMapper.config = {
- RAILS_ENV => {'database' => 'delayed_job'}
-}
-MongoMapper.connect RAILS_ENV
-
-unless defined?(Story)
- class Story
- include ::MongoMapper::Document
- def tell; text; end
- def whatever(n, _); tell*n; end
- def self.count; end
-
- handle_asynchronously :whatever
- end
-end
View
48 spec/spec_helper.rb
@@ -1,31 +1,53 @@
$:.unshift(File.dirname(__FILE__) + '/../lib')
require 'rubygems'
+require 'bundler/setup'
require 'spec'
require 'logger'
gem 'activerecord', ENV['RAILS_VERSION'] if ENV['RAILS_VERSION']
require 'delayed_job'
-require 'sample_jobs'
+require 'delayed/backend/shared_spec'
Delayed::Worker.logger = Logger.new('/tmp/dj.log')
RAILS_ENV = 'test'
-# determine the available backends
-BACKENDS = []
-Dir.glob("#{File.dirname(__FILE__)}/setup/*.rb") do |backend|
- begin
- backend = File.basename(backend, '.rb')
- require "setup/#{backend}"
- require "backend/#{backend}_job_spec"
- BACKENDS << backend.to_sym
- rescue Exception
- puts "Unable to load #{backend} backend: #{$!}"
+require 'active_record'
+
+ActiveRecord::Base.establish_connection(:adapter => 'sqlite3', :database => ':memory:')
+ActiveRecord::Base.logger = Delayed::Worker.logger
+ActiveRecord::Migration.verbose = false
+
+ActiveRecord::Schema.define do
+ create_table :delayed_jobs, :force => true do |table|
+ table.integer :priority, :default => 0
+ table.integer :attempts, :default => 0
+ table.text :handler
+ table.text :last_error
+ table.datetime :run_at
+ table.datetime :locked_at
+ table.datetime :failed_at
+ table.string :locked_by
+ table.timestamps
end
+
+ add_index :delayed_jobs, [:priority, :run_at], :name => 'delayed_jobs_priority'
+
+ create_table :stories, :force => true do |table|
+ table.string :text
+ end
+end
+
+# Purely useful for test cases...
+class Story < ActiveRecord::Base
+ def tell; text; end
+ def whatever(n, _); tell*n; end
+
+ handle_asynchronously :whatever
end
-Delayed::Worker.backend = BACKENDS.first
+Delayed::Worker.backend = :active_record
# Add this directory so the ActiveSupport autoloading works
-ActiveSupport::Dependencies.load_paths << File.dirname(__FILE__)
+ActiveSupport::Dependencies.autoload_paths << File.dirname(__FILE__)
View
195 spec/worker_spec.rb
@@ -1,10 +1,6 @@
require 'spec_helper'
describe Delayed::Worker do
- def job_create(opts = {})
- Delayed::Job.create(opts.merge(:payload_object => SimpleJob.new))
- end
-
describe "backend=" do
before do
@clazz = Class.new
@@ -20,195 +16,4 @@ def job_create(opts = {})
Delayed::Worker.backend.should == Delayed::Backend::ActiveRecord::Job
end
end
-
- BACKENDS.each do |backend|
- describe "with the #{backend} backend" do
- before do
- Delayed::Worker.backend = backend
- Delayed::Job.delete_all
-
- @worker = Delayed::Worker.new(:max_priority => nil, :min_priority => nil, :quiet => true)
-
- SimpleJob.runs = 0
- end
-
- describe "running a job" do
- it "should fail after Worker.max_run_time" do
- begin
- old_max_run_time = Delayed::Worker.max_run_time
- Delayed::Worker.max_run_time = 1.second
- @job = Delayed::Job.create :payload_object => LongRunningJob.new
- @worker.run(@job)
- @job.reload.last_error.should =~ /expired/
- @job.attempts.should == 1
- ensure
- Delayed::Worker.max_run_time = old_max_run_time
- end
- end
- end
-
- context "worker prioritization" do
- before(:each) do
- @worker = Delayed::Worker.new(:max_priority => 5, :min_priority => -5, :quiet => true)
- end
-
- it "should only work_off jobs that are >= min_priority" do
- job_create(:priority => -10)
- job_create(:priority => 0)
- @worker.work_off
-
- SimpleJob.runs.should == 1
- end
-
- it "should only work_off jobs that are <= max_priority" do
- job_create(:priority => 10)
- job_create(:priority => 0)
-
- @worker.work_off
-
- SimpleJob.runs.should == 1
- end
- end
-
- context "while running with locked and expired jobs" do
- before(:each) do
- @worker.name = 'worker1'
- end
-
- it "should not run jobs locked by another worker" do
- job_create(:locked_by => 'other_worker', :locked_at => (Delayed::Job.db_time_now - 1.minutes))
- lambda { @worker.work_off }.should_not change { SimpleJob.runs }
- end
-
- it "should run open jobs" do
- job_create
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
-
- it "should run expired jobs" do
- expired_time = Delayed::Job.db_time_now - (1.minutes + Delayed::Worker.max_run_time)
- job_create(:locked_by => 'other_worker', :locked_at => expired_time)
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
-
- it "should run own jobs" do
- job_create(:locked_by => @worker.name, :locked_at => (Delayed::Job.db_time_now - 1.minutes))
- lambda { @worker.work_off }.should change { SimpleJob.runs }.from(0).to(1)
- end
- end
-
- describe "failed jobs" do
- before do
- # reset defaults
- Delayed::Worker.destroy_failed_jobs = true
- Delayed::Worker.max_attempts = 25
-
- @job = Delayed::Job.enqueue ErrorJob.new
- end
-
- it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
- Delayed::Worker.destroy_failed_jobs = false
- Delayed::Worker.max_attempts = 1
- @worker.run(@job)
- @job.reload
- @job.last_error.should =~ /did not work/
- @job.last_error.should =~ /worker_spec.rb/
- @job.attempts.should == 1
- @job.failed_at.should_not be_nil
- end
-
- it "should re-schedule jobs after failing" do
- @worker.run(@job)
- @job.reload
- @job.last_error.should =~ /did not work/
- @job.last_error.should =~ /sample_jobs.rb:8:in `perform'/
- @job.attempts.should == 1
- @job.run_at.should > Delayed::Job.db_time_now - 10.minutes
- @job.run_at.should < Delayed::Job.db_time_now + 10.minutes
- end
- end
-
- context "reschedule" do
- before do
- @job = Delayed::Job.create :payload_object => SimpleJob.new
- end
-
- share_examples_for "any failure more than Worker.max_attempts times" do
- context "when the job's payload has an #on_permanent_failure hook" do
- before do
- @job = Delayed::Job.create :payload_object => OnPermanentFailureJob.new
- @job.payload_object.should respond_to :on_permanent_failure
- end
-
- it "should run that hook" do
- @job.payload_object.should_receive :on_permanent_failure
- Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
- end
- end
-
- context "when the job's payload has no #on_permanent_failure hook" do
- # It's a little tricky to test this in a straightforward way,
- # because putting a should_not_receive expectation on
- # @job.payload_object.on_permanent_failure makes that object
- # incorrectly return true to
- # payload_object.respond_to? :on_permanent_failure, which is what
- # reschedule uses to decide whether to call on_permanent_failure.
- # So instead, we just make sure that the payload_object as it
- # already stands doesn't respond_to? on_permanent_failure, then
- # shove it through the iterated reschedule loop and make sure we
- # don't get a NoMethodError (caused by calling that nonexistent
- # on_permanent_failure method).
-
- before do
- @job.payload_object.should_not respond_to(:on_permanent_failure)
- end
-
- it "should not try to run that hook" do
- lambda do
- Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
- end.should_not raise_exception(NoMethodError)
- end
- end
- end
-
- context "and we want to destroy jobs" do
- before do
- Delayed::Worker.destroy_failed_jobs = true
- end
-
- it_should_behave_like "any failure more than Worker.max_attempts times"
-
- it "should be destroyed if it failed more than Worker.max_attempts times" do
- @job.should_receive(:destroy)
- Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
- end
-
- it "should not be destroyed if failed fewer than Worker.max_attempts times" do
- @job.should_not_receive(:destroy)
- (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
- end
- end
-
- context "and we don't want to destroy jobs" do
- before do
- Delayed::Worker.destroy_failed_jobs = false
- end
-
- it_should_behave_like "any failure more than Worker.max_attempts times"
-
- it "should be failed if it failed more than Worker.max_attempts times" do
- @job.reload.failed_at.should == nil
- Delayed::Worker.max_attempts.times { @worker.reschedule(@job) }
- @job.reload.failed_at.should_not == nil
- end
-
- it "should not be failed if it failed fewer than Worker.max_attempts times" do
- (Delayed::Worker.max_attempts - 1).times { @worker.reschedule(@job) }
- @job.reload.failed_at.should == nil
- end
- end
- end
- end
- end
-
end
View
16 spec/yaml_ext_spec.rb
@@ -0,0 +1,16 @@
+require 'spec_helper'
+
+describe YAML do
+ it "should autoload classes that are unknown at runtime" do
+ lambda {
+ YAML.load("--- !ruby/object:Autoloaded::Clazz {}")
+ }.should_not raise_error
+ end
+
+ it "should autoload structs that are unknown at runtime" do
+ lambda {
+ YAML.load("--- !ruby/struct:Autoloaded::Struct {}")
+ }.should_not raise_error
+ end
+
+end

0 comments on commit 4c3e1c9

Please sign in to comment.
Something went wrong with that request. Please try again.