Skip to content

Commit

Permalink
* new datamapper backend for delayed job
Browse files Browse the repository at this point in the history
  • Loading branch information
lpetre authored and bkeepers committed Mar 26, 2010
1 parent 3a7c29e commit 3a80488
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 0 deletions.
100 changes: 100 additions & 0 deletions lib/delayed/backend/datamapper.rb
@@ -0,0 +1,100 @@
require 'dm-core'
require 'dm-observer'
require 'dm-aggregates'

module DataMapper
module Resource
module ClassMethods
def load_for_delayed_job(id)
find!(id)
end
end

module InstanceMethods
def dump_for_delayed_job
"#{self.class};#{id}"
end
end
end
end

module Delayed
module Backend
module DataMapper
class Job
include ::DataMapper::Resource
include Delayed::Backend::Base

storage_names[:default] = 'delayed_jobs'

property :id, Serial
property :priority, Integer, :default => 0
property :attempts, Integer, :default => 0
property :handler, String
property :run_at, Time
property :locked_at, Time
property :locked_by, String
property :failed_at, Time
property :last_error, String

def self.db_time_now
Time.now.utc
end

def self.find_available(worker_name, limit = 5, max_run_time = Worker.max_run_time)
where = "this.run_at <= new Date(#{db_time_now.to_f * 1000}) && (this.locked_at == null || this.locked_at < new Date(#{(db_time_now - max_run_time).to_f * 1000})) || this.locked_by == #{worker_name.to_json}"
# all(:limit => limit, :failed_at => nil, '$where' => where)

complex = all(:run_at.lte => db_time_now) &
(( all(:locked_at => nil ) | all(:locked_at.lt => db_time_now - max_run_time)) |
all(:locked_by => worker_name));

complex.all( :limit => limit,
:failed_at => nil,
:order => [:priority.asc, :run_at.asc] )

end

# When a worker is exiting, make sure we don't have any locked jobs.
def self.clear_locks!(worker_name)
all(:locked_by => worker_name).update(:locked_at => nil, :locked_by => nil)
end

# Lock this job for this worker.
# Returns true if we have the lock, false otherwise.
def lock_exclusively!(max_run_time, worker = worker_name)
now = self.class.db_time_now
overtime = now - max_run_time

# FIXME - this is a bit gross
# DM doesn't give us the number of rows affected by a collection update
# so we have to circumvent some niceness in DM::Collection here
collection = locked_by != worker ?
(self.class.all(:id => id, :run_at.lte => now) & ( self.class.all(:locked_at => nil) | self.class.all(:locked_at.lt => overtime) ) ) :
self.class.all(:id => id, :locked_by => worker)

attributes = collection.model.new(:locked_at => now, :locked_by => worker).dirty_attributes
affected_rows = self.repository.update(attributes, collection)

if affected_rows == 1
self.locked_at = now
self.locked_by = worker
return true
else
return false
end
end
end

class JobObserver
include ::DataMapper::Observer

observe Job

before :save do
self.run_at ||= self.class.db_time_now
end
end
end
end
end
54 changes: 54 additions & 0 deletions spec/datamapper_job_spec.rb
@@ -0,0 +1,54 @@
require 'spec_helper'

require 'delayed/backend/datamapper'

DataMapper.logger = ActiveRecord::Base.logger
DataMapper.setup(:default, "sqlite3::memory:")

module Delayed
module Backend
module DataMapper
class Job
def self.find id
get id
end

def update_attributes(attributes)
self.update attributes
self.save
end
end
end
end
end

describe Delayed::Backend::DataMapper::Job do
before(:all) do
@backend = Delayed::Backend::DataMapper::Job
end

before(:each) do
# reset database before each example is run
DataMapper.auto_migrate!
end

it_should_behave_like 'a backend'

describe "delayed method" do
class DMStoryReader
def read(story)
"Epilog: #{story.tell}"
end
end

class DMStory
include DataMapper::Resource
property :id, Serial
property :text, String

def tell
text
end
end
end
end

0 comments on commit 3a80488

Please sign in to comment.