Permalink
Browse files

* new datamapper backend for delayed job

  • Loading branch information...
1 parent 3a7c29e commit 3a804888cc131b90287a534622716b36077a15fc @lpetre lpetre committed with bkeepers Mar 25, 2010
Showing with 154 additions and 0 deletions.
  1. +100 −0 lib/delayed/backend/datamapper.rb
  2. +54 −0 spec/datamapper_job_spec.rb
View
100 lib/delayed/backend/datamapper.rb
@@ -0,0 +1,100 @@
+require 'dm-core'
+require 'dm-observer'
+require 'dm-aggregates'
+
+module DataMapper
+ module Resource
+ module ClassMethods
+ def load_for_delayed_job(id)
+ find!(id)
+ end
+ end
+
+ module InstanceMethods
+ def dump_for_delayed_job
+ "#{self.class};#{id}"
+ end
+ end
+ end
+end
+
+module Delayed
+ module Backend
+ module DataMapper
+ class Job
+ include ::DataMapper::Resource
+ include Delayed::Backend::Base
+
+ storage_names[:default] = 'delayed_jobs'
+
+ property :id, Serial
+ property :priority, Integer, :default => 0
+ property :attempts, Integer, :default => 0
+ property :handler, String
+ property :run_at, Time
+ property :locked_at, Time
+ property :locked_by, String
+ property :failed_at, Time
+ property :last_error, String
+
+ def self.db_time_now
+ Time.now.utc
+ end
+
+ def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
+ where = "this.run_at <= new Date(#{db_time_now.to_f * 1000}) && (this.locked_at == null || this.locked_at < new Date(#{(db_time_now - max_run_time).to_f * 1000})) || this.locked_by == #{worker_name.to_json}"
+ # all(:limit => limit, :failed_at => nil, '$where' => where)
+
+ complex = all(:run_at.lte => db_time_now) &
+ (( all(:locked_at => nil ) | all(:locked_at.lt => db_time_now - max_run_time)) |
+ all(:locked_by => worker_name));
+
+ complex.all( :limit => limit,
+ :failed_at => nil,
+ :order => [:priority.asc, :run_at.asc] )
+
+ end
+
+ # When a worker is exiting, make sure we don't have any locked jobs.
+ def self.clear_locks!(worker_name)
+ all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil)
+ end
+
+ # Lock this job for this worker.
+ # Returns true if we have the lock, false otherwise.
+ def lock_exclusively!(max_run_time, worker = worker_name)
+ now = self.class.db_time_now
+ overtime = now - max_run_time
+
+ # FIXME - this is a bit gross
+ # DM doesn't give us the number of rows affected by a collection update
+ # so we have to circumvent some niceness in DM::Collection here
+ collection = locked_by != worker ?
+ (self.class.all(:id => id, :run_at.lte => now) & ( self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime) ) ) :
+ self.class.all(:id => id, :locked_by => worker)
+
+ attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
+ affected_rows = self.repository.update(attributes, collection)
+
+ if affected_rows == 1
+ self.locked_at = now
+ self.locked_by = worker
+ return true
+ else
+ return false
+ end
+ end
+ end
+
+ class JobObserver
+ include ::DataMapper::Observer
+
+ observe Job
+
+ before :save do
+ self.run_at ||= self.class.db_time_now
+ end
+ end
+ end
+ end
+end
View
54 spec/datamapper_job_spec.rb
@@ -0,0 +1,54 @@
+require 'spec_helper'
+
+require 'delayed/backend/datamapper'
+
+DataMapper.logger = ActiveRecord::Base.logger
+DataMapper.setup(:default, "sqlite3::memory:")
+
+module Delayed
+ module Backend
+ module DataMapper
+ class Job
+ def self.find id
+ get id
+ end
+
+ def update_attributes(attributes)
+ self.update attributes
+ self.save
+ end
+ end
+ end
+ end
+end
+
+describe Delayed::Backend::DataMapper::Job do
+ before(:all) do
+ @backend = Delayed::Backend::DataMapper::Job
+ end
+
+ before(:each) do
+ # reset database before each example is run
+ DataMapper.auto_migrate!
+ end
+
+ it_should_behave_like 'a backend'
+
+ describe "delayed method" do
+ class DMStoryReader
+ def read(story)
+ "Epilog: #{story.tell}"
+ end
+ end
+
+ class DMStory
+ include DataMapper::Resource
+ property :id, Serial
+ property :text, String
+
+ def tell
+ text
+ end
+ end
+ end
+end

0 comments on commit 3a80488

Please sign in to comment.