Permalink
Browse files

mongoid backend support. courtesy of dohmoose.

  • Loading branch information...
1 parent f61a855 commit ec0cdf511217615960fe42e8eb04e6091662c2fa @iamnader iamnader committed Sep 2, 2010
Showing with 186 additions and 2 deletions.
  1. +26 −0 README.textile
  2. +98 −0 lib/delayed/backend/mongoid.rb
  3. +2 −0 lib/delayed/worker.rb
  4. +41 −0 spec/backend/mongoid_job_spec.rb
  5. +17 −0 spec/setup/mongoid.rb
  6. +2 −2 spec/spec_helper.rb
View
26 README.textile
@@ -62,6 +62,32 @@ MongoMapper.setup(config, Rails.env)
Delayed::Worker.backend = :mongo_mapper
</pre>
+h3. Mongoid
+
+You need to create your DelayedJob mongoid model:
+<pre>
+# app/models/delayed_job.rb
+class DelayedJob
+ include Mongoid::Document
+ include Mongoid::Timestamps
+
+ field :priority, :type => Integer, :default => 0 # Allows some jobs to jump to the front of the queue
+ field :attempts, :type => Integer, :default => 0 # Provides for retries, but still fail eventually.
+ field :handler # YAML-encoded string of the object that will do work
+ field :last_error # reason for last failure (See Note below)
+ field :run_at, :type => DateTime # When to run. Could be Time.zone.now for immediately, or sometime in the future.
+ field :locked_at, :type => DateTime # Set when a client is working on this object
+ field :failed_at, :type => DateTime # Set when all retries have failed (actually, by default, the record is deleted instead)
+ field :locked_by # Who is working on this object (if locked)
+end
+</pre>
+
+And set the backend in the initializer:
+<pre>
+# config/initializers/delayed_job.rb
+Delayed::Worker.backend = :mongoid
+</pre>
+
h3. DataMapper
<pre>
View
98 lib/delayed/backend/mongoid.rb
@@ -0,0 +1,98 @@
+Mongoid::Document.class_eval do
+ yaml_as "tag:ruby.yaml.org,2002:Mongoid"
+
+ def self.yaml_new(klass, tag, val)
+ begin
+ klass.find(val['attributes']['_id'])
+ rescue Mongoid::Errors::DocumentNotFound
+ nil
+ end
+ end
+
+ def to_yaml_properties
+ ['@attributes']
+ end
+end
+
+module Delayed
+ module Backend
+ module Mongoid
+ class Job
+ include ::Mongoid::Document
+ include ::Mongoid::Timestamps
+ include Delayed::Backend::Base
+ field :priority, :type=> Integer, :default => 0
+ field :attempts, :type=> Integer, :default => 0
+ field :handler, :type=> String
+ field :run_at, :type=> Time
+ field :locked_at, :type=> Time
+ field :locked_by, :type=> String, :index => true
+ field :failed_at, :type=> Time
+ field :last_error, :type=> String
+
+
+ before_save :set_default_run_at
+
+ def self.before_fork
+ ::Mongoid.master.connection.close
+ end
+
+ def self.after_fork
+ ::Mongoid.master.connection.connect_to_master
+ end
+
+ def self.db_time_now
+ Time.now.utc
+ end
+
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ right_now = db_time_now
+
+ conditions = {:run_at => {"$lte" => right_now}, :failed_at => nil}
+ (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
+ (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority
+
+
+ where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"
+ results = self.where(conditions.merge(:locked_by => worker_name)).limit(-limit).order_by([['priority', 1], ['run_at', 1]]).to_a
+ results += self.where(conditions.merge('$where' => where)).limit(-limit+results.size).order_by([['priority', 1], ['run_at', 1]]).to_a if results.size < limit
+ results
+ end
+ # When a worker is exiting, make sure we don't have any locked jobs.
+ def self.clear_locks!(worker_name)
+ self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
+ end
+
+ # Lock this job for this worker.
+ # Returns true if we have the lock, false otherwise.
+ def lock_exclusively!(max_run_time, worker = worker_name)
+ right_now = self.class.db_time_now
+ overtime = right_now - max_run_time.to_i
+
+ query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}"
+ conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query}
+
+ self.collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}})
+ affected_rows = self.collection.find({:_id => id, :locked_by => worker}).count
+ if affected_rows == 1
+ self.locked_at = right_now
+ self.locked_by = worker
+ return true
+ else
+ return false
+ end
+ end
+
+ private
+
+ def self.make_date(date_or_seconds)
+ "new Date(#{date_or_seconds.to_f * 1000})"
+ end
+
+ def make_date(date)
+ self.class.make_date(date)
+ end
+ end
+ end
+ end
+end
View
2 lib/delayed/worker.rb
@@ -43,6 +43,8 @@ def self.guess_backend
:active_record
elsif defined?(MongoMapper)
:mongo_mapper
+ elsif defined?(Mongoid)
+ :mongoid
else
logger.warn "Could not decide on a backend, defaulting to active_record"
:active_record
View
41 spec/backend/mongoid_job_spec.rb
@@ -0,0 +1,41 @@
+require 'spec_helper'
+require 'backend/shared_backend_spec'
+require 'delayed/backend/mongoid'
+
+describe Delayed::Backend::Mongoid::Job do
+ before(:all) do
+ @backend = Delayed::Backend::Mongoid::Job
+ Delayed::Worker.backend = :mongoid
+ end
+
+ before(:each) do
+ Delayed::Backend::Mongoid::Job.destroy_all
+ end
+
+ it_should_behave_like 'a backend'
+
+
+ describe "before_fork" do
+ after do
+ ::Mongoid.master.connection.close
+ end
+
+ it "should disconnect" do
+ lambda do
+ Delayed::Backend::Mongoid::Job.before_fork
+ end.should change { !!Mongoid.master.connection.connected? }.from(true).to(false)
+ end
+ end
+
+ describe "after_fork" do
+ before do
+ ::Mongoid.master.connection.close
+ end
+
+ it "should call reconnect" do
+ lambda do
+ Delayed::Backend::Mongoid::Job.after_fork
+ end.should change { !!Mongoid.master.connection.connected? }.from(false).to(true)
+ end
+ end
+end
View
17 spec/setup/mongoid.rb
@@ -0,0 +1,17 @@
+require 'mongoid'
+
+require 'delayed/backend/mongoid'
+
+Mongoid.configure do |config|
+ config.master = config.master = Mongo::Connection.new.db('dl_spec')
+end
+
+class Story
+ include ::Mongoid::Document
+ def tell; text; end
+ def whatever(n, _); tell*n; end
+ def self.count; end
+
+ handle_asynchronously :whatever
+end
+
View
4 spec/spec_helper.rb
@@ -20,8 +20,8 @@
require "setup/#{backend}"
require "backend/#{backend}_job_spec"
BACKENDS << backend.to_sym
- rescue Exception
- puts "Unable to load #{backend} backend: #{$!}"
+ rescue LoadError
+ puts "Unable to load #{backend} backend: #{$!}"
end
end

0 comments on commit ec0cdf5

Please sign in to comment.