Permalink
Browse files

Extracted Mongoid backend from delayed_job

  • Loading branch information...
0 parents commit d589a6ae39c86737f449a57fb5709380d28a5fbe @bkeepers bkeepers committed Sep 4, 2010
21 .gitignore
@@ -0,0 +1,21 @@
+## MAC OS
+.DS_Store
+
+## TEXTMATE
+*.tmproj
+tmtags
+
+## EMACS
+*~
+\#*
+.\#*
+
+## VIM
+*.swp
+
+## PROJECT::GENERAL
+coverage
+rdoc
+pkg
+
+## PROJECT::SPECIFIC
9 Gemfile
@@ -0,0 +1,9 @@
+source 'http://rubygems.org'
+gem 'delayed_job', :git => 'git://github.com/collectiveidea/delayed_job.git'
+gem 'bson_ext'
+gem 'mongoid', :git => 'git://github.com/mongoid/mongoid.git'
+
+group :development do
+ gem 'rspec'
+ gem 'rake'
+end
47 Gemfile.lock
@@ -0,0 +1,47 @@
+GIT
+ remote: git://github.com/collectiveidea/delayed_job.git
+ revision: 50d0d808ededd0a13eaf4f59dd2a6ed1e70d34b1
+ specs:
+ delayed_job (2.1.0.pre)
+ daemons
+
+GIT
+ remote: git://github.com/mongoid/mongoid.git
+ revision: 5e6466e
+ specs:
+ mongoid (2.0.0.beta.17)
+ activemodel (~> 3.0.0)
+ bson (= 1.0.4)
+ mongo (= 1.0.7)
+ tzinfo (~> 0.3.22)
+ will_paginate (~> 3.0.pre)
+
+GEM
+ remote: http://rubygems.org/
+ specs:
+ activemodel (3.0.0)
+ activesupport (= 3.0.0)
+ builder (~> 2.1.2)
+ i18n (~> 0.4.1)
+ activesupport (3.0.0)
+ bson (1.0.4)
+ bson_ext (1.0.4)
+ builder (2.1.2)
+ daemons (1.1.0)
+ i18n (0.4.1)
+ mongo (1.0.7)
+ bson (>= 1.0.4)
+ rake (0.8.7)
+ rspec (1.3.0)
+ tzinfo (0.3.23)
+ will_paginate (3.0.pre2)
+
+PLATFORMS
+ ruby
+
+DEPENDENCIES
+ bson_ext
+ delayed_job!
+ mongoid!
+ rake
+ rspec
20 LICENSE
@@ -0,0 +1,20 @@
+Copyright © 2010 Collective Idea
+
+Permission is hereby granted, free of charge, to any person obtaining
+a copy of this software and associated documentation files (the
+"Software"), to deal in the Software without restriction, including
+without limitation the rights to use, copy, modify, merge, publish,
+distribute, sublicense, and/or sell copies of the Software, and to
+permit persons to whom the Software is furnished to do so, subject to
+the following conditions:
+
+The above copyright notice and this permission notice shall be
+included in all copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
+EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
+NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
+LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
+OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
+WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
11 README.md
@@ -0,0 +1,11 @@
+# delayed_job Mongoid backend
+
+## Note on Patches/Pull Requests
+
+* Fork the project.
+* Make your feature addition or bug fix.
+* Add tests for it. This is important so I don't break it in a
+ future version unintentionally.
+* Commit, do not mess with rakefile, version, or history.
+ (if you want to have your own version, that is fine but bump version in a commit by itself I can ignore when I pull)
+* Send me a pull request. Bonus points for topic branches.
27 Rakefile
@@ -0,0 +1,27 @@
+require 'rubygems'
+require 'bundler/setup'
+require 'rake'
+
+require 'spec/rake/spectask'
+Spec::Rake::SpecTask.new(:spec) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.spec_files = FileList['spec/**/*_spec.rb']
+end
+
+Spec::Rake::SpecTask.new(:rcov) do |spec|
+ spec.libs << 'lib' << 'spec'
+ spec.pattern = 'spec/**/*_spec.rb'
+ spec.rcov = true
+end
+
+task :default => :spec
+
+require 'rake/rdoctask'
+Rake::RDocTask.new do |rdoc|
+ version = File.exist?('VERSION') ? File.read('VERSION') : ""
+
+ rdoc.rdoc_dir = 'rdoc'
+ rdoc.title = "delayed_job_mongoid #{version}"
+ rdoc.rdoc_files.include('README*')
+ rdoc.rdoc_files.include('lib/**/*.rb')
+end
21 delayed_job_mongoid.gemspec
@@ -0,0 +1,21 @@
+# -*- encoding: utf-8 -*-
+
+Gem::Specification.new do |s|
+ s.name = 'delayed_job_mongoid'
+ s.summary = "Mongoid backend for delayed_job"
+ s.version = '1.0.0.rc'
+ s.authors = 'Brandon Keepers'
+ s.date = Date.today.to_s
+ s.email = 'brandon@collectiveidea.com'
+ s.extra_rdoc_files = ["LICENSE", "README.md"]
+ s.files = Dir.glob("{lib,spec}/**/*") + %w[LICENSE README.md]
+ s.homepage = 'http://github.com/collectiveidea/delayed_job_mongoid'
+ s.rdoc_options = ['--charset=UTF-8']
+ s.require_paths = ['lib']
+ s.test_files = Dir.glob('spec/**/*')
+
+ s.add_runtime_dependency 'mongoid', '~> 2.0'
+ s.add_runtime_dependency 'delayed_job', '~> 2.1'
+ s.add_development_dependency 'rspec', '>= 1.2.9'
+end
+
82 lib/delayed/backend/mongoid.rb
@@ -0,0 +1,82 @@
+module Delayed
+ module Backend
+ module Mongoid
+ class Job
+ include ::Mongoid::Document
+ include ::Mongoid::Timestamps
+ include Delayed::Backend::Base
+ field :priority, :type=> Integer, :default => 0
+ field :attempts, :type=> Integer, :default => 0
+ field :handler, :type=> String
+ field :run_at, :type=> Time
+ field :locked_at, :type=> Time
+ field :locked_by, :type=> String, :index => true
+ field :failed_at, :type=> Time
+ field :last_error, :type=> String
+
+
+ before_save :set_default_run_at
+
+ def self.before_fork
+ ::Mongoid.master.connection.close
+ end
+
+ def self.after_fork
+ ::Mongoid.master.connection.connect_to_master
+ end
+
+ def self.db_time_now
+ Time.now.utc
+ end
+
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ right_now = db_time_now
+
+ conditions = {:run_at => {"$lte" => right_now}, :failed_at => nil}
+ (conditions[:priority] ||= {})['$gte'] = Worker.min_priority.to_i if Worker.min_priority
+ (conditions[:priority] ||= {})['$lte'] = Worker.max_priority.to_i if Worker.max_priority
+
+
+ where = "this.locked_at == null || this.locked_at < #{make_date(right_now - max_run_time)}"
+ results = self.where(conditions.merge(:locked_by => worker_name)).limit(-limit).order_by([['priority', 1], ['run_at', 1]]).to_a
+ results += self.where(conditions.merge('$where' => where)).limit(-limit+results.size).order_by([['priority', 1], ['run_at', 1]]).to_a if results.size < limit
+ results
+ end
+ # When a worker is exiting, make sure we don't have any locked jobs.
+ def self.clear_locks!(worker_name)
+ self.collection.update({:locked_by => worker_name}, {"$set" => {:locked_at => nil, :locked_by => nil}}, :multi => true)
+ end
+
+ # Lock this job for this worker.
+ # Returns true if we have the lock, false otherwise.
+ def lock_exclusively!(max_run_time, worker = worker_name)
+ right_now = self.class.db_time_now
+ overtime = right_now - max_run_time.to_i
+
+ query = "this.locked_at == null || this.locked_at < #{make_date(overtime)} || this.locked_by == #{worker.to_json}"
+ conditions = {:_id => id, :run_at => {"$lte" => right_now}, "$where" => query}
+
+ self.collection.update(conditions, {"$set" => {:locked_at => right_now, :locked_by => worker}})
+ affected_rows = self.collection.find({:_id => id, :locked_by => worker}).count
+ if affected_rows == 1
+ self.locked_at = right_now
+ self.locked_by = worker
+ return true
+ else
+ return false
+ end
+ end
+
+ private
+
+ def self.make_date(date_or_seconds)
+ "new Date(#{date_or_seconds.to_f * 1000})"
+ end
+
+ def make_date(date)
+ self.class.make_date(date)
+ end
+ end
+ end
+ end
+end
16 lib/delayed/serialization/mongoid.rb
@@ -0,0 +1,16 @@
+Mongoid::Document.class_eval do
+ yaml_as "tag:ruby.yaml.org,2002:Mongoid"
+
+ def self.yaml_new(klass, tag, val)
+ begin
+ klass.find(val['attributes']['_id'])
+ rescue Mongoid::Errors::DocumentNotFound
+ nil
+ end
+ end
+
+ def to_yaml_properties
+ ['@attributes']
+ end
+end
+
6 lib/delayed_job_mongoid.rb
@@ -0,0 +1,6 @@
+require 'mongoid'
+require 'delayed_job'
+require 'delayed/serialization/mongoid'
+require 'delayed/backend/mongoid'
+
+Delayed::Worker.backend = :mongoid
29 spec/delayed_job_mongoid_spec.rb
@@ -0,0 +1,29 @@
+require 'spec_helper'
+
+describe Delayed::Backend::Mongoid::Job do
+ it_should_behave_like 'a delayed_job backend'
+
+ describe "before_fork" do
+ after do
+ ::Mongoid.master.connection.close
+ end
+
+ it "should disconnect" do
+ lambda do
+ Delayed::Backend::Mongoid::Job.before_fork
+ end.should change { !!Mongoid.master.connection.connected? }.from(true).to(false)
+ end
+ end
+
+ describe "after_fork" do
+ before do
+ ::Mongoid.master.connection.close
+ end
+
+ it "should call reconnect" do
+ lambda do
+ Delayed::Backend::Mongoid::Job.after_fork
+ end.should change { !!Mongoid.master.connection.connected? }.from(false).to(true)
+ end
+ end
+end
1 spec/spec.opts
@@ -0,0 +1 @@
+--color
26 spec/spec_helper.rb
@@ -0,0 +1,26 @@
+require 'rubygems'
+require 'bundler/setup'
+$LOAD_PATH.unshift(File.dirname(__FILE__))
+$LOAD_PATH.unshift(File.join(File.dirname(__FILE__), '..', 'lib'))
+require 'spec'
+require 'spec/autorun'
+require 'delayed_job_mongoid'
+require 'delayed/backend/shared_spec'
+
+Spec::Runner.configure do |config|
+
+end
+
+Mongoid.configure do |config|
+ config.master = config.master = Mongo::Connection.new.db('dl_spec')
+end
+
+class Story
+ include ::Mongoid::Document
+ def tell; text; end
+ def whatever(n, _); tell*n; end
+ def self.count; end
+
+ handle_asynchronously :whatever
+end
+

0 comments on commit d589a6a

Please sign in to comment.