Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

check in fixes for scheduled at and stuff

  • Loading branch information...
commit 7f0ebd0c13951c0096d80ebfba19cfcba00c1f0a 1 parent 98435f5
@gnufied authored
View
8 lib/backgroundrb/bdrb_config.rb
@@ -24,9 +24,15 @@ def self.parse_cmd_options(argv)
def self.read_config(config_file)
config = YAML.load(ERB.new(IO.read(config_file)).result)
environment = ENV["RAILS_ENV"] || config[:backgroundrb][:environment] || "development"
- silence_warnings do
+
+ if respond_to?(:silence_warnings)
+ silence_warnings do
+ Object.const_set("RAILS_ENV",environment)
+ end
+ else
Object.const_set("RAILS_ENV",environment)
end
+
ENV["RAILS_ENV"] = environment
config
end
View
16 lib/backgroundrb/bdrb_job_queue.rb
@@ -4,9 +4,10 @@ def self.find_next(worker_name,worker_key = nil)
returned_job = nil
transaction do
unless worker_key
- t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0},:lock => true)
+ #use ruby time stamps for time calculations as db might have different times than what is calculated by ruby/rails
+ t_job = find(:first,:conditions => [" worker_name = ? AND taken = ? AND scheduled_at <= ? ", worker_name, 0, Time.now.utc ],:lock => true)
else
- t_job = find(:first,:conditions => { :worker_name => worker_name,:taken => 0,:worker_key => worker_key },:lock => true)
+ t_job = find(:first,:conditions => [" worker_name = ? AND taken = ? AND worker_key = ? AND scheduled_at <= ? ", worker_name, 0, worker_key, Time.now.utc ],:lock => true)
end
if t_job
t_job.taken = 1
@@ -34,12 +35,23 @@ def self.insert_job(options = { })
end
end
+ def self.remove_job(options = { })
+ transaction do
+ t_job_id = find(:first, :conditions => options.merge(:finished => 0,:taken => 0),:lock => true)
+ delete(t_job_id)
+ end
+ end
+
def finish!
self.class.transaction do
self.finished = 1
self.finished_at = Time.now
+ self.job_key = "finished_#{Time.now.to_i}_#{job_key}"
self.save
end
+ Thread.current[:persistent_job_id] = nil
+ Thread.current[:job_key] = nil
+ nil
end
end
View
14 lib/backgroundrb/rails_worker_proxy.rb
@@ -13,7 +13,8 @@ def method_missing(method_id,*args)
worker_method = method_id.to_s
arguments = args.first
- arg,job_key,host_info = arguments && arguments.values_at(:arg,:job_key,:host)
+ arg,job_key,host_info,scheduled_at = arguments && arguments.values_at(:arg,:job_key,:host,:scheduled_at)
+ new_schedule = (scheduled_at && scheduled_at.respond_to?(:utc)) ? scheduled_at.utc : Time.now.utc
if worker_method =~ /^async_(\w+)/
method_name = $1
@@ -26,7 +27,12 @@ def method_missing(method_id,*args)
marshalled_args = Marshal.dump(arg)
enqueue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
:worker_method => method_name.to_s,:job_key => job_key.to_s,
- :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil))
+ :args => marshalled_args,:timeout => arguments ? arguments[:timeout] : nil,:scheduled_at => new_schedule))
+ elsif worker_method =~ /^deq_(\w+)/i
+ raise NoJobKey.new("Must specify a job key to dequeue tasks") if job_key.blank?
+ method_name = $1
+ dequeue_task(compact(:worker_name => worker_name.to_s,:worker_key => worker_key.to_s,
+ :worker_method => method_name.to_s,:job_key => job_key.to_s))
else
worker_options = compact(:worker => worker_name,:worker_key => worker_key,
:worker_method => worker_method,:job_key => job_key,:arg => arg)
@@ -38,6 +44,10 @@ def enqueue_task options = {}
BdrbJobQueue.insert_job(options)
end
+ def dequeue_task options = {}
+ BdrbJobQueue.remove_job(options)
+ end
+
def run_method host_info,method_name,worker_options = {}
result = []
connection = choose_connection(host_info)
View
13 server/lib/meta_worker.rb
@@ -5,20 +5,27 @@ class PacketLogger
def initialize(worker,log_flag = true)
@log_flag = log_flag
@worker = worker
+ @log_mutex = Mutex.new
end
def info(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
def debug(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
def error(p_data)
return unless @log_flag
- @worker.send_request(:worker => :log_worker, :data => p_data)
+ @log_mutex.synchronize do
+ @worker.send_request(:worker => :log_worker, :data => p_data)
+ end
end
end
# == MetaWorker class
View
20 tasks/backgroundrb_tasks.rake
@@ -1,5 +1,5 @@
namespace :backgroundrb do
- def setup_queue_migration
+ def generate_queue_migration
config_file = "#{RAILS_ROOT}/config/database.yml"
require "erb"
require "active_record"
@@ -22,6 +22,7 @@ namespace :backgroundrb do
t.column :started_at, :datetime
t.column :finished_at, :datetime
t.column :archived_at, :datetime
+ t.column :scheduled_at, :datetime
t.column :tag, :string
t.column :submitter_info, :string
t.column :runner_info, :string
@@ -33,7 +34,17 @@ namespace :backgroundrb do
drop_table :bdrb_job_queues
end
end
- migration_klass.up
+ migration_klass
+ end
+
+ def setup_queue_migration
+ generate_queue_migration.up
+ end
+
+ def redo_queue_migration
+ klass = generate_queue_migration
+ klass.down
+ klass.up
end
require 'yaml'
@@ -90,6 +101,11 @@ namespace :backgroundrb do
setup_queue_migration
end
+ desc "Drops and recreate backgroundrb queue table"
+ task :redo_queue do
+ redo_queue_migration
+ end
+
desc 'update backgroundrb config files from your rails application'
task :update do
temp_scripts = ["backgroundrb","load_worker_env.rb"].map {|x| "#{RAILS_ROOT}/script/#{x}"}
View
2  test/client/backgroundrb.yml
@@ -3,7 +3,7 @@
:ip: 0.0.0.0
:port: 11008
:log: foreground
- :environment: production
+ :environment: development
:result_storage: memcache
:memcache: "localhost:11211"
View
5 test/client/test_bdrb_config.rb
@@ -11,9 +11,10 @@
end
specify "should setup correct environment from conf file" do
+ ENV["RAILS_ENV"] = nil
BackgrounDRb::Config.parse_cmd_options([])
BackgrounDRb::Config.read_config(conf_file)
- ENV["RAILS_ENV"].should == "production"
- RAILS_ENV.should == "production"
+ ENV["RAILS_ENV"].should == "development"
+ RAILS_ENV.should == "development"
end
end
View
39 test/client/test_bdrb_job_queue.rb
@@ -3,5 +3,44 @@
require "bdrb_job_queue"
context "For BackgrounDRb job Queues" do
+ setup do
+ db_config_file = YAML.load(ERB.new(IO.read("#{RAILS_HOME}/config/database.yml")).result)
+ ActiveRecord::Base.establish_connection(db_config_file["test"])
+ BdrbJobQueue.destroy_all
+ end
+ specify "should insert job with proper params" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ next_job = BdrbJobQueue.find_next("hello_world")
+ next_job.taken.should == 1
+ next_job.started_at.should.not.be nil
+ next_job.job_key.should == "cats"
+ next_job.worker_name.should == "hello_world"
+ next_job.worker_method.should == "foovar"
+ end
+
+ specify "release_job should worker properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ next_job = BdrbJobQueue.find_next("hello_world")
+ next_job.release_job
+ t = BdrbJobQueue.find_by_job_key("cats")
+ t.taken.should == 0
+ t.started_at.should == nil
+ end
+
+ specify "remove job should work properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ BdrbJobQueue.remove_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats")
+ t = BdrbJobQueue.find_by_job_key("cats")
+ t.should == nil
+ end
+
+ specify "finish should work properly" do
+ BdrbJobQueue.insert_job(:worker_name => "hello_world",:worker_method => "foovar",:job_key => "cats",:args => "hello_world",:scheduled_at => Time.now.utc)
+ t = BdrbJobQueue.find_next("hello_world")
+ t.finish!
+ t.finished.should == 1
+ t.finished_at.should.not == nil
+ t.job_key.should.match(/finished_\d+_cats/i)
+ end
end
View
28 test/client/test_worker_proxy.rb
@@ -11,6 +11,7 @@
@cluster_conn = mock
@worker_proxy = BackgrounDRb::RailsWorkerProxy.new(:hello_worker,nil,@cluster_conn)
end
+
specify "should let you fetch results" do
@cluster_conn.expects(:backend_connections).returns([])
foo = @worker_proxy.ask_result(:foobar)
@@ -34,6 +35,13 @@
end
specify "delete method should run on all nodes" do
+ conn_array = (0..3).map do |i|
+ t = mock()
+ t.expects(:delete_worker).with(:worker => :hello_worker).returns(nil)
+ t
+ end
+ @cluster_conn.expects(:backend_connections).returns(conn_array)
+ @worker_proxy.delete
end
specify "should let you invoke worker_info method" do
@@ -54,6 +62,26 @@
@cluster_conn.expects(:find_connection).returns(actual_conn)
@worker_proxy.async_foobar(:arg => :hello,:job_key => "boy",
:host => "192.168.2.100:100")
+ end
+
+ specify "for enqueued tasks" do
+ BdrbJobQueue = mock() unless Object.const_defined?(:BdrbJobQueue)
+ BdrbJobQueue.expects(:insert_job).with() { |value|
+ value[:worker_name].should == "hello_worker"
+ value[:worker_method].should == "foobar"
+ value[:scheduled_at].should.not == nil
+ value[:job_key] == "catz"
+ }
+ @worker_proxy.enq_foobar(:arg => :hello,:job_key => "catz")
+ end
+ specify "for removing tasks from the queue" do
+ BdrbJobQueue = mock() unless Object.const_defined?(:BdrbJobQueue)
+ BdrbJobQueue.expects(:remove_job).with() do |value|
+ value[:worker_name] == "hello_worker"
+ value[:worker_method] == "foobar"
+ value[:job_key] == "catz"
+ end
+ @worker_proxy.deq_foobar(:job_key => "catz")
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.