Skip to content

Commit

Permalink
Mongoid 3 support
Browse files Browse the repository at this point in the history
  • Loading branch information
asavartsov authored and sferik committed Jul 26, 2012
1 parent 68bcf62 commit b24f871
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 56 deletions.
43 changes: 13 additions & 30 deletions lib/delayed/backend/mongoid.rb
Expand Up @@ -24,55 +24,38 @@ class Job

before_save :set_default_run_at

def self.before_fork
::Mongoid.master.connection.close
end

def self.after_fork
::Mongoid.master.connection.connect
end

def self.db_time_now
Time.now.utc
end

# Reserves this job for the worker.
#
# Uses Mongo's findAndModify operation to atomically pick and lock one
# job from from the collection. findAndModify is not yet available
# directly thru Mongoid so go down to the Mongo Ruby driver instead.
# job from from the collection.
def self.reserve(worker, max_run_time = Worker.max_run_time)
right_now = db_time_now

conditions = {:run_at => {"$lte" => right_now}, :failed_at => nil}
(conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
(conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority
(conditions[:queue] ||= {})['$in'] = Worker.queues if Worker.queues.any?

conditions['$or'] = [
criteria = self.where(
:run_at => {"$lte" => right_now},
:failed_at => nil
).any_of(
{ :locked_by => worker.name },
{ :locked_at => nil },
{ :locked_at => { '$lt' => (right_now - max_run_time) }}
]
)

begin
result = self.db.collection(self.collection.name).find_and_modify(
:query => conditions,
:sort => [['locked_by', -1], ['priority', 1], ['run_at', 1]],
:update => {"$set" => {:locked_at => right_now, :locked_by => worker.name}}
)
criteria = criteria.where(:priority => {"$gte" => Worker.min_priority.to_i}) if Worker.min_priority
criteria = criteria.where(:priority => {"$lte" => Worker.max_priority.to_i}) if Worker.max_priority
criteria = criteria.any_in(:queue => Worker.queues) if Worker.queues.any?

# Return result as a Mongoid document.
# When Mongoid starts supporting findAndModify, this extra step should no longer be necessary.
self.find(result["_id"]) unless result.nil?
rescue Mongo::OperationFailure
nil # no jobs available
end
criteria.desc(:locked_by).asc(:priority).asc(:run_at).find_and_modify(
"$set" => {:locked_at => right_now, :locked_by => worker.name}
)
end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
self.where(:locked_by => worker_name).update_all({:locked_at => nil, :locked_by => nil})
end

def reload(*args)
Expand Down
26 changes: 1 addition & 25 deletions spec/delayed_job_mongoid_spec.rb
Expand Up @@ -2,28 +2,4 @@

describe Delayed::Backend::Mongoid::Job do
it_should_behave_like 'a delayed_job backend'

describe "before_fork" do
after do
::Mongoid.master.connection.close
end

it "should disconnect" do
lambda do
Delayed::Backend::Mongoid::Job.before_fork
end.should change { !!Mongoid.master.connection.connected? }.from(true).to(false)
end
end

describe "after_fork" do
before do
::Mongoid.master.connection.close
end

it "should call reconnect" do
lambda do
Delayed::Backend::Mongoid::Job.after_fork
end.should change { !!Mongoid.master.connection.connected? }.from(false).to(true)
end
end
end
end
2 changes: 1 addition & 1 deletion spec/spec_helper.rb
Expand Up @@ -7,7 +7,7 @@
require 'delayed/backend/shared_spec'

Mongoid.configure do |config|
config.master = config.master = Mongo::Connection.new.db('dl_spec')
config.connect_to("dl_spec")
end

class Story
Expand Down

0 comments on commit b24f871

Please sign in to comment.