Skip to content

Commit

Permalink
mongoid backend support. courtesy of dohmoose.
Browse files Browse the repository at this point in the history
  • Loading branch information
iamnader committed Sep 2, 2010
1 parent f61a855 commit ec0cdf5
Show file tree
Hide file tree
Showing 6 changed files with 186 additions and 2 deletions.
26 changes: 26 additions & 0 deletions README.textile
Expand Up @@ -62,6 +62,32 @@ MongoMapper.setup(config, Rails.env)
Delayed::Worker.backend = :mongo_mapper
</pre>

h3. Mongoid

You need to create your DelayedJob mongoid model:
<pre>
# app/models/delayed_job.rb
class DelayedJob
include Mongoid::Document
include Mongoid::Timestamps

field :priority, :type => Integer, :default => 0 # Allows some jobs to jump to the front of the queue
field :attempts, :type => Integer, :default => 0 # Provides for retries, but still fail eventually.
field :handler # YAML-encoded string of the object that will do work
field :last_error # reason for last failure (See Note below)
field :run_at, :type => DateTime # When to run. Could be Time.zone.now for immediately, or sometime in the future.
field :locked_at, :type => DateTime # Set when a client is working on this object
field :failed_at, :type => DateTime # Set when all retries have failed (actually, by default, the record is deleted instead)
field :locked_by # Who is working on this object (if locked)
end
</pre>

And set the backend in the initializer:
<pre>
# config/initializers/delayed_job.rb
Delayed::Worker.backend = :mongoid
</pre>

h3. DataMapper

<pre>
Expand Down
98 changes: 98 additions & 0 deletions lib/delayed/backend/mongoid.rb
@@ -0,0 +1,98 @@
Mongoid::Document.class_eval do
yaml_as "tag:ruby.yaml.org,2002:Mongoid"

def self.yaml_new(klass, tag, val)
begin
klass.find(val['attributes']['_id'])
rescue Mongoid::Errors::DocumentNotFound
nil
end
end

def to_yaml_properties
['@attributes']
end
end

module Delayed
module Backend
module Mongoid
class Job
include ::Mongoid::Document
include ::Mongoid::Timestamps
include Delayed::Backend::Base
field :priority, :type=> Integer, :default => 0
field :attempts, :type=> Integer, :default => 0
field :handler, :type=> String
field :run_at, :type=> Time
field :locked_at, :type=> Time
field :locked_by, :type=> String, :index => true
field :failed_at, :type=> Time
field :last_error, :type=> String


before_save :set_default_run_at

def self.before_fork
::Mongoid.master.connection.close
end

def self.after_fork
::Mongoid.master.connection.connect_to_master
end

def self.db_time_now
Time.now.utc
end

def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
right_now = db_time_now

conditions = {:run_at => {"$lte" => right_now}, :failed_at => nil}
(conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
(conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority


where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"
results = self.where(conditions.merge(:locked_by => worker_name)).limit(-limit).order_by([['priority', 1], ['run_at', 1]]).to_a
results += self.where(conditions.merge('$where' => where)).limit(-limit+results.size).order_by([['priority', 1], ['run_at', 1]]).to_a if results.size < limit
results
end
# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
end

# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
right_now = self.class.db_time_now
overtime = right_now - max_run_time.to_i

query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}"
conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query}

self.collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}})
affected_rows = self.collection.find({:_id => id, :locked_by => worker}).count
if affected_rows == 1
self.locked_at = right_now
self.locked_by = worker
return true
else
return false
end
end

private

def self.make_date(date_or_seconds)
"new Date(#{date_or_seconds.to_f * 1000})"
end

def make_date(date)
self.class.make_date(date)
end
end
end
end
end
2 changes: 2 additions & 0 deletions lib/delayed/worker.rb
Expand Up @@ -43,6 +43,8 @@ def self.guess_backend
:active_record
elsif defined?(MongoMapper)
:mongo_mapper
elsif defined?(Mongoid)
:mongoid
else
logger.warn "Could not decide on a backend, defaulting to active_record"
:active_record
Expand Down
41 changes: 41 additions & 0 deletions spec/backend/mongoid_job_spec.rb
@@ -0,0 +1,41 @@
require 'spec_helper'
require 'backend/shared_backend_spec'
require 'delayed/backend/mongoid'

describe Delayed::Backend::Mongoid::Job do
before(:all) do
@backend = Delayed::Backend::Mongoid::Job
Delayed::Worker.backend = :mongoid
end

before(:each) do
Delayed::Backend::Mongoid::Job.destroy_all
end

it_should_behave_like 'a backend'


describe "before_fork" do
after do
::Mongoid.master.connection.close
end

it "should disconnect" do
lambda do
Delayed::Backend::Mongoid::Job.before_fork
end.should change { !!Mongoid.master.connection.connected? }.from(true).to(false)
end
end

describe "after_fork" do
before do
::Mongoid.master.connection.close
end

it "should call reconnect" do
lambda do
Delayed::Backend::Mongoid::Job.after_fork
end.should change { !!Mongoid.master.connection.connected? }.from(false).to(true)
end
end
end
17 changes: 17 additions & 0 deletions spec/setup/mongoid.rb
@@ -0,0 +1,17 @@
require 'mongoid'

require 'delayed/backend/mongoid'

Mongoid.configure do |config|
config.master = config.master = Mongo::Connection.new.db('dl_spec')
end

class Story
include ::Mongoid::Document
def tell; text; end
def whatever(n, _); tell*n; end
def self.count; end

handle_asynchronously :whatever
end

4 changes: 2 additions & 2 deletions spec/spec_helper.rb
Expand Up @@ -20,8 +20,8 @@
require "setup/#{backend}"
require "backend/#{backend}_job_spec"
BACKENDS << backend.to_sym
rescue Exception
puts "Unable to load #{backend} backend: #{$!}"
rescue LoadError
puts "Unable to load #{backend} backend: #{$!}"
end
end

Expand Down

0 comments on commit ec0cdf5

Please sign in to comment.